• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tstack / lnav / 17589970077-2502

09 Sep 2025 05:00PM UTC coverage: 65.196% (-5.0%) from 70.225%
17589970077-2502

push

github

tstack
[format] add fields for source file/line

Knowing the source file/line context in a log
message can help find log messages when using
log2src.

56 of 70 new or added lines in 2 files covered. (80.0%)

13954 existing lines in 210 files now uncovered.

45516 of 69814 relevant lines covered (65.2%)

404154.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.6
/src/line_buffer.cc
1
/**
2
 * Copyright (c) 2007-2012, Timothy Stack
3
 *
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions are met:
8
 *
9
 * * Redistributions of source code must retain the above copyright notice, this
10
 * list of conditions and the following disclaimer.
11
 * * Redistributions in binary form must reproduce the above copyright notice,
12
 * this list of conditions and the following disclaimer in the documentation
13
 * and/or other materials provided with the distribution.
14
 * * Neither the name of Timothy Stack nor the names of its contributors
15
 * may be used to endorse or promote products derived from this software
16
 * without specific prior written permission.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21
 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * @file line_buffer.cc
30
 */
31

32
#include <errno.h>
33
#include <fcntl.h>
34
#include <stdio.h>
35
#include <string.h>
36
#include <unistd.h>
37

38
#include "config.h"
39

40
#ifdef HAVE_BZLIB_H
41
#    include <bzlib.h>
42
#endif
43

44
#include <algorithm>
45
#include <set>
46

47
#include "base/auto_mem.hh"
48
#include "base/auto_pid.hh"
49
#include "base/fs_util.hh"
50
#include "base/injector.bind.hh"
51
#include "base/injector.hh"
52
#include "base/is_utf8.hh"
53
#include "base/isc.hh"
54
#include "base/math_util.hh"
55
#include "base/paths.hh"
56
#include "fmtlib/fmt/format.h"
57
#include "hasher.hh"
58
#include "line_buffer.hh"
59
#include "piper.header.hh"
60
#include "scn/scan.h"
61
#include "yajlpp/yajlpp_def.hh"
62

63
using namespace std::chrono_literals;
64

65
static const ssize_t INITIAL_REQUEST_SIZE = 16 * 1024;
66
static const ssize_t DEFAULT_INCREMENT = 128 * 1024;
67
static const ssize_t INITIAL_COMPRESSED_BUFFER_SIZE = 5 * 1024 * 1024;
68
static const ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024;
69

70
const ssize_t line_buffer::DEFAULT_LINE_BUFFER_SIZE = 256 * 1024;
71
const ssize_t line_buffer::MAX_LINE_BUFFER_SIZE
72
    = 4 * 4 * line_buffer::DEFAULT_LINE_BUFFER_SIZE;
73

74
class io_looper : public isc::service<io_looper> {};
75

76
struct io_looper_tag {};
77

78
static auto bound_io = injector::bind_multiple<isc::service_base>()
79
                           .add_singleton<io_looper, io_looper_tag>();
80

81
namespace injector {
82
template<>
83
void
84
force_linking(io_looper_tag anno)
603✔
85
{
86
}
603✔
87
}  // namespace injector
88

89
/*
90
 * XXX REMOVE ME
91
 *
92
 * The stock bzipped file code does not use pread, so we need to use a lock to
93
 * get exclusive access to the file.  In the future, we should just rewrite
94
 * the bzipped file code to use pread.
95
 */
96
class lock_hack {
97
public:
98
    class guard {
99
    public:
100
        guard() : g_lock(singleton()) { this->g_lock.lock(); }
2✔
101

102
        ~guard() { this->g_lock.unlock(); }
2✔
103

104
    private:
105
        lock_hack& g_lock;
106
    };
107

108
    static lock_hack& singleton()
2✔
109
    {
110
        static lock_hack retval;
2✔
111

112
        return retval;
2✔
113
    }
114

115
    void lock() { lockf(this->lh_fd, F_LOCK, 0); }
2✔
116

117
    void unlock() { lockf(this->lh_fd, F_ULOCK, 0); }
2✔
118

119
private:
120
    lock_hack()
1✔
121
    {
1✔
122
        char lockname[64];
123

124
        snprintf(lockname, sizeof(lockname), "/tmp/lnav.%d.lck", getpid());
1✔
125
        this->lh_fd = open(lockname, O_CREAT | O_RDWR, 0600);
1✔
126
        log_perror(fcntl(this->lh_fd, F_SETFD, FD_CLOEXEC));
1✔
127
        unlink(lockname);
1✔
128
    }
1✔
129

130
    auto_fd lh_fd;
131
};
132
/* XXX END */
133

134
#define Z_BUFSIZE      65536U
135
#define SYNCPOINT_SIZE (1024 * 1024)
136
line_buffer::gz_indexed::gz_indexed()
1,336✔
137
{
138
    if ((this->inbuf = auto_mem<Bytef>::malloc(Z_BUFSIZE)) == nullptr) {
1,336✔
UNCOV
139
        throw std::bad_alloc();
×
140
    }
141
}
1,336✔
142

143
void
144
line_buffer::gz_indexed::close()
1,331✔
145
{
146
    // Release old stream, if we were open
147
    if (*this) {
1,331✔
148
        inflateEnd(&this->strm);
7✔
149
        ::close(this->gz_fd);
7✔
150
        this->syncpoints.clear();
7✔
151
        this->gz_fd = -1;
7✔
152
    }
153
}
1,331✔
154

155
void
156
line_buffer::gz_indexed::init_stream()
25✔
157
{
158
    if (*this) {
25✔
159
        inflateEnd(&this->strm);
18✔
160
    }
161

162
    // initialize inflate struct
163
    int rc = inflateInit2(&this->strm, GZ_HEADER_MODE);
25✔
164
    this->strm.avail_in = 0;
25✔
165
    if (rc != Z_OK) {
25✔
UNCOV
166
        throw(rc);  // FIXME: exception wrapper
×
167
    }
168
}
25✔
169

170
void
171
line_buffer::gz_indexed::continue_stream()
8✔
172
{
173
    // Save our position and output buffer
174
    auto total_in = this->strm.total_in;
8✔
175
    auto total_out = this->strm.total_out;
8✔
176
    auto avail_out = this->strm.avail_out;
8✔
177
    auto next_out = this->strm.next_out;
8✔
178

179
    init_stream();
8✔
180

181
    // Restore position and output buffer
182
    this->strm.total_in = total_in;
8✔
183
    this->strm.total_out = total_out;
8✔
184
    this->strm.avail_out = avail_out;
8✔
185
    this->strm.next_out = next_out;
8✔
186
}
8✔
187

188
void
189
line_buffer::gz_indexed::open(int fd, lnav::gzip::header& hd)
7✔
190
{
191
    this->close();
7✔
192
    this->init_stream();
7✔
193
    this->gz_fd = fd;
7✔
194

195
    unsigned char name[1024];
196
    unsigned char comment[4096];
197

198
    name[0] = '\0';
7✔
199
    comment[0] = '\0';
7✔
200

201
    gz_header gz_hd;
202
    memset(&gz_hd, 0, sizeof(gz_hd));
7✔
203
    gz_hd.name = name;
7✔
204
    gz_hd.name_max = sizeof(name);
7✔
205
    gz_hd.comment = comment;
7✔
206
    gz_hd.comm_max = sizeof(comment);
7✔
207

208
    Bytef inbuf[8192];
209
    Bytef outbuf[8192];
210
    this->strm.next_out = outbuf;
7✔
211
    this->strm.total_out = 0;
7✔
212
    this->strm.avail_out = sizeof(outbuf);
7✔
213
    this->strm.next_in = inbuf;
7✔
214
    this->strm.total_in = 0;
7✔
215

216
    if (inflateGetHeader(&this->strm, &gz_hd) == Z_OK) {
7✔
217
        auto rc = pread(fd, inbuf, sizeof(inbuf), 0);
7✔
218
        if (rc >= 0) {
7✔
219
            this->strm.avail_in = rc;
7✔
220

221
            inflate(&this->strm, Z_BLOCK);
7✔
222
            inflateEnd(&this->strm);
7✔
223
            this->strm.next_out = Z_NULL;
7✔
224
            this->strm.next_in = Z_NULL;
7✔
225
            this->strm.next_in = Z_NULL;
7✔
226
            this->strm.total_in = 0;
7✔
227
            this->strm.avail_in = 0;
7✔
228
            this->init_stream();
7✔
229

230
            switch (gz_hd.done) {
7✔
UNCOV
231
                case 0:
×
UNCOV
232
                    log_debug("%d: no gzip header data", fd);
×
UNCOV
233
                    break;
×
234
                case 1:
7✔
235
                    hd.h_mtime.tv_sec = gz_hd.time;
7✔
236
                    name[sizeof(name) - 1] = '\0';
7✔
237
                    comment[sizeof(comment) - 1] = '\0';
7✔
238
                    hd.h_name = std::string((char*) name);
14✔
239
                    hd.h_comment = std::string((char*) comment);
7✔
240
                    log_info(
7✔
241
                        "%d: read gzip header (mtime=%d; name='%s'; "
242
                        "comment='%s'; crc=%x)",
243
                        fd,
244
                        hd.h_mtime.tv_sec,
245
                        hd.h_name.c_str(),
246
                        hd.h_comment.c_str(),
247
                        gz_hd.hcrc);
248
                    break;
7✔
UNCOV
249
                default:
×
250
                    log_error("%d: failed to read gzip header data", fd);
×
251
                    break;
×
252
            }
253
        } else {
UNCOV
254
            log_error("%d: failed to read gzip header from file: %s",
×
255
                      fd,
256
                      strerror(errno));
257
        }
258
    } else {
UNCOV
259
        log_error("%d: unable to get gzip header", fd);
×
260
    }
261
}
7✔
262

263
int
264
line_buffer::gz_indexed::stream_data(void* buf, size_t size)
13✔
265
{
266
    this->strm.avail_out = size;
13✔
267
    this->strm.next_out = (unsigned char*) buf;
13✔
268

269
    size_t last = this->syncpoints.empty() ? 0 : this->syncpoints.back().in;
13✔
270
    while (this->strm.avail_out) {
39✔
271
        if (!this->strm.avail_in) {
39✔
272
            int rc = ::pread(
21✔
273
                this->gz_fd, &this->inbuf[0], Z_BUFSIZE, this->strm.total_in);
21✔
274
            if (rc < 0) {
21✔
UNCOV
275
                return rc;
×
276
            }
277
            this->strm.next_in = this->inbuf;
21✔
278
            this->strm.avail_in = rc;
21✔
279
        }
280
        if (this->strm.avail_in) {
39✔
281
            int flush = last > this->strm.total_in ? Z_SYNC_FLUSH : Z_BLOCK;
28✔
282
            auto err = inflate(&this->strm, flush);
28✔
283
            if (err == Z_STREAM_END) {
28✔
284
                // Reached end of stream; re-init for a possible subsequent
285
                // stream
286
                continue_stream();
8✔
287
            } else if (err != Z_OK) {
20✔
288
                log_error(" inflate-error at offset %d: %d  %s",
2✔
289
                          this->strm.total_in,
290
                          (int) err,
291
                          this->strm.msg ? this->strm.msg : "");
292
                this->parent->lb_decompress_error = fmt::format(
2✔
293
                    FMT_STRING("inflate-error at offset {}: {}  {}"),
4✔
294
                    this->strm.total_in,
2✔
295
                    err,
296
                    this->strm.msg ? this->strm.msg : "");
4✔
297
                break;
2✔
298
            }
299

300
            if (this->strm.total_in >= last + SYNCPOINT_SIZE
26✔
301
                && size > this->strm.avail_out + GZ_WINSIZE
×
302
                && (this->strm.data_type & GZ_END_OF_BLOCK_MASK)
×
UNCOV
303
                && !(this->strm.data_type & GZ_END_OF_FILE_MASK))
×
304
            {
UNCOV
305
                this->syncpoints.emplace_back(this->strm, size);
×
UNCOV
306
                last = this->strm.total_out;
×
307
            }
308
        } else if (this->strm.avail_out) {
11✔
309
            // Processed all the gz file data but didn't fill
310
            // the output buffer.  We're done, even though we
311
            // produced fewer bytes than requested.
312
            break;
11✔
313
        }
314
    }
315
    return size - this->strm.avail_out;
13✔
316
}
317

318
void
319
line_buffer::gz_indexed::seek(off_t offset)
3✔
320
{
321
    if ((size_t) offset == this->strm.total_out) {
3✔
UNCOV
322
        return;
×
323
    }
324

325
    indexDict* dict = nullptr;
3✔
326
    // Find highest syncpoint not past offset
327
    // FIXME: Make this a binary-tree search
328
    for (auto& d : this->syncpoints) {
3✔
UNCOV
329
        if (d.out <= offset) {
×
UNCOV
330
            dict = &d;
×
331
        } else {
UNCOV
332
            break;
×
333
        }
334
    }
335

336
    // Choose highest available syncpoint, or keep current offset if it's ok
337
    if ((size_t) offset < this->strm.total_out
3✔
UNCOV
338
        || (dict && this->strm.total_out < (size_t) dict->out))
×
339
    {
340
        // Release the old z_stream
341
        inflateEnd(&this->strm);
3✔
342
        if (dict) {
3✔
UNCOV
343
            dict->apply(&this->strm);
×
344
        } else {
345
            init_stream();
3✔
346
        }
347
    }
348

349
    // Stream from compressed file until we reach our offset
350
    unsigned char dummy[Z_BUFSIZE];
351
    while ((size_t) offset > this->strm.total_out) {
3✔
352
        size_t to_copy
353
            = std::min(static_cast<size_t>(Z_BUFSIZE),
×
354
                       static_cast<size_t>(offset - this->strm.total_out));
UNCOV
355
        auto bytes = stream_data(dummy, to_copy);
×
UNCOV
356
        if (bytes <= 0) {
×
UNCOV
357
            break;
×
358
        }
359
    }
360
}
361

362
int
363
line_buffer::gz_indexed::read(void* buf, size_t offset, size_t size)
13✔
364
{
365
    if (offset != this->strm.total_out) {
13✔
366
        // log_debug("doing seek!  %d %d", offset, this->strm.total_out);
367
        this->seek(offset);
3✔
368
    }
369

370
    int bytes = stream_data(buf, size);
13✔
371

372
    return bytes;
13✔
373
}
374

375
line_buffer::line_buffer()
{
    // Give the gzip reader a back-pointer; stream_data() records
    // decompression errors on it.  Scoped so the write access is released
    // before the invariant check.
    {
        auto gz_access = this->lb_gz_file.writeAccess();
        gz_access->parent = this;
    }

    ensure(this->invariant());
}
381

382
line_buffer::~line_buffer()
{
    // Wait for any in-flight background load before tearing down.
    if (this->lb_loader_future.valid()) {
        this->lb_loader_future.wait();
    }

    // Make sure any shared refs take ownership of the data.
    this->lb_share_manager.invalidate_refs();

    // Detach from the current descriptor (also closes gzip/bzip2 state).
    auto no_fd = auto_fd();
    this->set_fd(no_fd);
}
394

395
void
396
line_buffer::set_fd(auto_fd& fd)
2,650✔
397
{
398
    file_off_t newoff = 0;
2,650✔
399

400
    {
401
        safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
2,650✔
402

403
        if (*gi) {
2,650✔
404
            gi->close();
7✔
405
        }
406
    }
2,650✔
407

408
    if (this->lb_bz_file) {
2,650✔
409
        this->lb_bz_file = false;
1✔
410
    }
411

412
    if (fd != -1) {
2,650✔
413
        /* Sync the fd's offset with the object. */
414
        newoff = lseek(fd, 0, SEEK_CUR);
1,333✔
415
        if (newoff == -1) {
1,333✔
416
            if (errno != ESPIPE) {
87✔
UNCOV
417
                throw error(errno);
×
418
            }
419

420
            /* It's a pipe, start with a zero offset. */
421
            newoff = 0;
87✔
422
            this->lb_seekable = false;
87✔
423
        } else {
424
            char gz_id[2 + 1 + 1 + 4];
425

426
            if (pread(fd, gz_id, sizeof(gz_id), 0) == sizeof(gz_id)) {
1,246✔
427
                auto piper_hdr_opt = lnav::piper::read_header(fd, gz_id);
1,217✔
428

429
                if (piper_hdr_opt) {
1,217✔
430
                    static intern_string_t SRC = intern_string::lookup("piper");
194✔
431

432
                    auto meta_buf = std::move(piper_hdr_opt.value());
116✔
433

434
                    auto meta_sf = string_fragment::from_bytes(meta_buf.in(),
116✔
435
                                                               meta_buf.size());
436
                    auto meta_parse_res
437
                        = lnav::piper::header_handlers.parser_for(SRC).of(
232✔
438
                            meta_sf);
116✔
439
                    if (meta_parse_res.isErr()) {
116✔
UNCOV
440
                        log_error("failed to parse piper header: %s",
×
441
                                  meta_parse_res.unwrapErr()[0]
442
                                      .to_attr_line()
443
                                      .get_string()
444
                                      .c_str());
UNCOV
445
                        throw error(EINVAL);
×
446
                    }
447

448
                    this->lb_line_metadata = true;
116✔
449
                    this->lb_file_offset
450
                        = lnav::piper::HEADER_SIZE + meta_buf.size();
116✔
451
                    this->lb_piper_header_size = this->lb_file_offset;
116✔
452
                    this->lb_header = meta_parse_res.unwrap();
116✔
453
                } else if (gz_id[0] == '\037' && gz_id[1] == '\213') {
1,217✔
454
                    int gzfd = dup(fd);
7✔
455

456
                    log_perror(fcntl(gzfd, F_SETFD, FD_CLOEXEC));
7✔
457
                    if (lseek(fd, 0, SEEK_SET) < 0) {
7✔
UNCOV
458
                        close(gzfd);
×
UNCOV
459
                        throw error(errno);
×
460
                    }
461
                    lnav::gzip::header hdr;
7✔
462

463
                    this->lb_gz_file.writeAccess()->open(gzfd, hdr);
7✔
464
                    this->lb_compressed = true;
7✔
465
                    this->lb_file_time = hdr.h_mtime.tv_sec;
7✔
466
                    if (this->lb_file_time < 0) {
7✔
467
                        this->lb_file_time = 0;
×
468
                    }
469
                    this->lb_compressed_offset = 0;
7✔
470
                    if (!hdr.empty()) {
7✔
471
                        this->lb_header = std::move(hdr);
7✔
472
                    }
473
                    this->resize_buffer(INITIAL_COMPRESSED_BUFFER_SIZE);
7✔
474
                }
7✔
475
#ifdef HAVE_BZLIB_H
476
                else if (gz_id[0] == 'B' && gz_id[1] == 'Z')
1,094✔
477
                {
478
                    if (lseek(fd, 0, SEEK_SET) < 0) {
1✔
UNCOV
479
                        throw error(errno);
×
480
                    }
481
                    this->lb_bz_file = true;
1✔
482
                    this->lb_compressed = true;
1✔
483

484
                    /*
485
                     * Loading data from a bzip2 file is pretty slow, so we try
486
                     * to keep as much in memory as possible.
487
                     */
488
                    this->resize_buffer(INITIAL_COMPRESSED_BUFFER_SIZE);
1✔
489

490
                    this->lb_compressed_offset = 0;
1✔
491
                }
492
#endif
493
            }
1,217✔
494
            this->lb_seekable = true;
1,246✔
495
        }
496
    }
497
    this->lb_file_offset = newoff;
2,650✔
498
    this->lb_buffer.clear();
2,650✔
499
    this->lb_fd = std::move(fd);
2,650✔
500

501
    ensure(this->invariant());
2,650✔
502
}
2,650✔
503

504
void
505
line_buffer::resize_buffer(size_t new_max)
22✔
506
{
507
    if (((this->lb_compressed && new_max <= MAX_COMPRESSED_BUFFER_SIZE)
19✔
508
         || (!this->lb_compressed && new_max <= MAX_LINE_BUFFER_SIZE))
3✔
509
        && !this->lb_buffer.has_capacity_for(new_max))
44✔
510
    {
511
        /* Still need more space, try a realloc. */
512
        this->lb_share_manager.invalidate_refs();
17✔
513
        this->lb_buffer.expand_to(new_max);
17✔
514
    }
515
}
22✔
516

517
void
518
line_buffer::ensure_available(file_off_t start,
3,443✔
519
                              ssize_t max_length,
520
                              scan_direction dir)
521
{
522
    ssize_t prefill;
523

524
    require(this->lb_compressed || max_length <= MAX_LINE_BUFFER_SIZE);
3,443✔
525

526
    // log_debug("ensure avail %d %d", start, max_length);
527

528
    if (this->lb_file_size != -1) {
3,443✔
529
        if (start + (file_off_t) max_length > this->lb_file_size) {
98✔
530
            max_length = (this->lb_file_size - start);
85✔
531
        }
532
    }
533

534
    /*
535
     * Check to see if the start is inside the cached range or immediately
536
     * after.
537
     */
538
    if (start < this->lb_file_offset
6,886✔
539
        || start > (file_off_t) (this->lb_file_offset + this->lb_buffer.size()))
3,443✔
540
    {
541
        /*
542
         * The request is outside the cached range, need to reload the
543
         * whole thing.
544
         */
545
        this->lb_share_manager.invalidate_refs();
1,001✔
546
        prefill = 0;
1,001✔
547
        this->lb_buffer.clear();
1,001✔
548

549
        switch (dir) {
1,001✔
550
            case scan_direction::forward:
999✔
551
                break;
999✔
552
            case scan_direction::backward: {
2✔
553
                auto padded_max_length = max_length * 4;
2✔
554
                if (this->lb_buffer.has_capacity_for(padded_max_length)) {
2✔
555
                    start = std::max(
4✔
556
                        file_off_t{0},
4✔
557
                        static_cast<file_off_t>(start
2✔
558
                                                - (this->lb_buffer.capacity()
4✔
559
                                                   - padded_max_length)));
2✔
560
                }
561
                break;
2✔
562
            }
563
        }
564

565
        if (this->lb_file_size == (ssize_t) -1) {
1,001✔
566
            this->lb_file_offset = start;
1,001✔
567
        } else {
UNCOV
568
            require(start <= this->lb_file_size);
×
569
            /*
570
             * If the start is near the end of the file, move the offset back a
571
             * bit so we can get more of the file in the cache.
572
             */
UNCOV
573
            if (start + (ssize_t) this->lb_buffer.capacity()
×
UNCOV
574
                > this->lb_file_size)
×
575
            {
576
                this->lb_file_offset = this->lb_file_size
×
577
                    - std::min(this->lb_file_size,
×
578
                               (file_ssize_t) this->lb_buffer.capacity());
×
579
            } else {
UNCOV
580
                this->lb_file_offset = start;
×
581
            }
582
        }
583
    } else {
584
        /* The request is in the cached range.  Record how much extra data is in
585
         * the buffer before the requested range.
586
         */
587
        prefill = start - this->lb_file_offset;
2,442✔
588
    }
589
    require(this->lb_file_offset <= start);
3,443✔
590
    require(prefill <= (ssize_t) this->lb_buffer.size());
3,443✔
591

592
    ssize_t available
593
        = this->lb_buffer.capacity() - (start - this->lb_file_offset);
3,443✔
594
    if (max_length > available) {
3,443✔
595
        // log_debug("need more space!");
596
        /*
597
         * Need more space, move any existing data to the front of the
598
         * buffer.
599
         */
600
        this->lb_share_manager.invalidate_refs();
647✔
601

602
        this->lb_buffer.resize_by(-prefill);
647✔
603
        this->lb_file_offset += prefill;
647✔
604
        // log_debug("adjust file offset for prefill %d", this->lb_file_offset);
605
        memmove(this->lb_buffer.at(0),
1,294✔
606
                this->lb_buffer.at(prefill),
647✔
607
                this->lb_buffer.size());
608

609
        available = this->lb_buffer.capacity() - (start - this->lb_file_offset);
647✔
610
        if (max_length > available) {
647✔
611
            this->resize_buffer(roundup_size(max_length, DEFAULT_INCREMENT));
3✔
612
        }
613
    }
614
    this->lb_line_starts.clear();
3,443✔
615
    this->lb_line_is_utf.clear();
3,443✔
616
}
3,443✔
617

618
bool
619
line_buffer::load_next_buffer()
663✔
620
{
621
    // log_debug("loader here!");
622
    auto retval = false;
663✔
623
    auto start = this->lb_loader_file_offset.value();
663✔
624
    ssize_t rc = 0;
663✔
625
    safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
663✔
626

627
    // log_debug("BEGIN preload read");
628
    /* ... read in the new data. */
629
    if (!this->lb_cached_fd && *gi) {
663✔
630
        if (this->lb_file_size != (ssize_t) -1 && this->in_range(start)
×
631
            && this->in_range(this->lb_file_size - 1))
4✔
632
        {
633
            rc = 0;
×
634
        } else {
635
            // log_debug("async decomp start");
636
            rc = gi->read(this->lb_alt_buffer.value().end(),
8✔
637
                          start + this->lb_alt_buffer.value().size(),
4✔
638
                          this->lb_alt_buffer.value().available());
4✔
639
            this->lb_compressed_offset = gi->get_source_offset();
4✔
640
            ensure(this->lb_compressed_offset >= 0);
4✔
641
            if (rc != -1
4✔
642
                && rc < (ssize_t) this->lb_alt_buffer.value().available()
4✔
643
                && (start + (ssize_t) this->lb_alt_buffer.value().size() + rc
12✔
644
                    > this->lb_file_size))
4✔
645
            {
646
                this->lb_file_size
647
                    = (start + this->lb_alt_buffer.value().size() + rc);
4✔
648
                log_info("fd(%d): set file size to %llu",
4✔
649
                         this->lb_fd.get(),
650
                         this->lb_file_size);
651
            }
652
#if 0
653
            log_debug("async decomp end  %d+%d:%d",
654
                      this->lb_alt_buffer->size(),
655
                      rc,
656
                      this->lb_alt_buffer->capacity());
657
#endif
658
        }
659
    }
660
#ifdef HAVE_BZLIB_H
661
    else if (!this->lb_cached_fd && this->lb_bz_file)
659✔
662
    {
663
        if (this->lb_file_size != (ssize_t) -1
2✔
664
            && (((ssize_t) start >= this->lb_file_size)
1✔
665
                || (this->in_range(start)
×
UNCOV
666
                    && this->in_range(this->lb_file_size - 1))))
×
667
        {
UNCOV
668
            rc = 0;
×
669
        } else {
670
            lock_hack::guard guard;
1✔
671
            char scratch[32 * 1024];
672
            BZFILE* bz_file;
673
            file_off_t seek_to;
674
            int bzfd;
675

676
            /*
677
             * Unfortunately, there is no bzseek, so we need to reopen the
678
             * file every time we want to do a read.
679
             */
680
            bzfd = dup(this->lb_fd);
1✔
681
            if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
1✔
682
                close(bzfd);
×
683
                throw error(errno);
×
684
            }
685
            if ((bz_file = BZ2_bzdopen(bzfd, "r")) == nullptr) {
1✔
UNCOV
686
                close(bzfd);
×
UNCOV
687
                if (errno == 0) {
×
UNCOV
688
                    throw std::bad_alloc();
×
689
                } else {
690
                    throw error(errno);
×
691
                }
692
            }
693

694
            seek_to = start + this->lb_alt_buffer.value().size();
1✔
695
            while (seek_to > 0) {
1✔
696
                int count;
697

698
                count = BZ2_bzread(bz_file,
×
699
                                   scratch,
700
                                   std::min((size_t) seek_to, sizeof(scratch)));
×
701
                seek_to -= count;
×
702
            }
703
            rc = BZ2_bzread(bz_file,
1✔
704
                            this->lb_alt_buffer->end(),
1✔
705
                            this->lb_alt_buffer->available());
1✔
706
            this->lb_compressed_offset = 0;
1✔
707
            BZ2_bzclose(bz_file);
1✔
708

709
            if (rc != -1
1✔
710
                && (rc < (ssize_t) (this->lb_alt_buffer.value().available()))
1✔
711
                && (start + (ssize_t) this->lb_alt_buffer.value().size() + rc
3✔
712
                    > this->lb_file_size))
1✔
713
            {
714
                this->lb_file_size
715
                    = (start + this->lb_alt_buffer.value().size() + rc);
1✔
716
                log_info("fd(%d): set file size to %llu",
1✔
717
                         this->lb_fd.get(),
718
                         this->lb_file_size);
719
            }
720
        }
1✔
721
    }
722
#endif
723
    else
724
    {
725
        rc = pread(this->lb_cached_fd ? this->lb_cached_fd.value().get()
1,316✔
726
                                      : this->lb_fd.get(),
658✔
727
                   this->lb_alt_buffer.value().end(),
658✔
728
                   this->lb_alt_buffer.value().available(),
658✔
729
                   start + this->lb_alt_buffer.value().size());
658✔
730
    }
731
    // XXX For some reason, cygwin is giving us a bogus return value when
732
    // up to the end of the file.
733
    if (rc > (ssize_t) this->lb_alt_buffer.value().available()) {
663✔
734
        rc = -1;
×
735
#ifdef ENODATA
UNCOV
736
        errno = ENODATA;
×
737
#else
738
        errno = EAGAIN;
739
#endif
740
    }
741
    switch (rc) {
663✔
742
        case 0:
14✔
743
            if (start < (file_off_t) this->lb_file_size) {
14✔
744
                retval = true;
×
745
            }
746
            break;
14✔
747

748
        case (ssize_t) -1:
×
UNCOV
749
            switch (errno) {
×
750
#ifdef ENODATA
751
                /* Cygwin seems to return this when pread reaches the end of
752
                 * the file. */
753
                case ENODATA:
×
754
#endif
755
                case EINTR:
756
                case EAGAIN:
UNCOV
757
                    break;
×
758

UNCOV
759
                default:
×
UNCOV
760
                    throw error(errno);
×
761
            }
UNCOV
762
            break;
×
763

764
        default:
649✔
765
            this->lb_alt_buffer.value().resize_by(rc);
649✔
766
            retval = true;
649✔
767
            break;
649✔
768
    }
769
    // log_debug("END preload read");
770

771
    if (start > this->lb_last_line_offset) {
663✔
772
        const auto* line_start = this->lb_alt_buffer.value().begin();
663✔
773

774
        do {
775
            auto before = line_start - this->lb_alt_buffer->begin();
17,047✔
776
            const auto remaining = this->lb_alt_buffer.value().size() - before;
17,047✔
777
            const auto frag
778
                = string_fragment::from_bytes(line_start, remaining);
17,047✔
779
            auto utf_scan_res = is_utf8(frag, '\n');
17,047✔
780
            const auto* lf = utf_scan_res.remaining_ptr();
17,047✔
781
            this->lb_alt_line_starts.emplace_back(before);
17,047✔
782
            this->lb_alt_line_is_utf.emplace_back(utf_scan_res.is_valid());
17,047✔
783
            this->lb_alt_line_has_ansi.emplace_back(utf_scan_res.usr_has_ansi);
17,047✔
784

785
            line_start = lf;
17,047✔
786
        } while (line_start != nullptr
787
                 && line_start < this->lb_alt_buffer->end());
17,047✔
788
    }
789

790
    return retval;
663✔
791
}
663✔
792

793
bool
794
line_buffer::fill_range(file_off_t start,
2,900✔
795
                        ssize_t max_length,
796
                        scan_direction dir)
797
{
798
    bool retval = false;
2,900✔
799

800
    require(start >= 0);
2,900✔
801

802
#if 0
803
    log_debug("(%p) fill range %d %d (%d) %d",
804
              this,
805
              start,
806
              max_length,
807
              this->lb_file_offset,
808
              this->lb_buffer.size());
809
#endif
810
    if (!lnav::pid::in_child && this->lb_loader_future.valid()
2,900✔
811
        && start >= this->lb_loader_file_offset.value())
5,800✔
812
    {
813
#if 0
814
        log_debug("getting preload! %d %d",
815
                  start,
816
                  this->lb_loader_file_offset.value());
817
#endif
818
        std::optional<std::chrono::system_clock::time_point> wait_start;
610✔
819

820
        if (this->lb_loader_future.wait_for(std::chrono::seconds(0))
610✔
821
            != std::future_status::ready)
610✔
822
        {
823
            wait_start = std::make_optional(std::chrono::system_clock::now());
160✔
824
        }
825
        retval = this->lb_loader_future.get();
610✔
826
        if (false && wait_start) {
827
            auto diff = std::chrono::system_clock::now() - wait_start.value();
828
            log_debug("wait done! %d", diff.count());
829
        }
830
        // log_debug("got preload");
831
        this->lb_loader_future = {};
610✔
832
        this->lb_share_manager.invalidate_refs();
610✔
833
        this->lb_file_offset = this->lb_loader_file_offset.value();
610✔
834
        this->lb_loader_file_offset = std::nullopt;
610✔
835
        this->lb_buffer.swap(this->lb_alt_buffer.value());
610✔
836
        this->lb_alt_buffer.value().clear();
610✔
837
        this->lb_line_starts = std::move(this->lb_alt_line_starts);
610✔
838
        this->lb_alt_line_starts.clear();
610✔
839
        this->lb_line_is_utf = std::move(this->lb_alt_line_is_utf);
610✔
840
        this->lb_alt_line_is_utf.clear();
610✔
841
        this->lb_line_has_ansi = std::move(this->lb_alt_line_has_ansi);
610✔
842
        this->lb_alt_line_has_ansi.clear();
610✔
843
        this->lb_stats.s_used_preloads += 1;
610✔
844
        this->lb_next_line_start_index = 0;
610✔
845
        this->lb_next_buffer_offset = 0;
610✔
846
    }
847
    if (this->in_range(start)
2,900✔
848
        && (max_length == 0 || this->in_range(start + max_length - 1)))
2,900✔
849
    {
850
        /* Cache already has the data, nothing to do. */
851
        retval = true;
30✔
852
        if (this->lb_do_preloading && !lnav::pid::in_child && this->lb_seekable
30✔
853
            && this->lb_buffer.full() && !this->lb_loader_file_offset)
60✔
854
        {
855
            // log_debug("loader available start=%d", start);
856
            auto last_lf_iter = std::find(
857
                this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n');
3✔
858
            if (last_lf_iter != this->lb_buffer.rend()) {
3✔
859
                auto usable_size
860
                    = std::distance(last_lf_iter, this->lb_buffer.rend());
3✔
861
                // log_debug("found linefeed %d", usable_size);
862
                if (!this->lb_alt_buffer) {
3✔
863
                    // log_debug("allocating new buffer!");
864
                    this->lb_alt_buffer
UNCOV
865
                        = auto_buffer::alloc(this->lb_buffer.capacity());
×
866
                }
867
                this->lb_alt_buffer->resize(this->lb_buffer.size()
3✔
868
                                            - usable_size);
3✔
869
                memcpy(this->lb_alt_buffer.value().begin(),
6✔
870
                       this->lb_buffer.at(usable_size),
3✔
871
                       this->lb_alt_buffer->size());
872
                this->lb_loader_file_offset
873
                    = this->lb_file_offset + usable_size;
3✔
874
#if 0
875
                log_debug("load offset %d",
876
                          this->lb_loader_file_offset.value());
877
                log_debug("launch loader");
878
#endif
879
                auto prom = std::make_shared<std::promise<bool>>();
3✔
880
                this->lb_loader_future = prom->get_future();
3✔
881
                this->lb_stats.s_requested_preloads += 1;
3✔
882
                isc::to<io_looper&, io_looper_tag>().send(
3✔
883
                    [this, prom](auto& ioloop) mutable {
6✔
884
                        prom->set_value(this->load_next_buffer());
3✔
885
                    });
3✔
886
            }
3✔
887
        }
888
    } else if (this->lb_fd != -1) {
2,870✔
889
        ssize_t rc;
890

891
        /* Make sure there is enough space, then */
892
        this->ensure_available(start, max_length, dir);
2,870✔
893

894
        safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
2,870✔
895

896
        /* ... read in the new data. */
897
        if (!this->lb_cached_fd && *gi) {
2,870✔
898
            // log_debug("old decomp start");
899
            if (this->lb_file_size != (ssize_t) -1 && this->in_range(start)
12✔
900
                && this->in_range(this->lb_file_size - 1))
27✔
901
            {
902
                rc = 0;
6✔
903
            } else {
904
                this->lb_stats.s_decompressions += 1;
9✔
905
                if (false && this->lb_last_line_offset > 0) {
906
                    this->lb_stats.s_hist[(this->lb_file_offset * 10)
907
                                          / this->lb_last_line_offset]
908
                        += 1;
909
                }
910
                rc = gi->read(this->lb_buffer.end(),
18✔
911
                              this->lb_file_offset + this->lb_buffer.size(),
9✔
912
                              this->lb_buffer.available());
913
                this->lb_compressed_offset = gi->get_source_offset();
9✔
914
                ensure(this->lb_compressed_offset >= 0);
9✔
915
                if (rc != -1 && (rc < (ssize_t) this->lb_buffer.available())) {
9✔
916
                    this->lb_file_size
917
                        = (this->lb_file_offset + this->lb_buffer.size() + rc);
9✔
918
                    log_info("fd(%d): rc (%d) -- set file size to %llu",
9✔
919
                             this->lb_fd.get(),
920
                             rc,
921
                             this->lb_file_size);
922
                }
923
            }
924
#if 0
925
            log_debug("old decomp end -- %d+%d:%d",
926
                      this->lb_buffer.size(),
927
                      rc,
928
                      this->lb_buffer.capacity());
929
#endif
930
        }
931
#ifdef HAVE_BZLIB_H
932
        else if (!this->lb_cached_fd && this->lb_bz_file)
2,855✔
933
        {
934
            if (this->lb_file_size != (ssize_t) -1
6✔
935
                && (((ssize_t) start >= this->lb_file_size)
5✔
936
                    || (this->in_range(start)
2✔
937
                        && this->in_range(this->lb_file_size - 1))))
1✔
938
            {
939
                rc = 0;
2✔
940
            } else {
941
                lock_hack::guard guard;
1✔
942
                char scratch[32 * 1024];
943
                BZFILE* bz_file;
944
                file_off_t seek_to;
945
                int bzfd;
946

947
                /*
948
                 * Unfortunately, there is no bzseek, so we need to reopen the
949
                 * file every time we want to do a read.
950
                 */
951
                bzfd = dup(this->lb_fd);
1✔
952
                if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
1✔
UNCOV
953
                    close(bzfd);
×
954
                    throw error(errno);
×
955
                }
956
                if ((bz_file = BZ2_bzdopen(bzfd, "r")) == NULL) {
1✔
UNCOV
957
                    close(bzfd);
×
UNCOV
958
                    if (errno == 0) {
×
UNCOV
959
                        throw std::bad_alloc();
×
960
                    } else {
UNCOV
961
                        throw error(errno);
×
962
                    }
963
                }
964

965
                seek_to = this->lb_file_offset + this->lb_buffer.size();
1✔
966
                while (seek_to > 0) {
1✔
967
                    int count;
968

UNCOV
969
                    count = BZ2_bzread(
×
970
                        bz_file,
971
                        scratch,
UNCOV
972
                        std::min((size_t) seek_to, sizeof(scratch)));
×
UNCOV
973
                    seek_to -= count;
×
974
                }
975
                rc = BZ2_bzread(bz_file,
1✔
976
                                this->lb_buffer.end(),
1✔
977
                                this->lb_buffer.available());
1✔
978
                this->lb_compressed_offset = 0;
1✔
979
                BZ2_bzclose(bz_file);
1✔
980

981
                if (rc != -1 && (rc < (ssize_t) this->lb_buffer.available())) {
1✔
982
                    this->lb_file_size
983
                        = (this->lb_file_offset + this->lb_buffer.size() + rc);
1✔
984
                    log_info("fd(%d): set file size to %llu",
1✔
985
                             this->lb_fd.get(),
986
                             this->lb_file_size);
987
                }
988
            }
1✔
989
        }
990
#endif
991
        else if (this->lb_seekable)
2,852✔
992
        {
993
            this->lb_stats.s_preads += 1;
2,669✔
994
            if (false && this->lb_last_line_offset > 0) {
995
                this->lb_stats.s_hist[(this->lb_file_offset * 10)
996
                                      / this->lb_last_line_offset]
997
                    += 1;
998
            }
999
#if 0
1000
            log_debug("%d: pread %lld",
1001
                      this->lb_fd.get(),
1002
                      this->lb_file_offset + this->lb_buffer.size());
1003
#endif
1004
            rc = pread(this->lb_cached_fd ? this->lb_cached_fd.value().get()
5,338✔
1005
                                          : this->lb_fd.get(),
2,669✔
1006
                       this->lb_buffer.end(),
2,669✔
1007
                       this->lb_buffer.available(),
1008
                       this->lb_file_offset + this->lb_buffer.size());
2,669✔
1009
            // log_debug("pread rc %d", rc);
1010
        } else {
1011
            rc = read(this->lb_fd,
366✔
1012
                      this->lb_buffer.end(),
183✔
1013
                      this->lb_buffer.available());
1014
        }
1015
        // XXX For some reason, cygwin is giving us a bogus return value when
1016
        // up to the end of the file.
1017
        if (rc > (ssize_t) this->lb_buffer.available()) {
2,870✔
UNCOV
1018
            rc = -1;
×
1019
#ifdef ENODATA
UNCOV
1020
            errno = ENODATA;
×
1021
#else
1022
            errno = EAGAIN;
1023
#endif
1024
        }
1025
        switch (rc) {
2,870✔
1026
            case 0:
1,326✔
1027
                if (!this->lb_seekable) {
1,326✔
1028
                    this->lb_file_size
1029
                        = this->lb_file_offset + this->lb_buffer.size();
133✔
1030
                }
1031
                if (start < (file_off_t) this->lb_file_size) {
1,326✔
1032
                    retval = true;
14✔
1033
                }
1034

1035
                if (this->lb_compressed) {
1,326✔
1036
                    /*
1037
                     * For compressed files, increase the buffer size so we
1038
                     * don't have to spend as much time uncompressing the data.
1039
                     */
1040
                    this->resize_buffer(MAX_COMPRESSED_BUFFER_SIZE);
11✔
1041
                }
1042
                break;
1,326✔
1043

1044
            case (ssize_t) -1:
2✔
1045
                switch (errno) {
2✔
1046
#ifdef ENODATA
1047
                    /* Cygwin seems to return this when pread reaches the end of
1048
                     * the */
1049
                    /* file. */
1050
                    case ENODATA:
2✔
1051
#endif
1052
                    case EINTR:
1053
                    case EAGAIN:
1054
                        break;
2✔
1055

UNCOV
1056
                    default:
×
UNCOV
1057
                        throw error(errno);
×
1058
                }
1059
                break;
2✔
1060

1061
            default:
1,542✔
1062
                this->lb_buffer.resize_by(rc);
1,542✔
1063
                retval = true;
1,542✔
1064
                break;
1,542✔
1065
        }
1066

1067
        if (this->lb_do_preloading && !lnav::pid::in_child && this->lb_seekable
2,112✔
1068
            && this->lb_buffer.full() && !this->lb_loader_file_offset)
4,982✔
1069
        {
1070
            // log_debug("loader available2 start=%d", start);
1071
            auto last_lf_iter = std::find(
UNCOV
1072
                this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n');
×
UNCOV
1073
            if (last_lf_iter != this->lb_buffer.rend()) {
×
1074
                auto usable_size
UNCOV
1075
                    = std::distance(last_lf_iter, this->lb_buffer.rend());
×
1076
                // log_debug("found linefeed %d", usable_size);
UNCOV
1077
                if (!this->lb_alt_buffer) {
×
1078
                    // log_debug("allocating new buffer!");
1079
                    this->lb_alt_buffer
UNCOV
1080
                        = auto_buffer::alloc(this->lb_buffer.capacity());
×
UNCOV
1081
                } else if (this->lb_alt_buffer->capacity()
×
UNCOV
1082
                           < this->lb_buffer.capacity())
×
1083
                {
UNCOV
1084
                    this->lb_alt_buffer->expand_to(this->lb_buffer.capacity());
×
1085
                }
UNCOV
1086
                this->lb_alt_buffer->resize(this->lb_buffer.size()
×
UNCOV
1087
                                            - usable_size);
×
UNCOV
1088
                memcpy(this->lb_alt_buffer->begin(),
×
UNCOV
1089
                       this->lb_buffer.at(usable_size),
×
1090
                       this->lb_alt_buffer->size());
1091
                this->lb_loader_file_offset
UNCOV
1092
                    = this->lb_file_offset + usable_size;
×
1093
#if 0
1094
                log_debug("load offset %d",
1095
                          this->lb_loader_file_offset.value());
1096
                log_debug("launch loader");
1097
#endif
UNCOV
1098
                auto prom = std::make_shared<std::promise<bool>>();
×
UNCOV
1099
                this->lb_loader_future = prom->get_future();
×
UNCOV
1100
                this->lb_stats.s_requested_preloads += 1;
×
UNCOV
1101
                isc::to<io_looper&, io_looper_tag>().send(
×
UNCOV
1102
                    [this, prom](auto& ioloop) mutable {
×
UNCOV
1103
                        prom->set_value(this->load_next_buffer());
×
UNCOV
1104
                    });
×
1105
            }
1106
        }
1107
        ensure(this->lb_buffer.size() <= this->lb_buffer.capacity());
2,870✔
1108
    }
2,870✔
1109

1110
    return retval;
2,900✔
1111
}
1112

1113
Result<line_info, std::string>
1114
line_buffer::load_next_line(file_range prev_line)
17,710✔
1115
{
1116
    const char* line_start = nullptr;
17,710✔
1117
    bool done = false;
17,710✔
1118
    line_info retval;
17,710✔
1119

1120
    require(this->lb_fd != -1);
17,710✔
1121

1122
    if (this->lb_line_metadata && prev_line.fr_offset == 0) {
17,710✔
1123
        prev_line.fr_offset = this->lb_piper_header_size;
116✔
1124
    }
1125

1126
    auto offset = prev_line.next_offset();
17,710✔
1127
    ssize_t request_size = INITIAL_REQUEST_SIZE;
17,710✔
1128
    retval.li_file_range.fr_offset = offset;
17,710✔
1129
    if (this->lb_buffer.empty() || !this->in_range(offset)) {
17,710✔
1130
        this->fill_range(offset, this->lb_buffer.capacity());
2,284✔
1131
    } else if (offset
15,426✔
1132
               == this->lb_file_offset + (ssize_t) this->lb_buffer.size())
15,426✔
1133
    {
UNCOV
1134
        if (!this->fill_range(offset, INITIAL_REQUEST_SIZE)) {
×
UNCOV
1135
            retval.li_file_range.fr_offset = offset;
×
UNCOV
1136
            retval.li_file_range.fr_size = 0;
×
UNCOV
1137
            if (this->is_pipe()) {
×
UNCOV
1138
                retval.li_partial = !this->is_pipe_closed();
×
1139
            } else {
UNCOV
1140
                retval.li_partial = true;
×
1141
            }
UNCOV
1142
            return Ok(retval);
×
1143
        }
1144
    }
1145
    if (prev_line.next_offset() == 0) {
17,710✔
1146
        auto is_utf_res = is_utf8(string_fragment::from_bytes(
3,092✔
1147
            this->lb_buffer.begin(), this->lb_buffer.size()));
1,546✔
1148
        this->lb_is_utf8 = is_utf_res.is_valid();
1,546✔
1149
        if (!this->lb_is_utf8) {
1,546✔
1150
            log_warning("fd(%d): input is not utf8 -- %s",
12✔
1151
                        this->lb_fd.get(),
1152
                        is_utf_res.usr_message);
1153
        }
1154
    }
1155
    while (!done) {
35,348✔
1156
        auto old_retval_size = retval.li_file_range.fr_size;
17,722✔
1157
        const char* lf = nullptr;
17,722✔
1158

1159
        /* Find the data in the cache and */
1160
        line_start = this->get_range(offset, retval.li_file_range.fr_size);
17,722✔
1161
        /* ... look for the end-of-line or end-of-file. */
1162
        ssize_t utf8_end = -1;
17,722✔
1163

1164
        if (!retval.li_utf8_scan_result.is_valid()) {
17,722✔
1165
            retval.li_utf8_scan_result = {};
2✔
1166
        }
1167
        auto found_in_cache = false;
17,722✔
1168
        auto has_ansi = false;
17,722✔
1169
        auto valid_utf8 = true;
17,722✔
1170
        if (!this->lb_line_starts.empty()) {
17,722✔
1171
            auto buffer_offset = offset - this->lb_file_offset;
3✔
1172

1173
            if (this->lb_next_buffer_offset == buffer_offset) {
3✔
1174
                require(this->lb_next_line_start_index
3✔
1175
                        < this->lb_line_starts.size());
1176
                auto start_iter = this->lb_line_starts.begin()
6✔
1177
                    + this->lb_next_line_start_index;
3✔
1178
                auto next_line_iter = start_iter + 1;
3✔
1179
                if (next_line_iter != this->lb_line_starts.end()) {
3✔
1180
                    utf8_end = *next_line_iter - 1 - *start_iter;
3✔
1181
                    found_in_cache = true;
3✔
1182
                    lf = line_start + utf8_end;
3✔
1183
                    has_ansi = this->lb_line_has_ansi
1184
                                   [this->lb_next_line_start_index];
3✔
1185
                    valid_utf8
1186
                        = this->lb_line_is_utf[this->lb_next_line_start_index];
3✔
1187

1188
                    // log_debug("hit cache");
1189
                    this->lb_next_buffer_offset = *next_line_iter;
3✔
1190
                    this->lb_next_line_start_index += 1;
3✔
1191
                } else {
1192
                    // log_debug("no next iter");
1193
                }
1194
            } else {
UNCOV
1195
                auto start_iter = std::lower_bound(this->lb_line_starts.begin(),
×
1196
                                                   this->lb_line_starts.end(),
1197
                                                   buffer_offset);
UNCOV
1198
                if (start_iter != this->lb_line_starts.end()) {
×
UNCOV
1199
                    auto next_line_iter = start_iter + 1;
×
1200

1201
                    // log_debug("found offset %d %d", buffer_offset,
1202
                    // *start_iter);
UNCOV
1203
                    if (next_line_iter != this->lb_line_starts.end()) {
×
UNCOV
1204
                        auto start_index = std::distance(
×
1205
                            this->lb_line_starts.begin(), start_iter);
UNCOV
1206
                        utf8_end = *next_line_iter - 1 - *start_iter;
×
UNCOV
1207
                        found_in_cache = true;
×
UNCOV
1208
                        lf = line_start + utf8_end;
×
UNCOV
1209
                        has_ansi = this->lb_line_has_ansi[start_index];
×
UNCOV
1210
                        valid_utf8 = this->lb_line_is_utf[start_index];
×
1211

UNCOV
1212
                        this->lb_next_line_start_index = start_index + 1;
×
UNCOV
1213
                        this->lb_next_buffer_offset = *next_line_iter;
×
1214
                    } else {
1215
                        // log_debug("no next iter");
1216
                    }
1217
                } else {
1218
                    // log_debug("no buffer_offset found");
1219
                }
1220
            }
1221
        }
1222

1223
        if (found_in_cache && valid_utf8) {
17,722✔
1224
            retval.li_utf8_scan_result.usr_has_ansi = has_ansi;
3✔
1225
        } else {
1226
            auto frag = string_fragment::from_bytes(
17,719✔
1227
                line_start, retval.li_file_range.fr_size);
17,719✔
1228
            auto scan_res = is_utf8(frag, '\n');
17,719✔
1229
            lf = scan_res.remaining_ptr();
17,719✔
1230
            if (lf != nullptr) {
17,719✔
1231
                lf -= 1;
17,050✔
1232
            }
1233
            retval.li_utf8_scan_result = scan_res;
17,719✔
1234
            if (!scan_res.is_valid()) {
17,719✔
1235
                log_warning("fd(%d): line is not utf8 -- %lld",
62✔
1236
                            this->lb_fd.get(),
1237
                            retval.li_file_range.fr_offset);
1238
            }
1239
        }
1240

1241
        auto got_new_data = old_retval_size != retval.li_file_range.fr_size;
17,722✔
1242
#if 0
1243
        log_debug("load next loop %p reqsize %d lsize %d",
1244
                  lf,
1245
                  request_size,
1246
                  retval.li_file_range.fr_size);
1247
#endif
1248
        if (lf != nullptr
17,722✔
1249
            || (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE)
669✔
1250
            || (request_size >= MAX_LINE_BUFFER_SIZE)
669✔
1251
            || (!got_new_data
19,028✔
1252
                && (!this->is_pipe() || request_size > DEFAULT_INCREMENT)))
637✔
1253
        {
1254
            if ((lf != nullptr)
17,626✔
1255
                && ((size_t) (lf - line_start) >= MAX_LINE_BUFFER_SIZE - 1))
17,053✔
1256
            {
UNCOV
1257
                lf = nullptr;
×
1258
            }
1259
            if (lf != nullptr) {
17,626✔
1260
                retval.li_partial = false;
17,053✔
1261
                retval.li_file_range.fr_size = lf - line_start;
17,053✔
1262
                // delim
1263
                retval.li_file_range.fr_size += 1;
17,053✔
1264
                if (offset >= this->lb_last_line_offset) {
17,053✔
1265
                    this->lb_last_line_offset
1266
                        = offset + retval.li_file_range.fr_size;
16,553✔
1267
                }
1268
            } else {
1269
                if (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) {
573✔
UNCOV
1270
                    log_warning("Line exceeded max size: offset=%d", offset);
×
UNCOV
1271
                    retval.li_file_range.fr_size = MAX_LINE_BUFFER_SIZE - 1;
×
UNCOV
1272
                    retval.li_partial = false;
×
1273
                } else {
1274
                    retval.li_partial = true;
573✔
1275
                }
1276
                this->ensure_available(offset, retval.li_file_range.fr_size);
573✔
1277

1278
                if (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) {
573✔
UNCOV
1279
                    retval.li_file_range.fr_size = MAX_LINE_BUFFER_SIZE - 1;
×
1280
                }
1281
                if (retval.li_partial) {
573✔
1282
                    /*
1283
                     * Since no delimiter was seen, we need to remember the
1284
                     * offset of the last line in the file so we don't
1285
                     * mistakenly return two partial lines to the caller.
1286
                     *
1287
                     *   1. read_line() - returns partial line
1288
                     *   2. file is written
1289
                     *   3. read_line() - returns the middle of partial line.
1290
                     */
1291
                    this->lb_last_line_offset = offset;
573✔
UNCOV
1292
                } else if (offset >= this->lb_last_line_offset) {
×
1293
                    this->lb_last_line_offset
UNCOV
1294
                        = offset + retval.li_file_range.fr_size;
×
1295
                }
1296
            }
1297

1298
            offset += retval.li_file_range.fr_size;
17,626✔
1299

1300
            done = true;
17,626✔
1301
        } else {
1302
            if (!this->is_pipe() || !this->is_pipe_closed()) {
96✔
1303
                retval.li_partial = true;
33✔
1304
            }
1305
            request_size
1306
                = std::min<ssize_t>(this->lb_buffer.size() + DEFAULT_INCREMENT,
96✔
1307
                                    MAX_LINE_BUFFER_SIZE);
1308
        }
1309

1310
        if (!done
35,444✔
1311
            && !this->fill_range(
17,818✔
1312
                offset,
1313
                std::max(request_size, (ssize_t) this->lb_buffer.available())))
17,818✔
1314
        {
1315
            break;
84✔
1316
        }
1317
    }
1318

1319
    ensure(retval.li_file_range.fr_size <= (ssize_t) this->lb_buffer.size());
17,710✔
1320
    ensure(this->invariant());
17,710✔
1321
#if 0
1322
    log_debug("got line part %d %d",
1323
              retval.li_file_range.fr_offset,
1324
              (int) retval.li_partial);
1325
#endif
1326

1327
    retval.li_file_range.fr_metadata.m_has_ansi
1328
        = retval.li_utf8_scan_result.usr_has_ansi;
17,710✔
1329
    retval.li_file_range.fr_metadata.m_valid_utf
1330
        = retval.li_utf8_scan_result.is_valid();
17,710✔
1331

1332
    if (this->lb_line_metadata) {
17,710✔
1333
        auto sv = std::string_view{
1334
            line_start,
1335
            (size_t) retval.li_file_range.fr_size,
378✔
1336
        };
378✔
1337

1338
        auto scan_res = scn::scan<int64_t, int64_t, char>(sv, "{}.{}:{};");
378✔
1339
        if (scan_res) {
378✔
1340
            auto& [tv_sec, tv_usec, level] = scan_res->values();
299✔
1341
            retval.li_timestamp.tv_sec = tv_sec;
299✔
1342
            retval.li_timestamp.tv_usec = tv_usec;
299✔
1343
            retval.li_timestamp.tv_sec
1344
                = lnav::to_local_time(date::sys_seconds{std::chrono::seconds{
299✔
1345
                                          retval.li_timestamp.tv_sec}})
299✔
1346
                      .time_since_epoch()
299✔
1347
                      .count();
299✔
1348
            retval.li_level = abbrev2level(&level, 1);
299✔
1349
        }
1350
    }
1351

1352
    return Ok(retval);
17,710✔
1353
}
1354

1355
Result<shared_buffer_ref, std::string>
line_buffer::read_range(file_range fr, scan_direction dir)
{
    /*
     * Return a shared reference to the bytes in `fr`. The bytes are loaded
     * into the buffer first if they are not already there. When line
     * metadata is enabled, everything up to and including the first ';'
     * is stripped from the front of the returned data.
     */
    shared_buffer_ref retval;

#if 0
    if (this->lb_last_line_offset != -1
        && fr.fr_offset > this->lb_last_line_offset)
    {
        /*
         * Don't return anything past the last known line.  The caller needs
         * to try reading at the offset of the last line again.
         */
        return Err(
            fmt::format(FMT_STRING("attempt to read past the known end of the "
                                   "file: read-offset={}; last_line_offset={}"),
                        fr.fr_offset,
                        this->lb_last_line_offset));
    }
#endif

    const auto last_byte = fr.fr_offset + fr.fr_size - 1;
    const bool already_buffered
        = this->in_range(fr.fr_offset) && this->in_range(last_byte);
    if (!already_buffered && !this->fill_range(fr.fr_offset, fr.fr_size, dir))
    {
        return Err(std::string("unable to read file"));
    }

    file_ssize_t avail;
    const char* data = this->get_range(fr.fr_offset, avail);
    if (avail < fr.fr_size) {
        return Err(fmt::format(
            FMT_STRING("short-read (need: {}; avail: {})"), fr.fr_size, avail));
    }

    if (this->lb_line_metadata) {
        const auto* semi
            = static_cast<const char*>(memchr(data, ';', fr.fr_size));
        if (semi != nullptr) {
            const auto prefix_len = (semi + 1) - data;
            data = semi + 1;
            fr.fr_size -= prefix_len;
        }
    }

    retval.share(this->lb_share_manager, data, fr.fr_size);
    retval.get_metadata() = fr.fr_metadata;

    return Ok(std::move(retval));
}
83,762✔
1405

1406
file_range
line_buffer::get_available()
{
    // The range of the file currently held in lb_buffer.
    const auto buffered_bytes = static_cast<file_ssize_t>(this->lb_buffer.size());

    return {this->lb_file_offset, buffered_bytes};
}
1412

UNCOV
1413
line_buffer::gz_indexed::indexDict::indexDict(const z_stream& s,
                                              const file_size_t size)
{
    /*
     * Snapshot the inflate state `s` at a block boundary so decompression can
     * later be restarted from here: the borrowed bit count and bits from the
     * previous input byte, the input/output totals, and the last GZ_WINSIZE
     * bytes of output (the dictionary window).
     */
    assert((s.data_type & GZ_END_OF_BLOCK_MASK));
    assert(!(s.data_type & GZ_END_OF_FILE_MASK));
    assert(size >= s.avail_out + GZ_WINSIZE);

    this->in = s.total_in;
    this->out = s.total_out;
    this->bits = s.data_type & GZ_BORROW_BITS_MASK;

    // The high `bits` bits of the byte just before next_in are needed to
    // prime the inflater when resuming.
    const auto prev_input_byte = s.next_in[-1];
    this->in_bits = prev_input_byte >> (8 - this->bits);

    const auto* window_start = s.next_out - GZ_WINSIZE;
    memcpy(this->index, window_start, GZ_WINSIZE);
}
1428

1429
int
1430
line_buffer::gz_indexed::indexDict::apply(z_streamp s)
×
1431
{
1432
    s->zalloc = Z_NULL;
×
UNCOV
1433
    s->zfree = Z_NULL;
×
UNCOV
1434
    s->opaque = Z_NULL;
×
1435
    s->avail_in = 0;
×
1436
    s->next_in = Z_NULL;
×
1437
    auto ret = inflateInit2(s, GZ_RAW_MODE);
×
1438
    if (ret != Z_OK) {
×
1439
        return ret;
×
1440
    }
UNCOV
1441
    if (this->bits) {
×
1442
        inflatePrime(s, this->bits, this->in_bits);
×
1443
    }
1444
    s->total_in = this->in;
×
1445
    s->total_out = this->out;
×
1446
    inflateSetDictionary(s, this->index, GZ_WINSIZE);
×
1447
    return ret;
×
1448
}
1449

1450
bool
1451
line_buffer::is_likely_to_flush(file_range prev_line)
×
1452
{
1453
    auto avail = this->get_available();
×
1454

UNCOV
1455
    if (prev_line.fr_offset < avail.fr_offset) {
×
1456
        return true;
×
1457
    }
UNCOV
1458
    auto prev_line_end = prev_line.fr_offset + prev_line.fr_size;
×
1459
    auto avail_end = avail.fr_offset + avail.fr_size;
×
1460
    if (avail_end < prev_line_end) {
×
1461
        return true;
×
1462
    }
1463
    auto remaining = avail_end - prev_line_end;
×
1464
    return remaining < INITIAL_REQUEST_SIZE;
×
1465
}
1466

1467
void
1468
line_buffer::quiesce()
42✔
1469
{
1470
    if (this->lb_loader_future.valid()) {
42✔
1471
        this->lb_loader_future.wait();
×
1472
    }
1473
}
42✔
1474

1475
static std::filesystem::path
1476
line_buffer_cache_path()
563✔
1477
{
1478
    return lnav::paths::workdir() / "buffer-cache";
1,126✔
1479
}
1480

1481
void
1482
line_buffer::enable_cache()
215✔
1483
{
1484
    if (!this->lb_compressed || this->lb_cached_fd) {
215✔
1485
        log_info("%d: skipping cache request (compressed=%d already-cached=%d)",
215✔
1486
                 this->lb_fd.get(),
1487
                 this->lb_compressed,
1488
                 (bool) this->lb_cached_fd);
1489
        return;
215✔
1490
    }
1491

1492
    struct stat st;
1493

1494
    if (fstat(this->lb_fd, &st) == -1) {
×
1495
        log_error("failed to fstat(%d) - %d", this->lb_fd.get(), errno);
×
UNCOV
1496
        return;
×
1497
    }
1498

UNCOV
1499
    auto cached_base_name = hasher()
×
UNCOV
1500
                                .update(st.st_dev)
×
UNCOV
1501
                                .update(st.st_ino)
×
1502
                                .update(st.st_size)
×
UNCOV
1503
                                .to_string();
×
1504
    auto cache_dir = line_buffer_cache_path() / cached_base_name.substr(0, 2);
×
1505

UNCOV
1506
    std::filesystem::create_directories(cache_dir);
×
1507

UNCOV
1508
    auto cached_file_name = fmt::format(FMT_STRING("{}.bin"), cached_base_name);
×
UNCOV
1509
    auto cached_file_path = cache_dir / cached_file_name;
×
1510
    auto cached_done_path
UNCOV
1511
        = cache_dir / fmt::format(FMT_STRING("{}.done"), cached_base_name);
×
1512

UNCOV
1513
    log_info(
×
1514
        "%d:cache file path: %s", this->lb_fd.get(), cached_file_path.c_str());
1515

UNCOV
1516
    auto fl = lnav::filesystem::file_lock(cached_file_path);
×
UNCOV
1517
    auto guard = lnav::filesystem::file_lock::guard(&fl);
×
1518

1519
    if (std::filesystem::exists(cached_done_path)) {
×
1520
        log_info("%d:using existing cache file", this->lb_fd.get());
×
UNCOV
1521
        auto open_res = lnav::filesystem::open_file(cached_file_path, O_RDWR);
×
1522
        if (open_res.isOk()) {
×
1523
            this->lb_cached_fd = open_res.unwrap();
×
1524
            return;
×
1525
        }
UNCOV
1526
        std::filesystem::remove(cached_done_path);
×
1527
    }
1528

1529
    auto create_res = lnav::filesystem::create_file(
UNCOV
1530
        cached_file_path, O_RDWR | O_TRUNC, 0600);
×
UNCOV
1531
    if (create_res.isErr()) {
×
UNCOV
1532
        log_error("failed to create cache file: %s -- %s",
×
1533
                  cached_file_path.c_str(),
1534
                  create_res.unwrapErr().c_str());
UNCOV
1535
        return;
×
1536
    }
1537

UNCOV
1538
    auto write_fd = create_res.unwrap();
×
UNCOV
1539
    auto done = false;
×
1540

1541
    static constexpr ssize_t FILL_LENGTH = 1024 * 1024;
UNCOV
1542
    auto off = file_off_t{0};
×
UNCOV
1543
    while (!done) {
×
UNCOV
1544
        log_debug("%d: caching file content at %d", this->lb_fd.get(), off);
×
UNCOV
1545
        if (!this->fill_range(off, FILL_LENGTH)) {
×
UNCOV
1546
            log_debug("%d: caching finished", this->lb_fd.get());
×
UNCOV
1547
            done = true;
×
1548
        } else {
1549
            file_ssize_t avail;
1550

UNCOV
1551
            const auto* data = this->get_range(off, avail);
×
UNCOV
1552
            auto rc = write(write_fd, data, avail);
×
UNCOV
1553
            if (rc != avail) {
×
UNCOV
1554
                log_error("%d: short write!", this->lb_fd.get());
×
UNCOV
1555
                return;
×
1556
            }
1557

UNCOV
1558
            off += avail;
×
1559
        }
1560
    }
1561

UNCOV
1562
    lnav::filesystem::create_file(cached_done_path, O_WRONLY, 0600);
×
1563

UNCOV
1564
    this->lb_cached_fd = std::move(write_fd);
×
1565
}
1566

1567
std::future<void>
1568
line_buffer::cleanup_cache()
563✔
1569
{
1570
    return std::async(
1571
        std::launch::async, +[]() {
563✔
1572
            auto now = std::filesystem::file_time_type::clock::now();
563✔
1573
            auto cache_path = line_buffer_cache_path();
563✔
1574
            std::vector<std::filesystem::path> to_remove;
563✔
1575
            std::error_code ec;
563✔
1576

1577
            for (const auto& cache_subdir :
563✔
1578
                 std::filesystem::directory_iterator(cache_path, ec))
563✔
1579
            {
UNCOV
1580
                for (const auto& entry :
×
UNCOV
1581
                     std::filesystem::directory_iterator(cache_subdir, ec))
×
1582
                {
UNCOV
1583
                    auto mtime = std::filesystem::last_write_time(entry.path());
×
UNCOV
1584
                    auto exp_time = mtime + 1h;
×
UNCOV
1585
                    if (now < exp_time) {
×
UNCOV
1586
                        continue;
×
1587
                    }
1588

UNCOV
1589
                    to_remove.emplace_back(entry.path());
×
1590
                }
1591
            }
563✔
1592

1593
            for (auto& entry : to_remove) {
563✔
UNCOV
1594
                log_debug("removing compressed file cache: %s", entry.c_str());
×
UNCOV
1595
                std::filesystem::remove_all(entry, ec);
×
1596
            }
1597
        });
1,689✔
1598
}
1599

1600
void
1601
line_buffer::send_initial_load()
660✔
1602
{
1603
    if (!this->lb_seekable) {
660✔
UNCOV
1604
        log_warning("file is not seekable, not doing preload");
×
UNCOV
1605
        return;
×
1606
    }
1607

1608
    if (this->lb_loader_future.valid()) {
660✔
UNCOV
1609
        log_warning("preload is already active");
×
UNCOV
1610
        return;
×
1611
    }
1612

1613
    log_debug("sending initial load");
660✔
1614
    if (!this->lb_alt_buffer) {
660✔
1615
        // log_debug("allocating new buffer!");
1616
        this->lb_alt_buffer = auto_buffer::alloc(this->lb_buffer.capacity());
660✔
1617
    }
1618
    this->lb_loader_file_offset = 0;
660✔
1619
    auto prom = std::make_shared<std::promise<bool>>();
660✔
1620
    this->lb_loader_future = prom->get_future();
660✔
1621
    this->lb_stats.s_requested_preloads += 1;
660✔
1622
    isc::to<io_looper&, io_looper_tag>().send(
660✔
1623
        [this, prom](auto& ioloop) mutable {
1,320✔
1624
            prom->set_value(this->load_next_buffer());
660✔
1625
        });
660✔
1626
}
660✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc