• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tstack / lnav / 19368739780-2669

14 Nov 2025 03:03PM UTC coverage: 68.855% (-0.005%) from 68.86%
19368739780-2669

push

github

tstack
[perf] disable extra compression in line_buffer for non-logfile uses

15 of 15 new or added lines in 3 files covered. (100.0%)

7 existing lines in 2 files now uncovered.

50930 of 73967 relevant lines covered (68.86%)

433113.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.69
/src/line_buffer.cc
1
/**
2
 * Copyright (c) 2007-2012, Timothy Stack
3
 *
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions are met:
8
 *
9
 * * Redistributions of source code must retain the above copyright notice, this
10
 * list of conditions and the following disclaimer.
11
 * * Redistributions in binary form must reproduce the above copyright notice,
12
 * this list of conditions and the following disclaimer in the documentation
13
 * and/or other materials provided with the distribution.
14
 * * Neither the name of Timothy Stack nor the names of its contributors
15
 * may be used to endorse or promote products derived from this software
16
 * without specific prior written permission.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21
 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * @file line_buffer.cc
30
 */
31

32
#include <errno.h>
33
#include <fcntl.h>
34
#include <stdio.h>
35
#include <string.h>
36
#include <unistd.h>
37

38
#include "config.h"
39

40
#ifdef HAVE_BZLIB_H
41
#    include <bzlib.h>
42
#endif
43

44
#include <algorithm>
45
#include <set>
46

47
#include "base/auto_mem.hh"
48
#include "base/auto_pid.hh"
49
#include "base/fs_util.hh"
50
#include "base/injector.bind.hh"
51
#include "base/injector.hh"
52
#include "base/is_utf8.hh"
53
#include "base/isc.hh"
54
#include "base/math_util.hh"
55
#include "base/paths.hh"
56
#include "fmtlib/fmt/format.h"
57
#include "hasher.hh"
58
#include "line_buffer.hh"
59
#include "piper.header.hh"
60
#include "scn/scan.h"
61
#include "yajlpp/yajlpp_def.hh"
62

63
using namespace std::chrono_literals;

// Initial read-request size, in bytes (presumably the size of the first read
// issued against a file; its use is outside this excerpt).
static constexpr ssize_t INITIAL_REQUEST_SIZE = 16 * 1024;
// Rounding granularity used by ensure_available() when growing the buffer.
static constexpr ssize_t DEFAULT_INCREMENT = 128 * 1024;
// Buffer size requested in set_fd() for gzip/bzip2 files when extra
// decompression (lb_decompress_extra) is enabled.
static constexpr ssize_t INITIAL_COMPRESSED_BUFFER_SIZE = 5 * 1024 * 1024;
// Largest size resize_buffer() will grow the buffer to for compressed files.
static constexpr ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024;
69

70
// Default buffer size for a line_buffer, in bytes (presumably the initial
// capacity; the constant's users are outside this excerpt).
const ssize_t line_buffer::DEFAULT_LINE_BUFFER_SIZE = 256 * 1024;
// Upper bound used by resize_buffer() and ensure_available() for
// uncompressed files: sixteen times the default (4 MiB).
const ssize_t line_buffer::MAX_LINE_BUFFER_SIZE
    = 16 * line_buffer::DEFAULT_LINE_BUFFER_SIZE;
73

74
class io_looper : public isc::service<io_looper> {};
75

76
struct io_looper_tag {};
77

78
static auto bound_io = injector::bind_multiple<isc::service_base>()
79
                           .add_singleton<io_looper, io_looper_tag>();
80

81
namespace injector {
82
template<>
83
void
84
force_linking(io_looper_tag anno)
644✔
85
{
86
}
644✔
87
}  // namespace injector
88

89
/*
90
 * XXX REMOVE ME
91
 *
92
 * The stock bzipped file code does not use pread, so we need to use a lock to
93
 * get exclusive access to the file.  In the future, we should just rewrite
94
 * the bzipped file code to use pread.
95
 */
96
class lock_hack {
97
public:
98
    class guard {
99
    public:
100
        guard() : g_lock(singleton()) { this->g_lock.lock(); }
2✔
101

102
        ~guard() { this->g_lock.unlock(); }
2✔
103

104
    private:
105
        lock_hack& g_lock;
106
    };
107

108
    static lock_hack& singleton()
2✔
109
    {
110
        static lock_hack retval;
2✔
111

112
        return retval;
2✔
113
    }
114

115
    void lock() { lockf(this->lh_fd, F_LOCK, 0); }
2✔
116

117
    void unlock() { lockf(this->lh_fd, F_ULOCK, 0); }
2✔
118

119
private:
120
    lock_hack()
1✔
121
    {
1✔
122
        char lockname[64];
123

124
        snprintf(lockname, sizeof(lockname), "/tmp/lnav.%d.lck", getpid());
1✔
125
        this->lh_fd = open(lockname, O_CREAT | O_RDWR, 0600);
1✔
126
        log_perror(fcntl(this->lh_fd, F_SETFD, FD_CLOEXEC));
1✔
127
        unlink(lockname);
1✔
128
    }
1✔
129

130
    auto_fd lh_fd;
131
};
132
/* XXX END */
133

134
/* Size of the compressed-input buffer (gz_indexed::inbuf) and of the
 * scratch chunks gz_indexed::seek() decompresses while skipping forward. */
#define Z_BUFSIZE      (64U * 1024U)
/* Syncpoint spacing threshold, compared against strm.total_in in
 * gz_indexed::stream_data(). */
#define SYNCPOINT_SIZE (1 << 20)
136
line_buffer::gz_indexed::gz_indexed()
{
    // Buffer that stream_data() fills with compressed bytes via pread().
    this->inbuf = auto_mem<Bytef>::malloc(Z_BUFSIZE);
    if (this->inbuf == nullptr) {
        throw std::bad_alloc();
    }
}
142

143
void
144
line_buffer::gz_indexed::close()
1,441✔
145
{
146
    // Release old stream, if we were open
147
    if (*this) {
1,441✔
148
        inflateEnd(&this->strm);
7✔
149
        ::close(this->gz_fd);
7✔
150
        this->syncpoints.clear();
7✔
151
        this->gz_fd = -1;
7✔
152
    }
153
}
1,441✔
154

155
void
156
line_buffer::gz_indexed::init_stream()
25✔
157
{
158
    if (*this) {
25✔
159
        inflateEnd(&this->strm);
18✔
160
    }
161

162
    // initialize inflate struct
163
    int rc = inflateInit2(&this->strm, GZ_HEADER_MODE);
25✔
164
    this->strm.avail_in = 0;
25✔
165
    if (rc != Z_OK) {
25✔
166
        throw(rc);  // FIXME: exception wrapper
×
167
    }
168
}
25✔
169

170
void
171
line_buffer::gz_indexed::continue_stream()
8✔
172
{
173
    // Save our position and output buffer
174
    auto total_in = this->strm.total_in;
8✔
175
    auto total_out = this->strm.total_out;
8✔
176
    auto avail_out = this->strm.avail_out;
8✔
177
    auto next_out = this->strm.next_out;
8✔
178

179
    init_stream();
8✔
180

181
    // Restore position and output buffer
182
    this->strm.total_in = total_in;
8✔
183
    this->strm.total_out = total_out;
8✔
184
    this->strm.avail_out = avail_out;
8✔
185
    this->strm.next_out = next_out;
8✔
186
}
8✔
187

188
void
189
line_buffer::gz_indexed::open(int fd, lnav::gzip::header& hd)
7✔
190
{
191
    this->close();
7✔
192
    this->init_stream();
7✔
193
    this->gz_fd = fd;
7✔
194

195
    unsigned char name[1024];
196
    unsigned char comment[4096];
197

198
    name[0] = '\0';
7✔
199
    comment[0] = '\0';
7✔
200

201
    gz_header gz_hd;
202
    memset(&gz_hd, 0, sizeof(gz_hd));
7✔
203
    gz_hd.name = name;
7✔
204
    gz_hd.name_max = sizeof(name);
7✔
205
    gz_hd.comment = comment;
7✔
206
    gz_hd.comm_max = sizeof(comment);
7✔
207

208
    Bytef inbuf[8192];
209
    Bytef outbuf[8192];
210
    this->strm.next_out = outbuf;
7✔
211
    this->strm.total_out = 0;
7✔
212
    this->strm.avail_out = sizeof(outbuf);
7✔
213
    this->strm.next_in = inbuf;
7✔
214
    this->strm.total_in = 0;
7✔
215

216
    if (inflateGetHeader(&this->strm, &gz_hd) == Z_OK) {
7✔
217
        auto rc = pread(fd, inbuf, sizeof(inbuf), 0);
7✔
218
        if (rc >= 0) {
7✔
219
            this->strm.avail_in = rc;
7✔
220

221
            inflate(&this->strm, Z_BLOCK);
7✔
222
            inflateEnd(&this->strm);
7✔
223
            this->strm.next_out = Z_NULL;
7✔
224
            this->strm.next_in = Z_NULL;
7✔
225
            this->strm.next_in = Z_NULL;
7✔
226
            this->strm.total_in = 0;
7✔
227
            this->strm.avail_in = 0;
7✔
228
            this->init_stream();
7✔
229

230
            switch (gz_hd.done) {
7✔
231
                case 0:
×
232
                    log_debug("%d: no gzip header data", fd);
×
233
                    break;
×
234
                case 1:
7✔
235
                    hd.h_mtime.tv_sec = gz_hd.time;
7✔
236
                    name[sizeof(name) - 1] = '\0';
7✔
237
                    comment[sizeof(comment) - 1] = '\0';
7✔
238
                    hd.h_name = std::string((char*) name);
14✔
239
                    hd.h_comment = std::string((char*) comment);
7✔
240
                    log_info(
7✔
241
                        "%d: read gzip header (mtime=%ld; name='%s'; "
242
                        "comment='%s'; crc=%x)",
243
                        fd,
244
                        hd.h_mtime.tv_sec,
245
                        hd.h_name.c_str(),
246
                        hd.h_comment.c_str(),
247
                        gz_hd.hcrc);
248
                    break;
7✔
249
                default:
×
250
                    log_error("%d: failed to read gzip header data", fd);
×
251
                    break;
×
252
            }
253
        } else {
254
            log_error("%d: failed to read gzip header from file: %s",
×
255
                      fd,
256
                      strerror(errno));
257
        }
258
    } else {
259
        log_error("%d: unable to get gzip header", fd);
×
260
    }
261
}
7✔
262

263
int
264
line_buffer::gz_indexed::stream_data(void* buf, size_t size)
13✔
265
{
266
    this->strm.avail_out = size;
13✔
267
    this->strm.next_out = (unsigned char*) buf;
13✔
268

269
    size_t last = this->syncpoints.empty() ? 0 : this->syncpoints.back().in;
13✔
270
    while (this->strm.avail_out) {
39✔
271
        if (!this->strm.avail_in) {
39✔
272
            int rc = ::pread(
21✔
273
                this->gz_fd, &this->inbuf[0], Z_BUFSIZE, this->strm.total_in);
21✔
274
            if (rc < 0) {
21✔
275
                return rc;
×
276
            }
277
            this->strm.next_in = this->inbuf;
21✔
278
            this->strm.avail_in = rc;
21✔
279
        }
280
        if (this->strm.avail_in) {
39✔
281
            int flush = last > this->strm.total_in ? Z_SYNC_FLUSH : Z_BLOCK;
28✔
282
            auto err = inflate(&this->strm, flush);
28✔
283
            if (err == Z_STREAM_END) {
28✔
284
                // Reached end of stream; re-init for a possible subsequent
285
                // stream
286
                continue_stream();
8✔
287
            } else if (err != Z_OK) {
20✔
288
                log_error(" inflate-error at offset %lu: %d  %s",
2✔
289
                          this->strm.total_in,
290
                          (int) err,
291
                          this->strm.msg ? this->strm.msg : "");
292
                this->parent->lb_decompress_error = fmt::format(
2✔
293
                    FMT_STRING("inflate-error at offset {}: {}  {}"),
4✔
294
                    this->strm.total_in,
2✔
295
                    err,
296
                    this->strm.msg ? this->strm.msg : "");
4✔
297
                break;
2✔
298
            }
299

300
            if (this->strm.total_in >= last + SYNCPOINT_SIZE
26✔
301
                && size > this->strm.avail_out + GZ_WINSIZE
×
302
                && (this->strm.data_type & GZ_END_OF_BLOCK_MASK)
×
303
                && !(this->strm.data_type & GZ_END_OF_FILE_MASK))
×
304
            {
305
                this->syncpoints.emplace_back(this->strm, size);
×
306
                last = this->strm.total_out;
×
307
            }
308
        } else if (this->strm.avail_out) {
11✔
309
            // Processed all the gz file data but didn't fill
310
            // the output buffer.  We're done, even though we
311
            // produced fewer bytes than requested.
312
            break;
11✔
313
        }
314
    }
315
    return size - this->strm.avail_out;
13✔
316
}
317

318
void
319
line_buffer::gz_indexed::seek(off_t offset)
3✔
320
{
321
    if ((size_t) offset == this->strm.total_out) {
3✔
322
        return;
×
323
    }
324

325
    indexDict* dict = nullptr;
3✔
326
    // Find highest syncpoint not past offset
327
    // FIXME: Make this a binary-tree search
328
    for (auto& d : this->syncpoints) {
3✔
329
        if (d.out <= offset) {
×
330
            dict = &d;
×
331
        } else {
332
            break;
×
333
        }
334
    }
335

336
    // Choose highest available syncpoint, or keep current offset if it's ok
337
    if ((size_t) offset < this->strm.total_out
3✔
338
        || (dict && this->strm.total_out < (size_t) dict->out))
×
339
    {
340
        // Release the old z_stream
341
        inflateEnd(&this->strm);
3✔
342
        if (dict) {
3✔
343
            dict->apply(&this->strm);
×
344
        } else {
345
            init_stream();
3✔
346
        }
347
    }
348

349
    // Stream from compressed file until we reach our offset
350
    unsigned char dummy[Z_BUFSIZE];
351
    while ((size_t) offset > this->strm.total_out) {
3✔
352
        size_t to_copy
353
            = std::min(static_cast<size_t>(Z_BUFSIZE),
×
354
                       static_cast<size_t>(offset - this->strm.total_out));
355
        auto bytes = stream_data(dummy, to_copy);
×
356
        if (bytes <= 0) {
×
357
            break;
×
358
        }
359
    }
360
}
361

362
int
363
line_buffer::gz_indexed::read(void* buf, size_t offset, size_t size)
13✔
364
{
365
    if (offset != this->strm.total_out) {
13✔
366
        // log_debug("doing seek!  %zu %lu", offset, this->strm.total_out);
367
        this->seek(offset);
3✔
368
    }
369

370
    int bytes = stream_data(buf, size);
13✔
371

372
    return bytes;
13✔
373
}
374

375
line_buffer::line_buffer()
{
    {
        // Back-pointer that lets gz_indexed report decompression errors
        // into lb_decompress_error.  The write access is released before
        // the invariant check.
        auto gz = this->lb_gz_file.writeAccess();
        gz->parent = this;
    }

    ensure(this->invariant());
}
381

382
line_buffer::~line_buffer()
{
    // Wait for any outstanding load before tearing down the buffers.
    if (this->lb_loader_future.valid()) {
        this->lb_loader_future.wait();
    }

    // Make sure any shared refs take ownership of the data.
    this->lb_share_manager.invalidate_refs();

    // Detach from the current file; set_fd() closes any gzip stream and
    // clears bzip2 mode.
    auto_fd no_fd;
    this->set_fd(no_fd);
}
394

395
void
396
line_buffer::set_fd(auto_fd& fd)
2,831✔
397
{
398
    file_off_t newoff = 0;
2,831✔
399

400
    {
401
        safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
2,831✔
402

403
        if (*gi) {
2,831✔
404
            gi->close();
7✔
405
        }
406
    }
2,831✔
407

408
    if (this->lb_bz_file) {
2,831✔
409
        this->lb_bz_file = false;
1✔
410
    }
411

412
    if (fd != -1) {
2,831✔
413
        /* Sync the fd's offset with the object. */
414
        newoff = lseek(fd, 0, SEEK_CUR);
1,404✔
415
        if (newoff == -1) {
1,404✔
416
            if (errno != ESPIPE) {
69✔
417
                throw error(errno);
×
418
            }
419

420
            /* It's a pipe, start with a zero offset. */
421
            newoff = 0;
69✔
422
            this->lb_seekable = false;
69✔
423
        } else {
424
            char gz_id[2 + 1 + 1 + 4];
425

426
            if (pread(fd, gz_id, sizeof(gz_id), 0) == sizeof(gz_id)) {
1,335✔
427
                auto piper_hdr_opt = lnav::piper::read_header(fd, gz_id);
1,302✔
428

429
                if (piper_hdr_opt) {
1,302✔
430
                    static const intern_string_t SRC
431
                        = intern_string::lookup("piper");
202✔
432

433
                    auto meta_buf = std::move(piper_hdr_opt.value());
120✔
434

435
                    auto meta_sf = string_fragment::from_bytes(meta_buf.in(),
120✔
436
                                                               meta_buf.size());
437
                    auto meta_parse_res
438
                        = lnav::piper::header_handlers.parser_for(SRC).of(
240✔
439
                            meta_sf);
120✔
440
                    if (meta_parse_res.isErr()) {
120✔
441
                        log_error("failed to parse piper header: %s",
×
442
                                  meta_parse_res.unwrapErr()[0]
443
                                      .to_attr_line()
444
                                      .get_string()
445
                                      .c_str());
446
                        throw error(EINVAL);
×
447
                    }
448

449
                    this->lb_line_metadata = true;
120✔
450
                    this->lb_file_offset
451
                        = lnav::piper::HEADER_SIZE + meta_buf.size();
120✔
452
                    this->lb_piper_header_size = this->lb_file_offset;
120✔
453
                    this->lb_header = meta_parse_res.unwrap();
120✔
454
                } else if (gz_id[0] == '\037' && gz_id[1] == '\213') {
1,302✔
455
                    int gzfd = dup(fd);
7✔
456

457
                    log_perror(fcntl(gzfd, F_SETFD, FD_CLOEXEC));
7✔
458
                    if (lseek(fd, 0, SEEK_SET) < 0) {
7✔
459
                        close(gzfd);
×
460
                        throw error(errno);
×
461
                    }
462
                    lnav::gzip::header hdr;
7✔
463

464
                    this->lb_gz_file.writeAccess()->open(gzfd, hdr);
7✔
465
                    this->lb_compressed = true;
7✔
466
                    this->lb_file_time = hdr.h_mtime.tv_sec;
7✔
467
                    if (this->lb_file_time < 0) {
7✔
468
                        this->lb_file_time = 0;
×
469
                    }
470
                    this->lb_compressed_offset = 0;
7✔
471
                    if (!hdr.empty()) {
7✔
472
                        this->lb_header = std::move(hdr);
7✔
473
                    }
474
                    if (this->lb_decompress_extra) {
7✔
475
                        this->resize_buffer(INITIAL_COMPRESSED_BUFFER_SIZE);
4✔
476
                    }
477
                }
7✔
478
#ifdef HAVE_BZLIB_H
479
                else if (gz_id[0] == 'B' && gz_id[1] == 'Z')
1,175✔
480
                {
481
                    if (lseek(fd, 0, SEEK_SET) < 0) {
1✔
482
                        throw error(errno);
×
483
                    }
484
                    this->lb_bz_file = true;
1✔
485
                    this->lb_compressed = true;
1✔
486

487
                    /*
488
                     * Loading data from a bzip2 file is pretty slow, so we try
489
                     * to keep as much in memory as possible.
490
                     */
491
                    if (this->lb_decompress_extra) {
1✔
492
                        this->resize_buffer(INITIAL_COMPRESSED_BUFFER_SIZE);
1✔
493
                    }
494

495
                    this->lb_compressed_offset = 0;
1✔
496
                }
497
#endif
498
            }
1,302✔
499
            this->lb_seekable = true;
1,335✔
500
        }
501
    }
502
    this->lb_file_offset = newoff;
2,831✔
503
    this->lb_buffer.clear();
2,831✔
504
    this->lb_fd = std::move(fd);
2,831✔
505

506
    ensure(this->invariant());
2,831✔
507
}
2,831✔
508

509
void
510
line_buffer::resize_buffer(size_t new_max)
17✔
511
{
512
    if (((this->lb_compressed && new_max <= MAX_COMPRESSED_BUFFER_SIZE)
14✔
513
         || (!this->lb_compressed && new_max <= MAX_LINE_BUFFER_SIZE))
3✔
514
        && !this->lb_buffer.has_capacity_for(new_max))
34✔
515
    {
516
        /* Still need more space, try a realloc. */
517
        this->lb_share_manager.invalidate_refs();
13✔
518
        this->lb_buffer.expand_to(new_max);
13✔
519
    }
520
}
17✔
521

522
void
523
line_buffer::ensure_available(file_off_t start,
3,707✔
524
                              ssize_t max_length,
525
                              scan_direction dir)
526
{
527
    ssize_t prefill;
528

529
    require(this->lb_compressed || max_length <= MAX_LINE_BUFFER_SIZE);
3,707✔
530

531
    // log_debug("ensure avail %d %d", start, max_length);
532

533
    if (this->lb_file_size != -1) {
3,707✔
534
        if (start + (file_off_t) max_length > this->lb_file_size) {
94✔
535
            max_length = (this->lb_file_size - start);
83✔
536
        }
537
    }
538

539
    /*
540
     * Check to see if the start is inside the cached range or immediately
541
     * after.
542
     */
543
    if (start < this->lb_file_offset
7,414✔
544
        || start > (file_off_t) (this->lb_file_offset + this->lb_buffer.size()))
3,707✔
545
    {
546
        /*
547
         * The request is outside the cached range, need to reload the
548
         * whole thing.
549
         */
550
        this->lb_share_manager.invalidate_refs();
1,096✔
551
        prefill = 0;
1,096✔
552
        this->lb_buffer.clear();
1,096✔
553

554
        switch (dir) {
1,096✔
555
            case scan_direction::forward:
1,094✔
556
                break;
1,094✔
557
            case scan_direction::backward: {
2✔
558
                auto padded_max_length = max_length * 4;
2✔
559
                if (this->lb_buffer.has_capacity_for(padded_max_length)) {
2✔
560
                    start = std::max(
4✔
561
                        file_off_t{0},
4✔
562
                        static_cast<file_off_t>(start
2✔
563
                                                - (this->lb_buffer.capacity()
4✔
564
                                                   - padded_max_length)));
2✔
565
                }
566
                break;
2✔
567
            }
568
        }
569

570
        if (this->lb_file_size == (ssize_t) -1) {
1,096✔
571
            this->lb_file_offset = start;
1,096✔
572
        } else {
573
            require(start <= this->lb_file_size);
×
574
            /*
575
             * If the start is near the end of the file, move the offset back a
576
             * bit so we can get more of the file in the cache.
577
             */
578
            if (start + (ssize_t) this->lb_buffer.capacity()
×
579
                > this->lb_file_size)
×
580
            {
581
                this->lb_file_offset = this->lb_file_size
×
582
                    - std::min(this->lb_file_size,
×
583
                               (file_ssize_t) this->lb_buffer.capacity());
×
584
            } else {
585
                this->lb_file_offset = start;
×
586
            }
587
        }
588
    } else {
589
        /* The request is in the cached range.  Record how much extra data is in
590
         * the buffer before the requested range.
591
         */
592
        prefill = start - this->lb_file_offset;
2,611✔
593
    }
594
    require(this->lb_file_offset <= start);
3,707✔
595
    require(prefill <= (ssize_t) this->lb_buffer.size());
3,707✔
596

597
    ssize_t available
598
        = this->lb_buffer.capacity() - (start - this->lb_file_offset);
3,707✔
599
    if (max_length > available) {
3,707✔
600
        // log_debug("need more space!");
601
        /*
602
         * Need more space, move any existing data to the front of the
603
         * buffer.
604
         */
605
        this->lb_share_manager.invalidate_refs();
695✔
606

607
        this->lb_buffer.resize_by(-prefill);
695✔
608
        this->lb_file_offset += prefill;
695✔
609
        // log_debug("adjust file offset for prefill %d", this->lb_file_offset);
610
        memmove(this->lb_buffer.at(0),
1,390✔
611
                this->lb_buffer.at(prefill),
695✔
612
                this->lb_buffer.size());
613

614
        available = this->lb_buffer.capacity() - (start - this->lb_file_offset);
695✔
615
        if (max_length > available) {
695✔
616
            this->resize_buffer(roundup_size(max_length, DEFAULT_INCREMENT));
3✔
617
        }
618
    }
619
    this->lb_line_starts.clear();
3,707✔
620
    this->lb_line_is_utf.clear();
3,707✔
621
}
3,707✔
622

623
bool
624
line_buffer::load_next_buffer()
706✔
625
{
626
    // log_debug("loader here!");
627
    auto retval = false;
706✔
628
    auto start = this->lb_loader_file_offset.value();
706✔
629
    ssize_t rc = 0;
706✔
630
    safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
706✔
631

632
    // log_debug("BEGIN preload read");
633
    /* ... read in the new data. */
634
    if (!this->lb_cached_fd && *gi) {
706✔
635
        if (this->lb_file_size != (ssize_t) -1 && this->in_range(start)
×
636
            && this->in_range(this->lb_file_size - 1))
4✔
637
        {
638
            rc = 0;
×
639
        } else {
640
            // log_debug("async decomp start");
641
            rc = gi->read(this->lb_alt_buffer.value().end(),
8✔
642
                          start + this->lb_alt_buffer.value().size(),
4✔
643
                          this->lb_alt_buffer.value().available());
4✔
644
            this->lb_compressed_offset = gi->get_source_offset();
4✔
645
            ensure(this->lb_compressed_offset >= 0);
4✔
646
            if (rc != -1
4✔
647
                && rc < (ssize_t) this->lb_alt_buffer.value().available()
4✔
648
                && (start + (ssize_t) this->lb_alt_buffer.value().size() + rc
12✔
649
                    > this->lb_file_size))
4✔
650
            {
651
                this->lb_file_size
652
                    = (start + this->lb_alt_buffer.value().size() + rc);
4✔
653
                log_info("fd(%d): set file size to %llu",
4✔
654
                         this->lb_fd.get(),
655
                         this->lb_file_size);
656
            }
657
#if 0
658
            log_debug("async decomp end  %d+%d:%d",
659
                      this->lb_alt_buffer->size(),
660
                      rc,
661
                      this->lb_alt_buffer->capacity());
662
#endif
663
        }
664
    }
665
#ifdef HAVE_BZLIB_H
666
    else if (!this->lb_cached_fd && this->lb_bz_file)
702✔
667
    {
668
        if (this->lb_file_size != (ssize_t) -1
2✔
669
            && (((ssize_t) start >= this->lb_file_size)
1✔
670
                || (this->in_range(start)
×
671
                    && this->in_range(this->lb_file_size - 1))))
×
672
        {
673
            rc = 0;
×
674
        } else {
675
            lock_hack::guard guard;
1✔
676
            char scratch[32 * 1024];
677
            BZFILE* bz_file;
678
            file_off_t seek_to;
679
            int bzfd;
680

681
            /*
682
             * Unfortunately, there is no bzseek, so we need to reopen the
683
             * file every time we want to do a read.
684
             */
685
            bzfd = dup(this->lb_fd);
1✔
686
            if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
1✔
687
                close(bzfd);
×
688
                throw error(errno);
×
689
            }
690
            if ((bz_file = BZ2_bzdopen(bzfd, "r")) == nullptr) {
1✔
691
                close(bzfd);
×
692
                if (errno == 0) {
×
693
                    throw std::bad_alloc();
×
694
                } else {
695
                    throw error(errno);
×
696
                }
697
            }
698

699
            seek_to = start + this->lb_alt_buffer.value().size();
1✔
700
            while (seek_to > 0) {
1✔
701
                int count;
702

703
                count = BZ2_bzread(bz_file,
×
704
                                   scratch,
705
                                   std::min((size_t) seek_to, sizeof(scratch)));
×
706
                seek_to -= count;
×
707
            }
708
            rc = BZ2_bzread(bz_file,
1✔
709
                            this->lb_alt_buffer->end(),
1✔
710
                            this->lb_alt_buffer->available());
1✔
711
            this->lb_compressed_offset = 0;
1✔
712
            BZ2_bzclose(bz_file);
1✔
713

714
            if (rc != -1
1✔
715
                && (rc < (ssize_t) (this->lb_alt_buffer.value().available()))
1✔
716
                && (start + (ssize_t) this->lb_alt_buffer.value().size() + rc
3✔
717
                    > this->lb_file_size))
1✔
718
            {
719
                this->lb_file_size
720
                    = (start + this->lb_alt_buffer.value().size() + rc);
1✔
721
                log_info("fd(%d): set file size to %llu",
1✔
722
                         this->lb_fd.get(),
723
                         this->lb_file_size);
724
            }
725
        }
1✔
726
    }
727
#endif
728
    else
729
    {
730
        rc = pread(this->lb_cached_fd ? this->lb_cached_fd.value().get()
1,402✔
731
                                      : this->lb_fd.get(),
701✔
732
                   this->lb_alt_buffer.value().end(),
701✔
733
                   this->lb_alt_buffer.value().available(),
701✔
734
                   start + this->lb_alt_buffer.value().size());
701✔
735
    }
736
    // XXX For some reason, cygwin is giving us a bogus return value when
737
    // up to the end of the file.
738
    if (rc > (ssize_t) this->lb_alt_buffer.value().available()) {
706✔
739
        rc = -1;
×
740
#ifdef ENODATA
741
        errno = ENODATA;
×
742
#else
743
        errno = EAGAIN;
744
#endif
745
    }
746
    switch (rc) {
706✔
747
        case 0:
14✔
748
            if (start < (file_off_t) this->lb_file_size) {
14✔
749
                retval = true;
×
750
            }
751
            break;
14✔
752

753
        case (ssize_t) -1:
×
754
            switch (errno) {
×
755
#ifdef ENODATA
756
                /* Cygwin seems to return this when pread reaches the end of
757
                 * the file. */
758
                case ENODATA:
×
759
#endif
760
                case EINTR:
761
                case EAGAIN:
762
                    break;
×
763

764
                default:
×
765
                    throw error(errno);
×
766
            }
767
            break;
×
768

769
        default:
692✔
770
            this->lb_alt_buffer.value().resize_by(rc);
692✔
771
            retval = true;
692✔
772
            break;
692✔
773
    }
774
    // log_debug("END preload read");
775

776
    if (start > this->lb_last_line_offset) {
706✔
777
        const auto* line_start = this->lb_alt_buffer.value().begin();
706✔
778

779
        do {
780
            auto before = line_start - this->lb_alt_buffer->begin();
19,308✔
781
            const auto remaining = this->lb_alt_buffer.value().size() - before;
19,308✔
782
            const auto frag
783
                = string_fragment::from_bytes(line_start, remaining);
19,308✔
784
            auto utf_scan_res = is_utf8(frag, '\n');
19,308✔
785
            const auto* lf = utf_scan_res.remaining_ptr();
19,308✔
786
            this->lb_alt_line_starts.emplace_back(before);
19,308✔
787
            this->lb_alt_line_is_utf.emplace_back(utf_scan_res.is_valid());
19,308✔
788
            this->lb_alt_line_has_ansi.emplace_back(utf_scan_res.usr_has_ansi);
19,308✔
789

790
            line_start = lf;
19,308✔
791
        } while (line_start != nullptr
792
                 && line_start < this->lb_alt_buffer->end());
19,308✔
793
    }
794

795
    return retval;
706✔
796
}
706✔
797

798
bool
799
line_buffer::fill_range(file_off_t start,
3,153✔
800
                        ssize_t max_length,
801
                        scan_direction dir)
802
{
803
    bool retval = false;
3,153✔
804

805
    require(start >= 0);
3,153✔
806

807
#if 0
808
    log_debug("(%p) fill range %d %d (%d) %d",
809
              this,
810
              start,
811
              max_length,
812
              this->lb_file_offset,
813
              this->lb_buffer.size());
814
#endif
815
    if (!lnav::pid::in_child && this->lb_loader_future.valid()
3,153✔
816
        && start >= this->lb_loader_file_offset.value())
6,306✔
817
    {
818
#if 0
819
        log_debug("getting preload! %d %d",
820
                  start,
821
                  this->lb_loader_file_offset.value());
822
#endif
823
        std::optional<std::chrono::system_clock::time_point> wait_start;
653✔
824

825
        if (this->lb_loader_future.wait_for(std::chrono::seconds(0))
653✔
826
            != std::future_status::ready)
653✔
827
        {
828
            wait_start = std::make_optional(std::chrono::system_clock::now());
163✔
829
        }
830
        retval = this->lb_loader_future.get();
653✔
831
        if (false && wait_start) {
832
            auto diff = std::chrono::system_clock::now() - wait_start.value();
833
            log_debug("wait done! %lld", diff.count());
834
        }
835
        // log_debug("got preload");
836
        this->lb_loader_future = {};
653✔
837
        this->lb_share_manager.invalidate_refs();
653✔
838
        this->lb_file_offset = this->lb_loader_file_offset.value();
653✔
839
        this->lb_loader_file_offset = std::nullopt;
653✔
840
        this->lb_buffer.swap(this->lb_alt_buffer.value());
653✔
841
        this->lb_alt_buffer.value().clear();
653✔
842
        this->lb_line_starts = std::move(this->lb_alt_line_starts);
653✔
843
        this->lb_alt_line_starts.clear();
653✔
844
        this->lb_line_is_utf = std::move(this->lb_alt_line_is_utf);
653✔
845
        this->lb_alt_line_is_utf.clear();
653✔
846
        this->lb_line_has_ansi = std::move(this->lb_alt_line_has_ansi);
653✔
847
        this->lb_alt_line_has_ansi.clear();
653✔
848
        this->lb_stats.s_used_preloads += 1;
653✔
849
        this->lb_next_line_start_index = 0;
653✔
850
        this->lb_next_buffer_offset = 0;
653✔
851
    }
852
    if (this->in_range(start)
3,153✔
853
        && (max_length == 0 || this->in_range(start + max_length - 1)))
3,153✔
854
    {
855
        /* Cache already has the data, nothing to do. */
856
        retval = true;
63✔
857
        if (this->lb_do_preloading && !lnav::pid::in_child && this->lb_seekable
63✔
858
            && this->lb_buffer.full() && !this->lb_loader_file_offset)
126✔
859
        {
860
            // log_debug("loader available start=%d", start);
861
            auto last_lf_iter = std::find(
862
                this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n');
3✔
863
            if (last_lf_iter != this->lb_buffer.rend()) {
3✔
864
                auto usable_size
865
                    = std::distance(last_lf_iter, this->lb_buffer.rend());
3✔
866
                // log_debug("found linefeed %d", usable_size);
867
                if (!this->lb_alt_buffer) {
3✔
868
                    // log_debug("allocating new buffer!");
869
                    this->lb_alt_buffer
870
                        = auto_buffer::alloc(this->lb_buffer.capacity());
×
871
                }
872
                this->lb_alt_buffer->resize(this->lb_buffer.size()
3✔
873
                                            - usable_size);
3✔
874
                memcpy(this->lb_alt_buffer.value().begin(),
6✔
875
                       this->lb_buffer.at(usable_size),
3✔
876
                       this->lb_alt_buffer->size());
877
                this->lb_loader_file_offset
878
                    = this->lb_file_offset + usable_size;
3✔
879
#if 0
880
                log_debug("load offset %d",
881
                          this->lb_loader_file_offset.value());
882
                log_debug("launch loader");
883
#endif
884
                auto prom = std::make_shared<std::promise<bool>>();
3✔
885
                this->lb_loader_future = prom->get_future();
3✔
886
                this->lb_stats.s_requested_preloads += 1;
3✔
887
                isc::to<io_looper&, io_looper_tag>().send(
3✔
888
                    [this, prom](auto& ioloop) mutable {
6✔
889
                        prom->set_value(this->load_next_buffer());
3✔
890
                    });
3✔
891
            }
3✔
892
        }
893
    } else if (this->lb_fd != -1) {
3,090✔
894
        ssize_t rc;
895

896
        /* Make sure there is enough space, then */
897
        this->ensure_available(start, max_length, dir);
3,090✔
898

899
        safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
3,090✔
900

901
        /* ... read in the new data. */
902
        if (!this->lb_cached_fd && *gi) {
3,090✔
903
            // log_debug("old decomp start");
904
            if (this->lb_file_size != (ssize_t) -1 && this->in_range(start)
10✔
905
                && this->in_range(this->lb_file_size - 1))
23✔
906
            {
907
                rc = 0;
4✔
908
            } else {
909
                this->lb_stats.s_decompressions += 1;
9✔
910
                if (false && this->lb_last_line_offset > 0) {
911
                    this->lb_stats.s_hist[(this->lb_file_offset * 10)
912
                                          / this->lb_last_line_offset]
913
                        += 1;
914
                }
915
                rc = gi->read(this->lb_buffer.end(),
18✔
916
                              this->lb_file_offset + this->lb_buffer.size(),
9✔
917
                              this->lb_buffer.available());
918
                this->lb_compressed_offset = gi->get_source_offset();
9✔
919
                ensure(this->lb_compressed_offset >= 0);
9✔
920
                if (rc != -1 && (rc < (ssize_t) this->lb_buffer.available())) {
9✔
921
                    this->lb_file_size
922
                        = (this->lb_file_offset + this->lb_buffer.size() + rc);
9✔
923
                    log_info("fd(%d): rc (%zd) -- set file size to %llu",
9✔
924
                             this->lb_fd.get(),
925
                             rc,
926
                             this->lb_file_size);
927
                }
928
            }
929
#if 0
930
            log_debug("old decomp end -- %d+%d:%d",
931
                      this->lb_buffer.size(),
932
                      rc,
933
                      this->lb_buffer.capacity());
934
#endif
935
        }
936
#ifdef HAVE_BZLIB_H
937
        else if (!this->lb_cached_fd && this->lb_bz_file)
3,077✔
938
        {
939
            if (this->lb_file_size != (ssize_t) -1
6✔
940
                && (((ssize_t) start >= this->lb_file_size)
5✔
941
                    || (this->in_range(start)
2✔
942
                        && this->in_range(this->lb_file_size - 1))))
1✔
943
            {
944
                rc = 0;
2✔
945
            } else {
946
                lock_hack::guard guard;
1✔
947
                char scratch[32 * 1024];
948
                BZFILE* bz_file;
949
                file_off_t seek_to;
950
                int bzfd;
951

952
                /*
953
                 * Unfortunately, there is no bzseek, so we need to reopen the
954
                 * file every time we want to do a read.
955
                 */
956
                bzfd = dup(this->lb_fd);
1✔
957
                if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
1✔
958
                    close(bzfd);
×
959
                    throw error(errno);
×
960
                }
961
                if ((bz_file = BZ2_bzdopen(bzfd, "r")) == NULL) {
1✔
962
                    close(bzfd);
×
963
                    if (errno == 0) {
×
964
                        throw std::bad_alloc();
×
965
                    } else {
966
                        throw error(errno);
×
967
                    }
968
                }
969

970
                seek_to = this->lb_file_offset + this->lb_buffer.size();
1✔
971
                while (seek_to > 0) {
1✔
972
                    int count;
973

974
                    count = BZ2_bzread(
×
975
                        bz_file,
976
                        scratch,
977
                        std::min((size_t) seek_to, sizeof(scratch)));
×
978
                    seek_to -= count;
×
979
                }
980
                rc = BZ2_bzread(bz_file,
1✔
981
                                this->lb_buffer.end(),
1✔
982
                                this->lb_buffer.available());
1✔
983
                this->lb_compressed_offset = 0;
1✔
984
                BZ2_bzclose(bz_file);
1✔
985

986
                if (rc != -1 && (rc < (ssize_t) this->lb_buffer.available())) {
1✔
987
                    this->lb_file_size
988
                        = (this->lb_file_offset + this->lb_buffer.size() + rc);
1✔
989
                    log_info("fd(%d): set file size to %llu",
1✔
990
                             this->lb_fd.get(),
991
                             this->lb_file_size);
992
                }
993
            }
1✔
994
        }
995
#endif
996
        else if (this->lb_seekable)
3,074✔
997
        {
998
            this->lb_stats.s_preads += 1;
2,893✔
999
            if (false && this->lb_last_line_offset > 0) {
1000
                this->lb_stats.s_hist[(this->lb_file_offset * 10)
1001
                                      / this->lb_last_line_offset]
1002
                    += 1;
1003
            }
1004
#if 0
1005
            log_debug("%d: pread %lld",
1006
                      this->lb_fd.get(),
1007
                      this->lb_file_offset + this->lb_buffer.size());
1008
#endif
1009
            rc = pread(this->lb_cached_fd ? this->lb_cached_fd.value().get()
5,786✔
1010
                                          : this->lb_fd.get(),
2,893✔
1011
                       this->lb_buffer.end(),
2,893✔
1012
                       this->lb_buffer.available(),
1013
                       this->lb_file_offset + this->lb_buffer.size());
2,893✔
1014
            // log_debug("pread rc %d", rc);
1015
        } else {
1016
            rc = read(this->lb_fd,
362✔
1017
                      this->lb_buffer.end(),
181✔
1018
                      this->lb_buffer.available());
1019
        }
1020
        // XXX For some reason, cygwin is giving us a bogus return value when
1021
        // up to the end of the file.
1022
        if (rc > (ssize_t) this->lb_buffer.available()) {
3,090✔
1023
            rc = -1;
×
1024
#ifdef ENODATA
1025
            errno = ENODATA;
×
1026
#else
1027
            errno = EAGAIN;
1028
#endif
1029
        }
1030
        switch (rc) {
3,090✔
1031
            case 0:
1,415✔
1032
                if (!this->lb_seekable) {
1,415✔
1033
                    this->lb_file_size
1034
                        = this->lb_file_offset + this->lb_buffer.size();
133✔
1035
                }
1036
                if (start < (file_off_t) this->lb_file_size) {
1,415✔
1037
                    retval = true;
12✔
1038
                }
1039

1040
                if (this->lb_compressed) {
1,415✔
1041
                    /*
1042
                     * For compressed files, increase the buffer size so we
1043
                     * don't have to spend as much time uncompressing the data.
1044
                     */
1045
                    if (this->lb_decompress_extra) {
9✔
1046
                        this->resize_buffer(MAX_COMPRESSED_BUFFER_SIZE);
9✔
1047
                    }
1048
                }
1049
                break;
1,415✔
1050

UNCOV
1051
            case (ssize_t) -1:
×
UNCOV
1052
                switch (errno) {
×
1053
#ifdef ENODATA
1054
                    /* Cygwin seems to return this when pread reaches the end of
1055
                     * the */
1056
                    /* file. */
UNCOV
1057
                    case ENODATA:
×
1058
#endif
1059
                    case EINTR:
1060
                    case EAGAIN:
UNCOV
1061
                        break;
×
1062

1063
                    default:
×
1064
                        throw error(errno);
×
1065
                }
UNCOV
1066
                break;
×
1067

1068
            default:
1,675✔
1069
                this->lb_buffer.resize_by(rc);
1,675✔
1070
                retval = true;
1,675✔
1071
                break;
1,675✔
1072
        }
1073

1074
        if (this->lb_do_preloading && !lnav::pid::in_child && this->lb_seekable
2,289✔
1075
            && this->lb_buffer.full() && !this->lb_loader_file_offset)
5,379✔
1076
        {
1077
            // log_debug("loader available2 start=%d", start);
1078
            auto last_lf_iter = std::find(
1079
                this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n');
×
1080
            if (last_lf_iter != this->lb_buffer.rend()) {
×
1081
                auto usable_size
1082
                    = std::distance(last_lf_iter, this->lb_buffer.rend());
×
1083
                // log_debug("found linefeed %d", usable_size);
1084
                if (!this->lb_alt_buffer) {
×
1085
                    // log_debug("allocating new buffer!");
1086
                    this->lb_alt_buffer
1087
                        = auto_buffer::alloc(this->lb_buffer.capacity());
×
1088
                } else if (this->lb_alt_buffer->capacity()
×
1089
                           < this->lb_buffer.capacity())
×
1090
                {
1091
                    this->lb_alt_buffer->expand_to(this->lb_buffer.capacity());
×
1092
                }
1093
                this->lb_alt_buffer->resize(this->lb_buffer.size()
×
1094
                                            - usable_size);
×
1095
                memcpy(this->lb_alt_buffer->begin(),
×
1096
                       this->lb_buffer.at(usable_size),
×
1097
                       this->lb_alt_buffer->size());
1098
                this->lb_loader_file_offset
1099
                    = this->lb_file_offset + usable_size;
×
1100
#if 0
1101
                log_debug("load offset %d",
1102
                          this->lb_loader_file_offset.value());
1103
                log_debug("launch loader");
1104
#endif
1105
                auto prom = std::make_shared<std::promise<bool>>();
×
1106
                this->lb_loader_future = prom->get_future();
×
1107
                this->lb_stats.s_requested_preloads += 1;
×
1108
                isc::to<io_looper&, io_looper_tag>().send(
×
1109
                    [this, prom](auto& ioloop) mutable {
×
1110
                        prom->set_value(this->load_next_buffer());
×
1111
                    });
×
1112
            }
1113
        }
1114
        ensure(this->lb_buffer.size() <= this->lb_buffer.capacity());
3,090✔
1115
    }
3,090✔
1116

1117
    return retval;
3,153✔
1118
}
1119

1120
Result<line_info, std::string>
1121
line_buffer::load_next_line(file_range prev_line)
20,423✔
1122
{
1123
    const char* line_start = nullptr;
20,423✔
1124
    bool done = false;
20,423✔
1125
    line_info retval;
20,423✔
1126

1127
    require(this->lb_fd != -1);
20,423✔
1128

1129
    if (this->lb_line_metadata && prev_line.fr_offset == 0) {
20,423✔
1130
        prev_line.fr_offset = this->lb_piper_header_size;
120✔
1131
    }
1132

1133
    auto offset = prev_line.next_offset();
20,423✔
1134
    ssize_t request_size = INITIAL_REQUEST_SIZE;
20,423✔
1135
    retval.li_file_range.fr_offset = offset;
20,423✔
1136
    if (this->lb_buffer.empty() || !this->in_range(offset)) {
20,423✔
1137
        this->fill_range(offset, this->lb_buffer.capacity());
2,448✔
1138
    } else if (offset
17,975✔
1139
               == this->lb_file_offset + (ssize_t) this->lb_buffer.size())
17,975✔
1140
    {
1141
        if (!this->fill_range(offset, INITIAL_REQUEST_SIZE)) {
×
1142
            retval.li_file_range.fr_offset = offset;
×
1143
            retval.li_file_range.fr_size = 0;
×
1144
            if (this->is_pipe()) {
×
1145
                retval.li_partial = !this->is_pipe_closed();
×
1146
            } else {
1147
                retval.li_partial = true;
×
1148
            }
1149
            return Ok(retval);
×
1150
        }
1151
    }
1152
    if (prev_line.next_offset() == 0) {
20,423✔
1153
        auto is_utf_res = is_utf8(string_fragment::from_bytes(
3,314✔
1154
            this->lb_buffer.begin(), this->lb_buffer.size()));
1,657✔
1155
        this->lb_is_utf8 = is_utf_res.is_valid();
1,657✔
1156
        if (!this->lb_is_utf8) {
1,657✔
1157
            log_warning("fd(%d): input is not utf8 -- %s",
12✔
1158
                        this->lb_fd.get(),
1159
                        is_utf_res.usr_message);
1160
        }
1161
    }
1162
    while (!done) {
40,771✔
1163
        auto old_retval_size = retval.li_file_range.fr_size;
20,433✔
1164
        const char* lf = nullptr;
20,433✔
1165

1166
        /* Find the data in the cache and */
1167
        line_start = this->get_range(offset, retval.li_file_range.fr_size);
20,433✔
1168
        /* ... look for the end-of-line or end-of-file. */
1169
        ssize_t utf8_end = -1;
20,433✔
1170

1171
        if (!retval.li_utf8_scan_result.is_valid()) {
20,433✔
UNCOV
1172
            retval.li_utf8_scan_result = {};
×
1173
        }
1174
        auto found_in_cache = false;
20,433✔
1175
        auto has_ansi = false;
20,433✔
1176
        auto valid_utf8 = true;
20,433✔
1177
        if (!this->lb_line_starts.empty()) {
20,433✔
1178
            auto buffer_offset = offset - this->lb_file_offset;
3✔
1179

1180
            if (this->lb_next_buffer_offset == buffer_offset) {
3✔
1181
                require(this->lb_next_line_start_index
3✔
1182
                        < this->lb_line_starts.size());
1183
                auto start_iter = this->lb_line_starts.begin()
6✔
1184
                    + this->lb_next_line_start_index;
3✔
1185
                auto next_line_iter = start_iter + 1;
3✔
1186
                if (next_line_iter != this->lb_line_starts.end()) {
3✔
1187
                    utf8_end = *next_line_iter - 1 - *start_iter;
3✔
1188
                    found_in_cache = true;
3✔
1189
                    lf = line_start + utf8_end;
3✔
1190
                    has_ansi = this->lb_line_has_ansi
1191
                                   [this->lb_next_line_start_index];
3✔
1192
                    valid_utf8
1193
                        = this->lb_line_is_utf[this->lb_next_line_start_index];
3✔
1194

1195
                    // log_debug("hit cache");
1196
                    this->lb_next_buffer_offset = *next_line_iter;
3✔
1197
                    this->lb_next_line_start_index += 1;
3✔
1198
                } else {
1199
                    // log_debug("no next iter");
1200
                }
1201
            } else {
1202
                auto start_iter = std::lower_bound(this->lb_line_starts.begin(),
×
1203
                                                   this->lb_line_starts.end(),
1204
                                                   buffer_offset);
1205
                if (start_iter != this->lb_line_starts.end()) {
×
1206
                    auto next_line_iter = start_iter + 1;
×
1207

1208
                    // log_debug("found offset %d %d", buffer_offset,
1209
                    // *start_iter);
1210
                    if (next_line_iter != this->lb_line_starts.end()) {
×
1211
                        auto start_index = std::distance(
×
1212
                            this->lb_line_starts.begin(), start_iter);
1213
                        utf8_end = *next_line_iter - 1 - *start_iter;
×
1214
                        found_in_cache = true;
×
1215
                        lf = line_start + utf8_end;
×
1216
                        has_ansi = this->lb_line_has_ansi[start_index];
×
1217
                        valid_utf8 = this->lb_line_is_utf[start_index];
×
1218

1219
                        this->lb_next_line_start_index = start_index + 1;
×
1220
                        this->lb_next_buffer_offset = *next_line_iter;
×
1221
                    } else {
1222
                        // log_debug("no next iter");
1223
                    }
1224
                } else {
1225
                    // log_debug("no buffer_offset found");
1226
                }
1227
            }
1228
        }
1229

1230
        if (found_in_cache && valid_utf8) {
20,433✔
1231
            retval.li_utf8_scan_result.usr_has_ansi = has_ansi;
3✔
1232
        } else {
1233
            auto frag = string_fragment::from_bytes(
20,430✔
1234
                line_start, retval.li_file_range.fr_size);
20,430✔
1235
            auto scan_res = is_utf8(frag, '\n');
20,430✔
1236
            lf = scan_res.remaining_ptr();
20,430✔
1237
            if (lf != nullptr) {
20,430✔
1238
                lf -= 1;
19,716✔
1239
            }
1240
            retval.li_utf8_scan_result = scan_res;
20,430✔
1241
            if (!scan_res.is_valid()) {
20,430✔
1242
                log_warning("fd(%d): line is not utf8 -- %lld",
60✔
1243
                            this->lb_fd.get(),
1244
                            retval.li_file_range.fr_offset);
1245
            }
1246
        }
1247

1248
        auto got_new_data = old_retval_size != retval.li_file_range.fr_size;
20,433✔
1249
#if 0
1250
        log_debug("load next loop %p reqsize %d lsize %d",
1251
                  lf,
1252
                  request_size,
1253
                  retval.li_file_range.fr_size);
1254
#endif
1255
        if (lf != nullptr
20,433✔
1256
            || (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE)
714✔
1257
            || (request_size >= MAX_LINE_BUFFER_SIZE)
714✔
1258
            || (!got_new_data
21,827✔
1259
                && (!this->is_pipe() || request_size > DEFAULT_INCREMENT)))
680✔
1260
        {
1261
            if ((lf != nullptr)
20,336✔
1262
                && ((size_t) (lf - line_start) >= MAX_LINE_BUFFER_SIZE - 1))
19,719✔
1263
            {
1264
                lf = nullptr;
×
1265
            }
1266
            if (lf != nullptr) {
20,336✔
1267
                retval.li_partial = false;
19,719✔
1268
                retval.li_file_range.fr_size = lf - line_start;
19,719✔
1269
                // delim
1270
                retval.li_file_range.fr_size += 1;
19,719✔
1271
                if (offset >= this->lb_last_line_offset) {
19,719✔
1272
                    this->lb_last_line_offset
1273
                        = offset + retval.li_file_range.fr_size;
18,932✔
1274
                }
1275
            } else {
1276
                if (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) {
617✔
1277
                    log_warning("Line exceeded max size: offset=%zd", offset);
×
1278
                    retval.li_file_range.fr_size = MAX_LINE_BUFFER_SIZE - 1;
×
1279
                    retval.li_partial = false;
×
1280
                } else {
1281
                    retval.li_partial = true;
617✔
1282
                }
1283
                this->ensure_available(offset, retval.li_file_range.fr_size);
617✔
1284

1285
                if (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) {
617✔
1286
                    retval.li_file_range.fr_size = MAX_LINE_BUFFER_SIZE - 1;
×
1287
                }
1288
                if (retval.li_partial) {
617✔
1289
                    /*
1290
                     * Since no delimiter was seen, we need to remember the
1291
                     * offset of the last line in the file so we don't
1292
                     * mistakenly return two partial lines to the caller.
1293
                     *
1294
                     *   1. read_line() - returns partial line
1295
                     *   2. file is written
1296
                     *   3. read_line() - returns the middle of partial line.
1297
                     */
1298
                    this->lb_last_line_offset = offset;
617✔
1299
                } else if (offset >= this->lb_last_line_offset) {
×
1300
                    this->lb_last_line_offset
1301
                        = offset + retval.li_file_range.fr_size;
×
1302
                }
1303
            }
1304

1305
            offset += retval.li_file_range.fr_size;
20,336✔
1306

1307
            done = true;
20,336✔
1308
        } else if (!retval.li_utf8_scan_result.is_valid()) {
97✔
1309
            retval.li_partial = true;
2✔
1310
            done = true;
2✔
1311
        } else {
1312
            if (!this->is_pipe() || !this->is_pipe_closed()) {
95✔
1313
                retval.li_partial = true;
32✔
1314
            }
1315
            request_size
1316
                = std::min<ssize_t>(this->lb_buffer.size() + DEFAULT_INCREMENT,
95✔
1317
                                    MAX_LINE_BUFFER_SIZE);
1318
        }
1319

1320
        if (!done
40,866✔
1321
            && !this->fill_range(
20,528✔
1322
                offset,
1323
                std::max(request_size, (ssize_t) this->lb_buffer.available())))
20,528✔
1324
        {
1325
            break;
85✔
1326
        }
1327
    }
1328

1329
    ensure(retval.li_file_range.fr_size <= (ssize_t) this->lb_buffer.size());
20,423✔
1330
    ensure(this->invariant());
20,423✔
1331
#if 0
1332
    log_debug("got line part %d %d",
1333
              retval.li_file_range.fr_offset,
1334
              (int) retval.li_partial);
1335
#endif
1336

1337
    retval.li_file_range.fr_metadata.m_has_ansi
1338
        = retval.li_utf8_scan_result.usr_has_ansi;
20,423✔
1339
    retval.li_file_range.fr_metadata.m_valid_utf
1340
        = retval.li_utf8_scan_result.is_valid();
20,423✔
1341

1342
    if (this->lb_line_metadata) {
20,423✔
1343
        auto sv = std::string_view{
1344
            line_start,
1345
            (size_t) retval.li_file_range.fr_size,
462✔
1346
        };
462✔
1347

1348
        auto scan_res = scn::scan<int64_t, int64_t, char>(sv, "{}.{}:{};");
462✔
1349
        if (scan_res) {
462✔
1350
            auto& [tv_sec, tv_usec, level] = scan_res->values();
381✔
1351
            retval.li_timestamp.tv_sec = tv_sec;
381✔
1352
            retval.li_timestamp.tv_usec = tv_usec;
381✔
1353
            retval.li_timestamp.tv_sec
1354
                = lnav::to_local_time(date::sys_seconds{std::chrono::seconds{
381✔
1355
                                          retval.li_timestamp.tv_sec}})
381✔
1356
                      .time_since_epoch()
381✔
1357
                      .count();
381✔
1358
            retval.li_level = abbrev2level(&level, 1);
381✔
1359
        }
1360
    }
1361

1362
    return Ok(retval);
20,423✔
1363
}
1364

1365
/**
 * Return a shared reference to the bytes in `fr`, reading them into the
 * buffer first if either end of the range is not currently buffered.
 *
 * When line metadata is enabled, everything up to and including the first
 * ';' in the range is dropped from the returned reference.
 *
 * Note: no check is made here against the last known line offset.
 */
Result<shared_buffer_ref, std::string>
line_buffer::read_range(file_range fr, scan_direction dir)
{
    shared_buffer_ref retval;

    const bool whole_range_buffered = this->in_range(fr.fr_offset)
        && this->in_range(fr.fr_offset + fr.fr_size - 1);
    if (!whole_range_buffered && !this->fill_range(fr.fr_offset, fr.fr_size, dir))
    {
        return Err(std::string("unable to read file"));
    }

    file_ssize_t avail;
    const char* data = this->get_range(fr.fr_offset, avail);
    if (avail < fr.fr_size) {
        return Err(fmt::format(
            FMT_STRING("short-read (need: {}; avail: {})"), fr.fr_size, avail));
    }

    if (this->lb_line_metadata) {
        const auto* semi
            = static_cast<const char*>(memchr(data, ';', fr.fr_size));
        if (semi != nullptr) {
            const auto prefix_len = semi - data + 1;
            data += prefix_len;
            fr.fr_size -= prefix_len;
        }
    }

    retval.share(this->lb_share_manager, data, fr.fr_size);
    retval.get_metadata() = fr.fr_metadata;

    return Ok(std::move(retval));
}
1415

1416
/**
 * Read the bytes in `fr` into a freshly allocated buffer without touching
 * the line buffer's own contents.
 *
 * The data comes from the cached decompressed file if there is one,
 * otherwise from the gzip index or bzip2 stream for compressed input, and
 * otherwise from the raw descriptor.  Returns an error message if a read
 * fails or returns fewer than `fr.fr_size` bytes.
 */
Result<auto_buffer, std::string>
line_buffer::peek_range(file_range fr)
{
    static const std::string SHORT_READ_MSG = "short read";

    require(this->lb_seekable);

    auto buf = auto_buffer::alloc(fr.fr_size);

    if (this->lb_cached_fd) {
        auto rc = pread(this->lb_cached_fd.value().get(),
                        buf.data(),
                        fr.fr_size,
                        fr.fr_offset);
        if (rc == -1) {
            return Err(lnav::from_errno().message());
        }
        if (rc != fr.fr_size) {
            return Err(SHORT_READ_MSG);
        }
        buf.resize(rc);

        return Ok(std::move(buf));
    }

    if (this->lb_compressed) {
        safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);

        if (*gi) {
            auto rc = gi->read(buf.data(), fr.fr_offset, fr.fr_size);

            if (rc == -1) {
                return Err(lnav::from_errno().message());
            }
            if (rc != fr.fr_size) {
                return Err(SHORT_READ_MSG);
            }
            buf.resize(rc);
            return Ok(std::move(buf));
        }
        if (this->lb_bz_file) {
            lock_hack::guard guard;
            char scratch[32 * 1024];
            BZFILE* bz_file;
            file_off_t seek_to;
            int bzfd;

            /*
             * Unfortunately, there is no bzseek, so we need to reopen the
             * file every time we want to do a read.
             */
            bzfd = dup(this->lb_fd);
            if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
                close(bzfd);
                throw error(errno);
            }
            if ((bz_file = BZ2_bzdopen(bzfd, "r")) == nullptr) {
                close(bzfd);
                if (errno == 0) {
                    throw std::bad_alloc();
                } else {
                    throw error(errno);
                }
            }

            // Skip forward to the requested offset by decompressing and
            // discarding.  A non-positive count means an error or an early
            // end-of-stream; bail out instead of looping forever.
            seek_to = fr.fr_offset;
            while (seek_to > 0) {
                int count = BZ2_bzread(
                    bz_file,
                    scratch,
                    std::min((size_t) seek_to, sizeof(scratch)));
                if (count < 0) {
                    auto msg = lnav::from_errno().message();
                    BZ2_bzclose(bz_file);
                    return Err(msg);
                }
                if (count == 0) {
                    BZ2_bzclose(bz_file);
                    return Err(SHORT_READ_MSG);
                }
                seek_to -= count;
            }
            auto rc = BZ2_bzread(bz_file, buf.data(), fr.fr_size);
            this->lb_compressed_offset = 0;
            BZ2_bzclose(bz_file);

            if (rc == -1) {
                return Err(lnav::from_errno().message());
            }
            if (rc != fr.fr_size) {
                return Err(SHORT_READ_MSG);
            }
            buf.resize(rc);
            return Ok(std::move(buf));
        }
    }

    auto rc = pread(this->lb_fd, buf.data(), fr.fr_size, fr.fr_offset);
    if (rc == -1) {
        return Err(lnav::from_errno().message());
    }
    if (rc != fr.fr_size) {
        return Err(SHORT_READ_MSG);
    }
    buf.resize(rc);
    return Ok(std::move(buf));
}
1515

1516
/**
 * Describe the span of the file currently held in the buffer: it starts at
 * lb_file_offset and covers as many bytes as the buffer contains.
 */
file_range
line_buffer::get_available()
{
    const auto buffered_bytes
        = static_cast<file_ssize_t>(this->lb_buffer.size());

    return file_range{this->lb_file_offset, buffered_bytes};
}
1524

1525
/*
 * Capture a restart point for a gzip stream: the input/output totals, the
 * bits of the last input byte that have not yet been consumed, and the last
 * GZ_WINSIZE bytes of output to be used later as the inflate dictionary.
 */
line_buffer::gz_indexed::indexDict::indexDict(const z_stream& s,
                                              const file_size_t size)
{
    assert((s.data_type & GZ_END_OF_BLOCK_MASK));
    assert(!(s.data_type & GZ_END_OF_FILE_MASK));
    assert(size >= s.avail_out + GZ_WINSIZE);

    this->in = s.total_in;
    this->out = s.total_out;

    // Unused bit count, then the high-order bits of the last byte read.
    this->bits = s.data_type & GZ_BORROW_BITS_MASK;
    const auto final_input_byte = s.next_in[-1];
    this->in_bits = final_input_byte >> (8 - this->bits);

    // Keep the sliding window that ends at the current output position.
    const auto* window_begin = s.next_out - GZ_WINSIZE;
    memcpy(this->index, window_begin, GZ_WINSIZE);
}
1540

1541
int
1542
line_buffer::gz_indexed::indexDict::apply(z_streamp s)
×
1543
{
1544
    s->zalloc = Z_NULL;
×
1545
    s->zfree = Z_NULL;
×
1546
    s->opaque = Z_NULL;
×
1547
    s->avail_in = 0;
×
1548
    s->next_in = Z_NULL;
×
1549
    auto ret = inflateInit2(s, GZ_RAW_MODE);
×
1550
    if (ret != Z_OK) {
×
1551
        return ret;
×
1552
    }
1553
    if (this->bits) {
×
1554
        inflatePrime(s, this->bits, this->in_bits);
×
1555
    }
1556
    s->total_in = this->in;
×
1557
    s->total_out = this->out;
×
1558
    inflateSetDictionary(s, this->index, GZ_WINSIZE);
×
1559
    return ret;
×
1560
}
1561

1562
bool
1563
line_buffer::is_likely_to_flush(file_range prev_line)
×
1564
{
1565
    auto avail = this->get_available();
×
1566

1567
    if (prev_line.fr_offset < avail.fr_offset) {
×
1568
        return true;
×
1569
    }
1570
    auto prev_line_end = prev_line.fr_offset + prev_line.fr_size;
×
1571
    auto avail_end = avail.fr_offset + avail.fr_size;
×
1572
    if (avail_end < prev_line_end) {
×
1573
        return true;
×
1574
    }
1575
    auto remaining = avail_end - prev_line_end;
×
1576
    return remaining < INITIAL_REQUEST_SIZE;
×
1577
}
1578

1579
void
1580
line_buffer::quiesce()
42✔
1581
{
1582
    if (this->lb_loader_future.valid()) {
42✔
1583
        this->lb_loader_future.wait();
×
1584
    }
1585
}
42✔
1586

1587
static std::filesystem::path
1588
line_buffer_cache_path()
612✔
1589
{
1590
    return lnav::paths::workdir() / "buffer-cache";
1,224✔
1591
}
1592

1593
void
1594
line_buffer::enable_cache()
230✔
1595
{
1596
    if (!this->lb_compressed || this->lb_cached_fd) {
230✔
1597
        log_info("%d: skipping cache request (compressed=%d already-cached=%d)",
230✔
1598
                 this->lb_fd.get(),
1599
                 this->lb_compressed,
1600
                 (bool) this->lb_cached_fd);
1601
        return;
230✔
1602
    }
1603

1604
    struct stat st;
1605

1606
    if (fstat(this->lb_fd, &st) == -1) {
×
1607
        log_error("failed to fstat(%d) - %d", this->lb_fd.get(), errno);
×
1608
        return;
×
1609
    }
1610

1611
    auto cached_base_name = hasher()
×
1612
                                .update(st.st_dev)
×
1613
                                .update(st.st_ino)
×
1614
                                .update(st.st_size)
×
1615
                                .to_string();
×
1616
    auto cache_dir = line_buffer_cache_path() / cached_base_name.substr(0, 2);
×
1617

1618
    std::filesystem::create_directories(cache_dir);
×
1619

1620
    auto cached_file_name = fmt::format(FMT_STRING("{}.bin"), cached_base_name);
×
1621
    auto cached_file_path = cache_dir / cached_file_name;
×
1622
    auto cached_done_path
1623
        = cache_dir / fmt::format(FMT_STRING("{}.done"), cached_base_name);
×
1624

1625
    log_info(
×
1626
        "%d:cache file path: %s", this->lb_fd.get(), cached_file_path.c_str());
1627

1628
    auto fl = lnav::filesystem::file_lock(cached_file_path);
×
1629
    auto guard = lnav::filesystem::file_lock::guard(&fl);
×
1630

1631
    if (std::filesystem::exists(cached_done_path)) {
×
1632
        log_info("%d:using existing cache file", this->lb_fd.get());
×
1633
        auto open_res = lnav::filesystem::open_file(cached_file_path, O_RDWR);
×
1634
        if (open_res.isOk()) {
×
1635
            this->lb_cached_fd = open_res.unwrap();
×
1636
            return;
×
1637
        }
1638
        std::filesystem::remove(cached_done_path);
×
1639
    }
1640

1641
    auto create_res = lnav::filesystem::create_file(
1642
        cached_file_path, O_RDWR | O_TRUNC, 0600);
×
1643
    if (create_res.isErr()) {
×
1644
        log_error("failed to create cache file: %s -- %s",
×
1645
                  cached_file_path.c_str(),
1646
                  create_res.unwrapErr().c_str());
1647
        return;
×
1648
    }
1649

1650
    auto write_fd = create_res.unwrap();
×
1651
    auto done = false;
×
1652

1653
    static constexpr ssize_t FILL_LENGTH = 1024 * 1024;
1654
    auto off = file_off_t{0};
×
1655
    while (!done) {
×
1656
        log_debug("%d: caching file content at %lld", this->lb_fd.get(), off);
×
1657
        if (!this->fill_range(off, FILL_LENGTH)) {
×
1658
            log_debug("%d: caching finished", this->lb_fd.get());
×
1659
            done = true;
×
1660
        } else {
1661
            file_ssize_t avail;
1662

1663
            const auto* data = this->get_range(off, avail);
×
1664
            auto rc = write(write_fd, data, avail);
×
1665
            if (rc != avail) {
×
1666
                log_error("%d: short write!", this->lb_fd.get());
×
1667
                return;
×
1668
            }
1669

1670
            off += avail;
×
1671
        }
1672
    }
1673

1674
    lnav::filesystem::create_file(cached_done_path, O_WRONLY, 0600);
×
1675

1676
    this->lb_cached_fd = std::move(write_fd);
×
1677
}
1678

1679
std::future<void>
1680
line_buffer::cleanup_cache()
612✔
1681
{
1682
    return std::async(
1683
        std::launch::async, +[]() {
612✔
1684
            auto now = std::filesystem::file_time_type::clock::now();
612✔
1685
            auto cache_path = line_buffer_cache_path();
612✔
1686
            std::vector<std::filesystem::path> to_remove;
612✔
1687
            std::error_code ec;
612✔
1688

1689
            for (const auto& cache_subdir :
612✔
1690
                 std::filesystem::directory_iterator(cache_path, ec))
612✔
1691
            {
1692
                for (const auto& entry :
×
1693
                     std::filesystem::directory_iterator(cache_subdir, ec))
×
1694
                {
1695
                    auto mtime = std::filesystem::last_write_time(entry.path());
×
1696
                    auto exp_time = mtime + 1h;
×
1697
                    if (now < exp_time) {
×
1698
                        continue;
×
1699
                    }
1700

1701
                    to_remove.emplace_back(entry.path());
×
1702
                }
1703
            }
612✔
1704

1705
            for (auto& entry : to_remove) {
612✔
1706
                log_debug("removing compressed file cache: %s", entry.c_str());
×
1707
                std::filesystem::remove_all(entry, ec);
×
1708
            }
1709
        });
1,836✔
1710
}
1711

1712
void
1713
line_buffer::send_initial_load()
703✔
1714
{
1715
    if (!this->lb_seekable) {
703✔
1716
        log_warning("file is not seekable, not doing preload");
×
1717
        return;
×
1718
    }
1719

1720
    if (this->lb_loader_future.valid()) {
703✔
1721
        log_warning("preload is already active");
×
1722
        return;
×
1723
    }
1724

1725
    log_debug("sending initial load");
703✔
1726
    if (!this->lb_alt_buffer) {
703✔
1727
        // log_debug("allocating new buffer!");
1728
        this->lb_alt_buffer = auto_buffer::alloc(this->lb_buffer.capacity());
703✔
1729
    }
1730
    this->lb_loader_file_offset = 0;
703✔
1731
    auto prom = std::make_shared<std::promise<bool>>();
703✔
1732
    this->lb_loader_future = prom->get_future();
703✔
1733
    this->lb_stats.s_requested_preloads += 1;
703✔
1734
    isc::to<io_looper&, io_looper_tag>().send(
703✔
1735
        [this, prom](auto& ioloop) mutable {
1,406✔
1736
            prom->set_value(this->load_next_buffer());
703✔
1737
        });
703✔
1738
}
703✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc