• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.75
/src/dftracer/utils/binaries/dftracer_reader.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#include <dftracer/utils/core/common/constants.h>
3
#include <dftracer/utils/core/common/filesystem.h>
4
#include <dftracer/utils/core/common/logging.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/io/io.h>
7
#include <dftracer/utils/utilities/composites/composites.h>
8
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
9
#include <dftracer/utils/utilities/indexer/internal/indexer_factory.h>
10
#include <dftracer/utils/utilities/reader/internal/reader.h>
11
#include <dftracer/utils/utilities/reader/internal/reader_factory.h>
12
#include <dftracer/utils/utilities/reader/internal/stream.h>
13
#include <dftracer/utils/utilities/reader/internal/stream_config.h>
14
#include <dftracer/utils/utilities/reader/internal/stream_type.h>
15
#include <fcntl.h>
16
#include <unistd.h>
17

18
#include <argparse/argparse.hpp>
19
#include <cstdio>
20
#include <cstdlib>
21
#include <cstring>
22
using namespace dftracer::utils;
23
using namespace dftracer::utils::utilities::indexer::internal;
24
using namespace dftracer::utils::utilities::reader::internal;
25

26
/**
 * Prepare the index store for a compressed file and stream a range of its
 * decompressed content to standard output.
 *
 * The index store is created when it does not exist yet or when a rebuild is
 * forced; afterwards a reader sharing the indexer is used to stream either a
 * line range or a byte range, chunk by chunk, to STDOUT_FILENO.
 *
 * @param gz_path          Path of the compressed input file.
 * @param index_path       Index path handed to IndexerFactory; its normalized
 *                         form is the index store root checked on disk.
 * @param checkpoint_size  Checkpoint size forwarded to the indexer.
 * @param force_rebuild    Remove and rebuild the index store before reading.
 *                         Forced to true when the store does not exist.
 * @param check_rebuild    Only check the index: fails when the store is
 *                         missing, returns 0 when no rebuild is needed.
 * @param read_mode        Any mode containing "bytes" selects a byte range
 *                         ("bytes" streams raw bytes, every other such mode
 *                         streams line-aligned bytes); all other modes select
 *                         a line range.
 * @param read_buffer_size Buffer size forwarded to the stream configuration.
 * @param start            First line/byte of the range, or -1 for the
 *                         beginning (line 1 / byte 0).
 * @param end              Last line/byte of the range, or -1 for the end
 *                         (number of lines / maximum byte offset).
 * @return 0 on success, 1 on an invalid range or on any indexer/reader error.
 */
static coro::CoroTask<int> run_reader(const std::string &gz_path,
                                      const std::string &index_path,
                                      std::size_t checkpoint_size,
                                      bool force_rebuild, bool check_rebuild,
                                      const std::string &read_mode,
                                      std::size_t read_buffer_size,
                                      int64_t start, int64_t end) {
    // -1 is the only negative value with a meaning ("unbounded"). Anything
    // lower would silently wrap to a huge std::size_t in the casts below, so
    // reject it before doing any expensive index work.
    if (start < -1 || end < -1) {
        DFTRACER_UTILS_LOG_ERROR(
            "Invalid range [%lld, %lld]: start and end must be >= 0, or -1 "
            "for unbounded",
            static_cast<long long>(start), static_cast<long long>(end));
        co_return 1;
    }

    const std::string index_root = normalize_index_root(index_path);

    // Create indexer first
    std::shared_ptr<Indexer> indexer;
    try {
        // Check whether the root-local .dftindex store already exists.
        if (!fs::exists(index_root)) {
            if (check_rebuild) {
                DFTRACER_UTILS_LOG_ERROR(
                    "Index store '%s' does not exist, cannot check",
                    index_root.c_str());
                co_return 1;
            }
            DFTRACER_UTILS_LOG_DEBUG("Index store '%s' does not exist",
                                     index_root.c_str());
            DFTRACER_UTILS_LOG_DEBUG("%s", "Will create new index store");
            force_rebuild = true;
        }

        // Use IndexerFactory to create appropriate indexer
        indexer = IndexerFactory::create(gz_path, index_path, checkpoint_size,
                                         force_rebuild);

        // NOTE(review): when the check reports that a rebuild IS needed and
        // no rebuild was forced, execution continues into the read phase with
        // the existing index. Confirm that this fall-through is intended.
        if (check_rebuild && !indexer->need_rebuild()) {
            DFTRACER_UTILS_LOG_DEBUG("%s",
                                     "Index is up to date, no rebuild needed");
            co_return 0;
        }

        if (force_rebuild) {
            if (fs::exists(index_root)) {
                DFTRACER_UTILS_LOG_DEBUG("Removing existing index store: %s",
                                         index_root.c_str());
                fs::remove_all(index_root);
            }
            // Recreate the store after removing the old .dftindex root.
            indexer = IndexerFactory::create(gz_path, index_path,
                                             checkpoint_size, true);
            DFTRACER_UTILS_LOG_INFO("Building index store for file: %s",
                                    gz_path.c_str());
            co_await indexer->build_async();
        }
    } catch (const std::exception &e) {
        // Catch the whole std::exception hierarchy so that no standard
        // exception escapes the coroutine; everything is reported as exit
        // code 1.
        DFTRACER_UTILS_LOG_ERROR("Indexer error: %s", e.what());
        co_return 1;
    }

    // read operations
    try {
        // Use ReaderFactory to create appropriate reader, sharing
        // ownership of indexer
        auto reader = ReaderFactory::create(indexer);

        if (read_mode.find("bytes") == std::string::npos) {
            // Line range: an unbounded start means line 1, an unbounded end
            // means the last line known to the reader.
            const std::size_t start_line =
                (start == -1) ? 1 : static_cast<std::size_t>(start);
            const std::size_t end_line =
                (end == -1) ? reader->get_num_lines()
                            : static_cast<std::size_t>(end);

            DFTRACER_UTILS_LOG_DEBUG("Reading lines from %zu to %zu",
                                     start_line, end_line);

            auto stream =
                reader->stream(StreamConfig()
                                   .stream_type(StreamType::MULTI_LINES)
                                   .range_type(RangeType::LINE_RANGE)
                                   .from(start_line)
                                   .to(end_line)
                                   .buffer_size(read_buffer_size));

            // The counter exists only in debug builds; it is referenced
            // solely by the debug log statement below.
#if DFTRACER_UTILS_LOGGER_DEBUG_ENABLED
            std::size_t line_count = 0;
#endif

            while (!stream->done()) {
                auto chunk = co_await stream->read_async();
                // An empty chunk ends the loop even if the stream does not
                // report done().
                if (chunk.empty()) break;
                co_await io::write(STDOUT_FILENO, chunk.data(), chunk.size());
#if DFTRACER_UTILS_LOGGER_DEBUG_ENABLED
                line_count += static_cast<std::size_t>(
                    std::count(chunk.begin(), chunk.end(), '\n'));
#endif
            }

            DFTRACER_UTILS_LOG_DEBUG("Successfully read %zu lines from range",
                                     line_count);
        } else {
            // Byte range: an unbounded start means offset 0, an unbounded end
            // is clamped to the reader's maximum byte offset below.
            const std::size_t start_bytes_ =
                (start == -1) ? 0 : static_cast<std::size_t>(start);
            std::size_t end_bytes_ =
                (end == -1) ? std::numeric_limits<std::size_t>::max()
                            : static_cast<std::size_t>(end);

            const auto max_bytes = reader->get_max_bytes();
            if (end_bytes_ > max_bytes) {
                end_bytes_ = max_bytes;
            }
            DFTRACER_UTILS_LOG_DEBUG("%s",
                                     "Performing byte range read operation");
            DFTRACER_UTILS_LOG_DEBUG("Using read buffer size: %zu bytes",
                                     read_buffer_size);

            // Exactly "bytes" streams raw bytes; any other mode that reached
            // this branch streams line-aligned bytes.
            const StreamType stream_type =
                (read_mode == "bytes") ? StreamType::BYTES
                                       : StreamType::MULTI_LINES_BYTES;

            auto stream = reader->stream(StreamConfig()
                                             .stream_type(stream_type)
                                             .range_type(RangeType::BYTE_RANGE)
                                             .from(start_bytes_)
                                             .to(end_bytes_)
                                             .buffer_size(read_buffer_size));

#if DFTRACER_UTILS_LOGGER_DEBUG_ENABLED
            std::size_t total_bytes = 0;
#endif

            while (!stream->done()) {
                auto chunk = co_await stream->read_async();
                if (chunk.empty()) break;
                co_await io::write(STDOUT_FILENO, chunk.data(), chunk.size());
#if DFTRACER_UTILS_LOGGER_DEBUG_ENABLED
                total_bytes += chunk.size();
#endif
            }

            DFTRACER_UTILS_LOG_DEBUG("Successfully read %zu bytes from range",
                                     total_bytes);
        }
        // Best-effort flush: the result is deliberately ignored.
        fsync(STDOUT_FILENO);
    } catch (const std::exception &e) {
        DFTRACER_UTILS_LOG_ERROR("Reader error: %s", e.what());
        co_return 1;
    }

    co_return 0;
}
8!
173

174
int main(int argc, char **argv) {
5✔
175
    DFTRACER_UTILS_LOGGER_INIT();
5!
176
    auto default_checkpoint_size_str =
177
        std::to_string(Indexer::DEFAULT_CHECKPOINT_SIZE) + " B (" +
10!
178
        std::to_string(Indexer::DEFAULT_CHECKPOINT_SIZE / (1024 * 1024)) +
5!
179
        " MB)";
5!
180
    argparse::ArgumentParser program("dft_reader",
181
                                     DFTRACER_UTILS_PACKAGE_VERSION);
10!
182
    program.add_description(
5!
183
        "DFTracer utility for reading and indexing compressed files (GZIP, "
184
        "TAR.GZ)");
185
    program.add_argument("file")
5!
186
        .help("Compressed file to process (GZIP, TAR.GZ)")
10!
187
        .required();
5✔
188
    program.add_argument("-i", "--index")
5!
189
        .help("Path to the .dftindex store to use")
10!
190
        .default_value<std::string>("");
5!
191
    program.add_argument("-s", "--start")
5!
192
        .help("Start position in bytes")
10!
193
        .default_value<std::int64_t>(-1)
10!
194
        .scan<'d', std::int64_t>();
5!
195
    program.add_argument("-e", "--end")
5!
196
        .help("End position in bytes")
10!
197
        .default_value<std::int64_t>(-1)
10!
198
        .scan<'d', std::int64_t>();
5!
199
    program.add_argument("-c", "--checkpoint-size")
5!
200
        .help("Checkpoint size for indexing in bytes (default: " +
10!
201
              default_checkpoint_size_str + ")")
10!
202
        .scan<'d', std::size_t>()
5!
203
        .default_value(
5✔
204
            static_cast<std::size_t>(Indexer::DEFAULT_CHECKPOINT_SIZE));
5!
205
    program.add_argument("-f", "--force-rebuild")
5!
206
        .help("Force rebuild the .dftindex store")
10!
207
        .flag();
5!
208
    program.add_argument("--check")
5!
209
        .help("Check if the .dftindex store is valid")
10!
210
        .flag();
5!
211
    program.add_argument("--read-buffer-size")
5!
212
        .help("Size of the read buffer in bytes (default: 1MB)")
10!
213
        .default_value<std::size_t>(1 * 1024 * 1024)
10!
214
        .scan<'d', std::size_t>();
5!
215
    program.add_argument("--mode")
5!
216
        .help("Set the reading mode (bytes, line_bytes, lines)")
10!
217
        .default_value<std::string>("bytes")
10!
218
        .choices("bytes", "line_bytes", "lines");
5!
219
    program.add_argument("--index-dir")
5!
220
        .help("Directory to store root-local .dftindex directories")
10!
221
        .default_value<std::string>("");
5!
222

223
    try {
224
        program.parse_args(argc, argv);
5!
UNCOV
225
    } catch (const std::exception &err) {
×
226
        DFTRACER_UTILS_LOG_ERROR("Error occurred: %s", err.what());
×
227
        std::cerr << program;
×
228
        return 1;
×
229
    }
×
230

231
    std::string gz_path = program.get<std::string>("file");
5!
232
    std::string index_path = program.get<std::string>("--index");
5!
233
    int64_t start = program.get<int64_t>("--start");
5!
234
    int64_t end = program.get<int64_t>("--end");
5!
235
    std::size_t checkpoint_size = program.get<std::size_t>("--checkpoint-size");
5!
236
    bool force_rebuild = program.get<bool>("--force-rebuild");
5!
237
    bool check_rebuild = program.get<bool>("--check");
5!
238
    std::string read_mode = program.get<std::string>("--mode");
5!
239
    std::size_t read_buffer_size =
240
        program.get<std::size_t>("--read-buffer-size");
5!
241
    std::string index_dir = program.get<std::string>("--index-dir");
5!
242

243
    DFTRACER_UTILS_LOG_DEBUG("Processing file: %s", gz_path.c_str());
244
    DFTRACER_UTILS_LOG_DEBUG("Start position: %lld", (long long)start);
245
    DFTRACER_UTILS_LOG_DEBUG("End position: %lld", (long long)end);
246
    DFTRACER_UTILS_LOG_DEBUG("Mode: %s", read_mode.c_str());
247
    DFTRACER_UTILS_LOG_DEBUG("Checkpoint size: %zu B (%zu MB)", checkpoint_size,
248
                             checkpoint_size / (1024 * 1024));
249
    DFTRACER_UTILS_LOG_DEBUG("Force rebuild: %s",
250
                             force_rebuild ? "true" : "false");
251

252
    if (checkpoint_size <= 0) {
5!
253
        DFTRACER_UTILS_LOG_ERROR(
×
254
            "%s",
255
            "Checkpoint size must be positive (greater than 0 and in MB)");
256
        return 1;
×
257
    }
258

259
    int test_fd = ::open(gz_path.c_str(), O_RDONLY);
5!
260
    if (test_fd < 0) {
5✔
261
        DFTRACER_UTILS_LOG_ERROR("File '%s' does not exist or cannot be opened",
1!
262
                                 gz_path.c_str());
263
        return 1;
1✔
264
    }
265
    ::close(test_fd);
4!
266

267
    if (index_path.empty()) {
4!
268
        index_path = utilities::composites::dft::internal::determine_index_path(
8!
269
            gz_path, index_dir);
4✔
270
    }
271

272
#if DFTRACER_UTILS_LOGGER_DEBUG_ENABLED
273
    ArchiveFormat format = IndexerFactory::detect_format(gz_path);
274

275
    DFTRACER_UTILS_LOG_DEBUG("Detected format: %s",
276
                             format == ArchiveFormat::TAR_GZ ? "TAR.GZ"
277
                             : format == ArchiveFormat::GZIP ? "GZIP"
278
                                                             : "UNKNOWN");
279
#endif
280

281
    return run_reader(gz_path, index_path, checkpoint_size, force_rebuild,
8!
282
                      check_rebuild, read_mode, read_buffer_size, start, end)
283
        .get();
4!
284
}
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc