• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tstack / lnav / 19243988760-2657

10 Nov 2025 07:37PM UTC coverage: 68.747% (-0.3%) from 69.055%
19243988760-2657

push

github

tstack
[logfile] lay groundwork for bounding log file times

Related to #1188

308 of 655 new or added lines in 35 files covered. (47.02%)

30 existing lines in 7 files now uncovered.

50645 of 73669 relevant lines covered (68.75%)

430651.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.92
/src/line_buffer.cc
1
/**
2
 * Copyright (c) 2007-2012, Timothy Stack
3
 *
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions are met:
8
 *
9
 * * Redistributions of source code must retain the above copyright notice, this
10
 * list of conditions and the following disclaimer.
11
 * * Redistributions in binary form must reproduce the above copyright notice,
12
 * this list of conditions and the following disclaimer in the documentation
13
 * and/or other materials provided with the distribution.
14
 * * Neither the name of Timothy Stack nor the names of its contributors
15
 * may be used to endorse or promote products derived from this software
16
 * without specific prior written permission.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21
 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * @file line_buffer.cc
30
 */
31

32
#include <errno.h>
33
#include <fcntl.h>
34
#include <stdio.h>
35
#include <string.h>
36
#include <unistd.h>
37

38
#include "config.h"
39

40
#ifdef HAVE_BZLIB_H
41
#    include <bzlib.h>
42
#endif
43

44
#include <algorithm>
45
#include <set>
46

47
#include "base/auto_mem.hh"
48
#include "base/auto_pid.hh"
49
#include "base/fs_util.hh"
50
#include "base/injector.bind.hh"
51
#include "base/injector.hh"
52
#include "base/is_utf8.hh"
53
#include "base/isc.hh"
54
#include "base/math_util.hh"
55
#include "base/paths.hh"
56
#include "fmtlib/fmt/format.h"
57
#include "hasher.hh"
58
#include "line_buffer.hh"
59
#include "piper.header.hh"
60
#include "scn/scan.h"
61
#include "yajlpp/yajlpp_def.hh"
62

63
using namespace std::chrono_literals;
64

65
static const ssize_t INITIAL_REQUEST_SIZE = 16 * 1024;
66
static const ssize_t DEFAULT_INCREMENT = 128 * 1024;
67
static const ssize_t INITIAL_COMPRESSED_BUFFER_SIZE = 5 * 1024 * 1024;
68
static const ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024;
69

70
const ssize_t line_buffer::DEFAULT_LINE_BUFFER_SIZE = 256 * 1024;
71
const ssize_t line_buffer::MAX_LINE_BUFFER_SIZE
72
    = 4 * 4 * line_buffer::DEFAULT_LINE_BUFFER_SIZE;
73

74
class io_looper : public isc::service<io_looper> {};
75

76
struct io_looper_tag {};
77

78
static auto bound_io = injector::bind_multiple<isc::service_base>()
79
                           .add_singleton<io_looper, io_looper_tag>();
80

81
namespace injector {
82
template<>
83
void
84
force_linking(io_looper_tag anno)
641✔
85
{
86
}
641✔
87
}  // namespace injector
88

89
/*
90
 * XXX REMOVE ME
91
 *
92
 * The stock bzipped file code does not use pread, so we need to use a lock to
93
 * get exclusive access to the file.  In the future, we should just rewrite
94
 * the bzipped file code to use pread.
95
 */
96
class lock_hack {
97
public:
98
    class guard {
99
    public:
100
        guard() : g_lock(singleton()) { this->g_lock.lock(); }
2✔
101

102
        ~guard() { this->g_lock.unlock(); }
2✔
103

104
    private:
105
        lock_hack& g_lock;
106
    };
107

108
    static lock_hack& singleton()
2✔
109
    {
110
        static lock_hack retval;
2✔
111

112
        return retval;
2✔
113
    }
114

115
    void lock() { lockf(this->lh_fd, F_LOCK, 0); }
2✔
116

117
    void unlock() { lockf(this->lh_fd, F_ULOCK, 0); }
2✔
118

119
private:
120
    lock_hack()
1✔
121
    {
1✔
122
        char lockname[64];
123

124
        snprintf(lockname, sizeof(lockname), "/tmp/lnav.%d.lck", getpid());
1✔
125
        this->lh_fd = open(lockname, O_CREAT | O_RDWR, 0600);
1✔
126
        log_perror(fcntl(this->lh_fd, F_SETFD, FD_CLOEXEC));
1✔
127
        unlink(lockname);
1✔
128
    }
1✔
129

130
    auto_fd lh_fd;
131
};
132
/* XXX END */
133

134
#define Z_BUFSIZE      65536U
135
#define SYNCPOINT_SIZE (1024 * 1024)
136
line_buffer::gz_indexed::gz_indexed()
1,429✔
137
{
138
    if ((this->inbuf = auto_mem<Bytef>::malloc(Z_BUFSIZE)) == nullptr) {
1,429✔
139
        throw std::bad_alloc();
×
140
    }
141
}
1,429✔
142

143
void
144
line_buffer::gz_indexed::close()
1,443✔
145
{
146
    // Release old stream, if we were open
147
    if (*this) {
1,443✔
148
        inflateEnd(&this->strm);
7✔
149
        ::close(this->gz_fd);
7✔
150
        this->syncpoints.clear();
7✔
151
        this->gz_fd = -1;
7✔
152
    }
153
}
1,443✔
154

155
void
156
line_buffer::gz_indexed::init_stream()
25✔
157
{
158
    if (*this) {
25✔
159
        inflateEnd(&this->strm);
18✔
160
    }
161

162
    // initialize inflate struct
163
    int rc = inflateInit2(&this->strm, GZ_HEADER_MODE);
25✔
164
    this->strm.avail_in = 0;
25✔
165
    if (rc != Z_OK) {
25✔
166
        throw(rc);  // FIXME: exception wrapper
×
167
    }
168
}
25✔
169

170
void
171
line_buffer::gz_indexed::continue_stream()
8✔
172
{
173
    // Save our position and output buffer
174
    auto total_in = this->strm.total_in;
8✔
175
    auto total_out = this->strm.total_out;
8✔
176
    auto avail_out = this->strm.avail_out;
8✔
177
    auto next_out = this->strm.next_out;
8✔
178

179
    init_stream();
8✔
180

181
    // Restore position and output buffer
182
    this->strm.total_in = total_in;
8✔
183
    this->strm.total_out = total_out;
8✔
184
    this->strm.avail_out = avail_out;
8✔
185
    this->strm.next_out = next_out;
8✔
186
}
8✔
187

188
void
189
line_buffer::gz_indexed::open(int fd, lnav::gzip::header& hd)
7✔
190
{
191
    this->close();
7✔
192
    this->init_stream();
7✔
193
    this->gz_fd = fd;
7✔
194

195
    unsigned char name[1024];
196
    unsigned char comment[4096];
197

198
    name[0] = '\0';
7✔
199
    comment[0] = '\0';
7✔
200

201
    gz_header gz_hd;
202
    memset(&gz_hd, 0, sizeof(gz_hd));
7✔
203
    gz_hd.name = name;
7✔
204
    gz_hd.name_max = sizeof(name);
7✔
205
    gz_hd.comment = comment;
7✔
206
    gz_hd.comm_max = sizeof(comment);
7✔
207

208
    Bytef inbuf[8192];
209
    Bytef outbuf[8192];
210
    this->strm.next_out = outbuf;
7✔
211
    this->strm.total_out = 0;
7✔
212
    this->strm.avail_out = sizeof(outbuf);
7✔
213
    this->strm.next_in = inbuf;
7✔
214
    this->strm.total_in = 0;
7✔
215

216
    if (inflateGetHeader(&this->strm, &gz_hd) == Z_OK) {
7✔
217
        auto rc = pread(fd, inbuf, sizeof(inbuf), 0);
7✔
218
        if (rc >= 0) {
7✔
219
            this->strm.avail_in = rc;
7✔
220

221
            inflate(&this->strm, Z_BLOCK);
7✔
222
            inflateEnd(&this->strm);
7✔
223
            this->strm.next_out = Z_NULL;
7✔
224
            this->strm.next_in = Z_NULL;
7✔
225
            this->strm.next_in = Z_NULL;
7✔
226
            this->strm.total_in = 0;
7✔
227
            this->strm.avail_in = 0;
7✔
228
            this->init_stream();
7✔
229

230
            switch (gz_hd.done) {
7✔
231
                case 0:
×
232
                    log_debug("%d: no gzip header data", fd);
×
233
                    break;
×
234
                case 1:
7✔
235
                    hd.h_mtime.tv_sec = gz_hd.time;
7✔
236
                    name[sizeof(name) - 1] = '\0';
7✔
237
                    comment[sizeof(comment) - 1] = '\0';
7✔
238
                    hd.h_name = std::string((char*) name);
14✔
239
                    hd.h_comment = std::string((char*) comment);
7✔
240
                    log_info(
7✔
241
                        "%d: read gzip header (mtime=%ld; name='%s'; "
242
                        "comment='%s'; crc=%x)",
243
                        fd,
244
                        hd.h_mtime.tv_sec,
245
                        hd.h_name.c_str(),
246
                        hd.h_comment.c_str(),
247
                        gz_hd.hcrc);
248
                    break;
7✔
249
                default:
×
250
                    log_error("%d: failed to read gzip header data", fd);
×
251
                    break;
×
252
            }
253
        } else {
254
            log_error("%d: failed to read gzip header from file: %s",
×
255
                      fd,
256
                      strerror(errno));
257
        }
258
    } else {
259
        log_error("%d: unable to get gzip header", fd);
×
260
    }
261
}
7✔
262

263
int
264
line_buffer::gz_indexed::stream_data(void* buf, size_t size)
13✔
265
{
266
    this->strm.avail_out = size;
13✔
267
    this->strm.next_out = (unsigned char*) buf;
13✔
268

269
    size_t last = this->syncpoints.empty() ? 0 : this->syncpoints.back().in;
13✔
270
    while (this->strm.avail_out) {
39✔
271
        if (!this->strm.avail_in) {
39✔
272
            int rc = ::pread(
21✔
273
                this->gz_fd, &this->inbuf[0], Z_BUFSIZE, this->strm.total_in);
21✔
274
            if (rc < 0) {
21✔
275
                return rc;
×
276
            }
277
            this->strm.next_in = this->inbuf;
21✔
278
            this->strm.avail_in = rc;
21✔
279
        }
280
        if (this->strm.avail_in) {
39✔
281
            int flush = last > this->strm.total_in ? Z_SYNC_FLUSH : Z_BLOCK;
28✔
282
            auto err = inflate(&this->strm, flush);
28✔
283
            if (err == Z_STREAM_END) {
28✔
284
                // Reached end of stream; re-init for a possible subsequent
285
                // stream
286
                continue_stream();
8✔
287
            } else if (err != Z_OK) {
20✔
288
                log_error(" inflate-error at offset %lu: %d  %s",
2✔
289
                          this->strm.total_in,
290
                          (int) err,
291
                          this->strm.msg ? this->strm.msg : "");
292
                this->parent->lb_decompress_error = fmt::format(
2✔
293
                    FMT_STRING("inflate-error at offset {}: {}  {}"),
4✔
294
                    this->strm.total_in,
2✔
295
                    err,
296
                    this->strm.msg ? this->strm.msg : "");
4✔
297
                break;
2✔
298
            }
299

300
            if (this->strm.total_in >= last + SYNCPOINT_SIZE
26✔
301
                && size > this->strm.avail_out + GZ_WINSIZE
×
302
                && (this->strm.data_type & GZ_END_OF_BLOCK_MASK)
×
303
                && !(this->strm.data_type & GZ_END_OF_FILE_MASK))
×
304
            {
305
                this->syncpoints.emplace_back(this->strm, size);
×
306
                last = this->strm.total_out;
×
307
            }
308
        } else if (this->strm.avail_out) {
11✔
309
            // Processed all the gz file data but didn't fill
310
            // the output buffer.  We're done, even though we
311
            // produced fewer bytes than requested.
312
            break;
11✔
313
        }
314
    }
315
    return size - this->strm.avail_out;
13✔
316
}
317

318
void
319
line_buffer::gz_indexed::seek(off_t offset)
3✔
320
{
321
    if ((size_t) offset == this->strm.total_out) {
3✔
322
        return;
×
323
    }
324

325
    indexDict* dict = nullptr;
3✔
326
    // Find highest syncpoint not past offset
327
    // FIXME: Make this a binary-tree search
328
    for (auto& d : this->syncpoints) {
3✔
329
        if (d.out <= offset) {
×
330
            dict = &d;
×
331
        } else {
332
            break;
×
333
        }
334
    }
335

336
    // Choose highest available syncpoint, or keep current offset if it's ok
337
    if ((size_t) offset < this->strm.total_out
3✔
338
        || (dict && this->strm.total_out < (size_t) dict->out))
×
339
    {
340
        // Release the old z_stream
341
        inflateEnd(&this->strm);
3✔
342
        if (dict) {
3✔
343
            dict->apply(&this->strm);
×
344
        } else {
345
            init_stream();
3✔
346
        }
347
    }
348

349
    // Stream from compressed file until we reach our offset
350
    unsigned char dummy[Z_BUFSIZE];
351
    while ((size_t) offset > this->strm.total_out) {
3✔
352
        size_t to_copy
353
            = std::min(static_cast<size_t>(Z_BUFSIZE),
×
354
                       static_cast<size_t>(offset - this->strm.total_out));
355
        auto bytes = stream_data(dummy, to_copy);
×
356
        if (bytes <= 0) {
×
357
            break;
×
358
        }
359
    }
360
}
361

362
int
363
line_buffer::gz_indexed::read(void* buf, size_t offset, size_t size)
13✔
364
{
365
    if (offset != this->strm.total_out) {
13✔
366
        // log_debug("doing seek!  %zu %lu", offset, this->strm.total_out);
367
        this->seek(offset);
3✔
368
    }
369

370
    int bytes = stream_data(buf, size);
13✔
371

372
    return bytes;
13✔
373
}
374

375
/**
 * Construct an empty buffer and give the gzip reader a back-pointer to this
 * object (stream_data() reports decompression errors through it).
 */
line_buffer::line_buffer()
{
    {
        auto gz_access = this->lb_gz_file.writeAccess();

        gz_access->parent = this;
    }

    ensure(this->invariant());
}
381

382
/**
 * Wait for any in-flight preload, detach outstanding shared references and
 * release the file by installing an empty descriptor.
 */
line_buffer::~line_buffer()
{
    // A pending loader task still touches this object; let it finish first.
    if (this->lb_loader_future.valid()) {
        this->lb_loader_future.wait();
    }

    // Make sure any shared refs take ownership of the data.
    this->lb_share_manager.invalidate_refs();

    auto_fd no_fd;
    this->set_fd(no_fd);
}
394

395
void
396
line_buffer::set_fd(auto_fd& fd)
2,835✔
397
{
398
    file_off_t newoff = 0;
2,835✔
399

400
    {
401
        safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
2,835✔
402

403
        if (*gi) {
2,835✔
404
            gi->close();
7✔
405
        }
406
    }
2,835✔
407

408
    if (this->lb_bz_file) {
2,835✔
409
        this->lb_bz_file = false;
1✔
410
    }
411

412
    if (fd != -1) {
2,835✔
413
        /* Sync the fd's offset with the object. */
414
        newoff = lseek(fd, 0, SEEK_CUR);
1,406✔
415
        if (newoff == -1) {
1,406✔
416
            if (errno != ESPIPE) {
69✔
417
                throw error(errno);
×
418
            }
419

420
            /* It's a pipe, start with a zero offset. */
421
            newoff = 0;
69✔
422
            this->lb_seekable = false;
69✔
423
        } else {
424
            char gz_id[2 + 1 + 1 + 4];
425

426
            if (pread(fd, gz_id, sizeof(gz_id), 0) == sizeof(gz_id)) {
1,337✔
427
                auto piper_hdr_opt = lnav::piper::read_header(fd, gz_id);
1,304✔
428

429
                if (piper_hdr_opt) {
1,304✔
430
                    static const intern_string_t SRC
431
                        = intern_string::lookup("piper");
210✔
432

433
                    auto meta_buf = std::move(piper_hdr_opt.value());
128✔
434

435
                    auto meta_sf = string_fragment::from_bytes(meta_buf.in(),
128✔
436
                                                               meta_buf.size());
437
                    auto meta_parse_res
438
                        = lnav::piper::header_handlers.parser_for(SRC).of(
256✔
439
                            meta_sf);
128✔
440
                    if (meta_parse_res.isErr()) {
128✔
441
                        log_error("failed to parse piper header: %s",
×
442
                                  meta_parse_res.unwrapErr()[0]
443
                                      .to_attr_line()
444
                                      .get_string()
445
                                      .c_str());
446
                        throw error(EINVAL);
×
447
                    }
448

449
                    this->lb_line_metadata = true;
128✔
450
                    this->lb_file_offset
451
                        = lnav::piper::HEADER_SIZE + meta_buf.size();
128✔
452
                    this->lb_piper_header_size = this->lb_file_offset;
128✔
453
                    this->lb_header = meta_parse_res.unwrap();
128✔
454
                } else if (gz_id[0] == '\037' && gz_id[1] == '\213') {
1,304✔
455
                    int gzfd = dup(fd);
7✔
456

457
                    log_perror(fcntl(gzfd, F_SETFD, FD_CLOEXEC));
7✔
458
                    if (lseek(fd, 0, SEEK_SET) < 0) {
7✔
459
                        close(gzfd);
×
460
                        throw error(errno);
×
461
                    }
462
                    lnav::gzip::header hdr;
7✔
463

464
                    this->lb_gz_file.writeAccess()->open(gzfd, hdr);
7✔
465
                    this->lb_compressed = true;
7✔
466
                    this->lb_file_time = hdr.h_mtime.tv_sec;
7✔
467
                    if (this->lb_file_time < 0) {
7✔
468
                        this->lb_file_time = 0;
×
469
                    }
470
                    this->lb_compressed_offset = 0;
7✔
471
                    if (!hdr.empty()) {
7✔
472
                        this->lb_header = std::move(hdr);
7✔
473
                    }
474
                    this->resize_buffer(INITIAL_COMPRESSED_BUFFER_SIZE);
7✔
475
                }
7✔
476
#ifdef HAVE_BZLIB_H
477
                else if (gz_id[0] == 'B' && gz_id[1] == 'Z')
1,169✔
478
                {
479
                    if (lseek(fd, 0, SEEK_SET) < 0) {
1✔
480
                        throw error(errno);
×
481
                    }
482
                    this->lb_bz_file = true;
1✔
483
                    this->lb_compressed = true;
1✔
484

485
                    /*
486
                     * Loading data from a bzip2 file is pretty slow, so we try
487
                     * to keep as much in memory as possible.
488
                     */
489
                    this->resize_buffer(INITIAL_COMPRESSED_BUFFER_SIZE);
1✔
490

491
                    this->lb_compressed_offset = 0;
1✔
492
                }
493
#endif
494
            }
1,304✔
495
            this->lb_seekable = true;
1,337✔
496
        }
497
    }
498
    this->lb_file_offset = newoff;
2,835✔
499
    this->lb_buffer.clear();
2,835✔
500
    this->lb_fd = std::move(fd);
2,835✔
501

502
    ensure(this->invariant());
2,835✔
503
}
2,835✔
504

505
void
506
line_buffer::resize_buffer(size_t new_max)
22✔
507
{
508
    if (((this->lb_compressed && new_max <= MAX_COMPRESSED_BUFFER_SIZE)
19✔
509
         || (!this->lb_compressed && new_max <= MAX_LINE_BUFFER_SIZE))
3✔
510
        && !this->lb_buffer.has_capacity_for(new_max))
44✔
511
    {
512
        /* Still need more space, try a realloc. */
513
        this->lb_share_manager.invalidate_refs();
17✔
514
        this->lb_buffer.expand_to(new_max);
17✔
515
    }
516
}
22✔
517

518
void
519
line_buffer::ensure_available(file_off_t start,
3,697✔
520
                              ssize_t max_length,
521
                              scan_direction dir)
522
{
523
    ssize_t prefill;
524

525
    require(this->lb_compressed || max_length <= MAX_LINE_BUFFER_SIZE);
3,697✔
526

527
    // log_debug("ensure avail %d %d", start, max_length);
528

529
    if (this->lb_file_size != -1) {
3,697✔
530
        if (start + (file_off_t) max_length > this->lb_file_size) {
98✔
531
            max_length = (this->lb_file_size - start);
85✔
532
        }
533
    }
534

535
    /*
536
     * Check to see if the start is inside the cached range or immediately
537
     * after.
538
     */
539
    if (start < this->lb_file_offset
7,394✔
540
        || start > (file_off_t) (this->lb_file_offset + this->lb_buffer.size()))
3,697✔
541
    {
542
        /*
543
         * The request is outside the cached range, need to reload the
544
         * whole thing.
545
         */
546
        this->lb_share_manager.invalidate_refs();
1,091✔
547
        prefill = 0;
1,091✔
548
        this->lb_buffer.clear();
1,091✔
549

550
        switch (dir) {
1,091✔
551
            case scan_direction::forward:
1,089✔
552
                break;
1,089✔
553
            case scan_direction::backward: {
2✔
554
                auto padded_max_length = max_length * 4;
2✔
555
                if (this->lb_buffer.has_capacity_for(padded_max_length)) {
2✔
556
                    start = std::max(
4✔
557
                        file_off_t{0},
4✔
558
                        static_cast<file_off_t>(start
2✔
559
                                                - (this->lb_buffer.capacity()
4✔
560
                                                   - padded_max_length)));
2✔
561
                }
562
                break;
2✔
563
            }
564
        }
565

566
        if (this->lb_file_size == (ssize_t) -1) {
1,091✔
567
            this->lb_file_offset = start;
1,091✔
568
        } else {
569
            require(start <= this->lb_file_size);
×
570
            /*
571
             * If the start is near the end of the file, move the offset back a
572
             * bit so we can get more of the file in the cache.
573
             */
574
            if (start + (ssize_t) this->lb_buffer.capacity()
×
575
                > this->lb_file_size)
×
576
            {
577
                this->lb_file_offset = this->lb_file_size
×
578
                    - std::min(this->lb_file_size,
×
579
                               (file_ssize_t) this->lb_buffer.capacity());
×
580
            } else {
581
                this->lb_file_offset = start;
×
582
            }
583
        }
584
    } else {
585
        /* The request is in the cached range.  Record how much extra data is in
586
         * the buffer before the requested range.
587
         */
588
        prefill = start - this->lb_file_offset;
2,606✔
589
    }
590
    require(this->lb_file_offset <= start);
3,697✔
591
    require(prefill <= (ssize_t) this->lb_buffer.size());
3,697✔
592

593
    ssize_t available
594
        = this->lb_buffer.capacity() - (start - this->lb_file_offset);
3,697✔
595
    if (max_length > available) {
3,697✔
596
        // log_debug("need more space!");
597
        /*
598
         * Need more space, move any existing data to the front of the
599
         * buffer.
600
         */
601
        this->lb_share_manager.invalidate_refs();
694✔
602

603
        this->lb_buffer.resize_by(-prefill);
694✔
604
        this->lb_file_offset += prefill;
694✔
605
        // log_debug("adjust file offset for prefill %d", this->lb_file_offset);
606
        memmove(this->lb_buffer.at(0),
1,388✔
607
                this->lb_buffer.at(prefill),
694✔
608
                this->lb_buffer.size());
609

610
        available = this->lb_buffer.capacity() - (start - this->lb_file_offset);
694✔
611
        if (max_length > available) {
694✔
612
            this->resize_buffer(roundup_size(max_length, DEFAULT_INCREMENT));
3✔
613
        }
614
    }
615
    this->lb_line_starts.clear();
3,697✔
616
    this->lb_line_is_utf.clear();
3,697✔
617
}
3,697✔
618

619
bool
620
line_buffer::load_next_buffer()
707✔
621
{
622
    // log_debug("loader here!");
623
    auto retval = false;
707✔
624
    auto start = this->lb_loader_file_offset.value();
707✔
625
    ssize_t rc = 0;
707✔
626
    safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
707✔
627

628
    // log_debug("BEGIN preload read");
629
    /* ... read in the new data. */
630
    if (!this->lb_cached_fd && *gi) {
707✔
631
        if (this->lb_file_size != (ssize_t) -1 && this->in_range(start)
×
632
            && this->in_range(this->lb_file_size - 1))
4✔
633
        {
634
            rc = 0;
×
635
        } else {
636
            // log_debug("async decomp start");
637
            rc = gi->read(this->lb_alt_buffer.value().end(),
8✔
638
                          start + this->lb_alt_buffer.value().size(),
4✔
639
                          this->lb_alt_buffer.value().available());
4✔
640
            this->lb_compressed_offset = gi->get_source_offset();
4✔
641
            ensure(this->lb_compressed_offset >= 0);
4✔
642
            if (rc != -1
4✔
643
                && rc < (ssize_t) this->lb_alt_buffer.value().available()
4✔
644
                && (start + (ssize_t) this->lb_alt_buffer.value().size() + rc
12✔
645
                    > this->lb_file_size))
4✔
646
            {
647
                this->lb_file_size
648
                    = (start + this->lb_alt_buffer.value().size() + rc);
4✔
649
                log_info("fd(%d): set file size to %llu",
4✔
650
                         this->lb_fd.get(),
651
                         this->lb_file_size);
652
            }
653
#if 0
654
            log_debug("async decomp end  %d+%d:%d",
655
                      this->lb_alt_buffer->size(),
656
                      rc,
657
                      this->lb_alt_buffer->capacity());
658
#endif
659
        }
660
    }
661
#ifdef HAVE_BZLIB_H
662
    else if (!this->lb_cached_fd && this->lb_bz_file)
703✔
663
    {
664
        if (this->lb_file_size != (ssize_t) -1
2✔
665
            && (((ssize_t) start >= this->lb_file_size)
1✔
666
                || (this->in_range(start)
×
667
                    && this->in_range(this->lb_file_size - 1))))
×
668
        {
669
            rc = 0;
×
670
        } else {
671
            lock_hack::guard guard;
1✔
672
            char scratch[32 * 1024];
673
            BZFILE* bz_file;
674
            file_off_t seek_to;
675
            int bzfd;
676

677
            /*
678
             * Unfortunately, there is no bzseek, so we need to reopen the
679
             * file every time we want to do a read.
680
             */
681
            bzfd = dup(this->lb_fd);
1✔
682
            if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
1✔
683
                close(bzfd);
×
684
                throw error(errno);
×
685
            }
686
            if ((bz_file = BZ2_bzdopen(bzfd, "r")) == nullptr) {
1✔
687
                close(bzfd);
×
688
                if (errno == 0) {
×
689
                    throw std::bad_alloc();
×
690
                } else {
691
                    throw error(errno);
×
692
                }
693
            }
694

695
            seek_to = start + this->lb_alt_buffer.value().size();
1✔
696
            while (seek_to > 0) {
1✔
697
                int count;
698

699
                count = BZ2_bzread(bz_file,
×
700
                                   scratch,
701
                                   std::min((size_t) seek_to, sizeof(scratch)));
×
702
                seek_to -= count;
×
703
            }
704
            rc = BZ2_bzread(bz_file,
1✔
705
                            this->lb_alt_buffer->end(),
1✔
706
                            this->lb_alt_buffer->available());
1✔
707
            this->lb_compressed_offset = 0;
1✔
708
            BZ2_bzclose(bz_file);
1✔
709

710
            if (rc != -1
1✔
711
                && (rc < (ssize_t) (this->lb_alt_buffer.value().available()))
1✔
712
                && (start + (ssize_t) this->lb_alt_buffer.value().size() + rc
3✔
713
                    > this->lb_file_size))
1✔
714
            {
715
                this->lb_file_size
716
                    = (start + this->lb_alt_buffer.value().size() + rc);
1✔
717
                log_info("fd(%d): set file size to %llu",
1✔
718
                         this->lb_fd.get(),
719
                         this->lb_file_size);
720
            }
721
        }
1✔
722
    }
723
#endif
724
    else
725
    {
726
        rc = pread(this->lb_cached_fd ? this->lb_cached_fd.value().get()
1,404✔
727
                                      : this->lb_fd.get(),
702✔
728
                   this->lb_alt_buffer.value().end(),
702✔
729
                   this->lb_alt_buffer.value().available(),
702✔
730
                   start + this->lb_alt_buffer.value().size());
702✔
731
    }
732
    // XXX For some reason, cygwin is giving us a bogus return value when
733
    // up to the end of the file.
734
    if (rc > (ssize_t) this->lb_alt_buffer.value().available()) {
707✔
735
        rc = -1;
×
736
#ifdef ENODATA
737
        errno = ENODATA;
×
738
#else
739
        errno = EAGAIN;
740
#endif
741
    }
742
    switch (rc) {
707✔
743
        case 0:
14✔
744
            if (start < (file_off_t) this->lb_file_size) {
14✔
745
                retval = true;
×
746
            }
747
            break;
14✔
748

749
        case (ssize_t) -1:
×
750
            switch (errno) {
×
751
#ifdef ENODATA
752
                /* Cygwin seems to return this when pread reaches the end of
753
                 * the file. */
754
                case ENODATA:
×
755
#endif
756
                case EINTR:
757
                case EAGAIN:
758
                    break;
×
759

760
                default:
×
761
                    throw error(errno);
×
762
            }
763
            break;
×
764

765
        default:
693✔
766
            this->lb_alt_buffer.value().resize_by(rc);
693✔
767
            retval = true;
693✔
768
            break;
693✔
769
    }
770
    // log_debug("END preload read");
771

772
    if (start > this->lb_last_line_offset) {
707✔
773
        const auto* line_start = this->lb_alt_buffer.value().begin();
707✔
774

775
        do {
776
            auto before = line_start - this->lb_alt_buffer->begin();
18,890✔
777
            const auto remaining = this->lb_alt_buffer.value().size() - before;
18,890✔
778
            const auto frag
779
                = string_fragment::from_bytes(line_start, remaining);
18,890✔
780
            auto utf_scan_res = is_utf8(frag, '\n');
18,890✔
781
            const auto* lf = utf_scan_res.remaining_ptr();
18,890✔
782
            this->lb_alt_line_starts.emplace_back(before);
18,890✔
783
            this->lb_alt_line_is_utf.emplace_back(utf_scan_res.is_valid());
18,890✔
784
            this->lb_alt_line_has_ansi.emplace_back(utf_scan_res.usr_has_ansi);
18,890✔
785

786
            line_start = lf;
18,890✔
787
        } while (line_start != nullptr
788
                 && line_start < this->lb_alt_buffer->end());
18,890✔
789
    }
790

791
    return retval;
707✔
792
}
707✔
793

794
bool
795
line_buffer::fill_range(file_off_t start,
3,144✔
796
                        ssize_t max_length,
797
                        scan_direction dir)
798
{
799
    bool retval = false;
3,144✔
800

801
    require(start >= 0);
3,144✔
802

803
#if 0
804
    log_debug("(%p) fill range %d %d (%d) %d",
805
              this,
806
              start,
807
              max_length,
808
              this->lb_file_offset,
809
              this->lb_buffer.size());
810
#endif
811
    if (!lnav::pid::in_child && this->lb_loader_future.valid()
3,144✔
812
        && start >= this->lb_loader_file_offset.value())
6,288✔
813
    {
814
#if 0
815
        log_debug("getting preload! %d %d",
816
                  start,
817
                  this->lb_loader_file_offset.value());
818
#endif
819
        std::optional<std::chrono::system_clock::time_point> wait_start;
654✔
820

821
        if (this->lb_loader_future.wait_for(std::chrono::seconds(0))
654✔
822
            != std::future_status::ready)
654✔
823
        {
824
            wait_start = std::make_optional(std::chrono::system_clock::now());
170✔
825
        }
826
        retval = this->lb_loader_future.get();
654✔
827
        if (false && wait_start) {
828
            auto diff = std::chrono::system_clock::now() - wait_start.value();
829
            log_debug("wait done! %lld", diff.count());
830
        }
831
        // log_debug("got preload");
832
        this->lb_loader_future = {};
654✔
833
        this->lb_share_manager.invalidate_refs();
654✔
834
        this->lb_file_offset = this->lb_loader_file_offset.value();
654✔
835
        this->lb_loader_file_offset = std::nullopt;
654✔
836
        this->lb_buffer.swap(this->lb_alt_buffer.value());
654✔
837
        this->lb_alt_buffer.value().clear();
654✔
838
        this->lb_line_starts = std::move(this->lb_alt_line_starts);
654✔
839
        this->lb_alt_line_starts.clear();
654✔
840
        this->lb_line_is_utf = std::move(this->lb_alt_line_is_utf);
654✔
841
        this->lb_alt_line_is_utf.clear();
654✔
842
        this->lb_line_has_ansi = std::move(this->lb_alt_line_has_ansi);
654✔
843
        this->lb_alt_line_has_ansi.clear();
654✔
844
        this->lb_stats.s_used_preloads += 1;
654✔
845
        this->lb_next_line_start_index = 0;
654✔
846
        this->lb_next_buffer_offset = 0;
654✔
847
    }
848
    if (this->in_range(start)
3,144✔
849
        && (max_length == 0 || this->in_range(start + max_length - 1)))
3,144✔
850
    {
851
        /* Cache already has the data, nothing to do. */
852
        retval = true;
62✔
853
        if (this->lb_do_preloading && !lnav::pid::in_child && this->lb_seekable
62✔
854
            && this->lb_buffer.full() && !this->lb_loader_file_offset)
124✔
855
        {
856
            // log_debug("loader available start=%d", start);
857
            auto last_lf_iter = std::find(
858
                this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n');
3✔
859
            if (last_lf_iter != this->lb_buffer.rend()) {
3✔
860
                auto usable_size
861
                    = std::distance(last_lf_iter, this->lb_buffer.rend());
3✔
862
                // log_debug("found linefeed %d", usable_size);
863
                if (!this->lb_alt_buffer) {
3✔
864
                    // log_debug("allocating new buffer!");
865
                    this->lb_alt_buffer
866
                        = auto_buffer::alloc(this->lb_buffer.capacity());
×
867
                }
868
                this->lb_alt_buffer->resize(this->lb_buffer.size()
3✔
869
                                            - usable_size);
3✔
870
                memcpy(this->lb_alt_buffer.value().begin(),
6✔
871
                       this->lb_buffer.at(usable_size),
3✔
872
                       this->lb_alt_buffer->size());
873
                this->lb_loader_file_offset
874
                    = this->lb_file_offset + usable_size;
3✔
875
#if 0
876
                log_debug("load offset %d",
877
                          this->lb_loader_file_offset.value());
878
                log_debug("launch loader");
879
#endif
880
                auto prom = std::make_shared<std::promise<bool>>();
3✔
881
                this->lb_loader_future = prom->get_future();
3✔
882
                this->lb_stats.s_requested_preloads += 1;
3✔
883
                isc::to<io_looper&, io_looper_tag>().send(
3✔
884
                    [this, prom](auto& ioloop) mutable {
6✔
885
                        prom->set_value(this->load_next_buffer());
3✔
886
                    });
3✔
887
            }
3✔
888
        }
889
    } else if (this->lb_fd != -1) {
3,082✔
890
        ssize_t rc;
891

892
        /* Make sure there is enough space, then */
893
        this->ensure_available(start, max_length, dir);
3,082✔
894

895
        safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
3,082✔
896

897
        /* ... read in the new data. */
898
        if (!this->lb_cached_fd && *gi) {
3,082✔
899
            // log_debug("old decomp start");
900
            if (this->lb_file_size != (ssize_t) -1 && this->in_range(start)
12✔
901
                && this->in_range(this->lb_file_size - 1))
27✔
902
            {
903
                rc = 0;
6✔
904
            } else {
905
                this->lb_stats.s_decompressions += 1;
9✔
906
                if (false && this->lb_last_line_offset > 0) {
907
                    this->lb_stats.s_hist[(this->lb_file_offset * 10)
908
                                          / this->lb_last_line_offset]
909
                        += 1;
910
                }
911
                rc = gi->read(this->lb_buffer.end(),
18✔
912
                              this->lb_file_offset + this->lb_buffer.size(),
9✔
913
                              this->lb_buffer.available());
914
                this->lb_compressed_offset = gi->get_source_offset();
9✔
915
                ensure(this->lb_compressed_offset >= 0);
9✔
916
                if (rc != -1 && (rc < (ssize_t) this->lb_buffer.available())) {
9✔
917
                    this->lb_file_size
918
                        = (this->lb_file_offset + this->lb_buffer.size() + rc);
9✔
919
                    log_info("fd(%d): rc (%zd) -- set file size to %llu",
9✔
920
                             this->lb_fd.get(),
921
                             rc,
922
                             this->lb_file_size);
923
                }
924
            }
925
#if 0
926
            log_debug("old decomp end -- %d+%d:%d",
927
                      this->lb_buffer.size(),
928
                      rc,
929
                      this->lb_buffer.capacity());
930
#endif
931
        }
932
#ifdef HAVE_BZLIB_H
933
        else if (!this->lb_cached_fd && this->lb_bz_file)
3,067✔
934
        {
935
            if (this->lb_file_size != (ssize_t) -1
6✔
936
                && (((ssize_t) start >= this->lb_file_size)
5✔
937
                    || (this->in_range(start)
2✔
938
                        && this->in_range(this->lb_file_size - 1))))
1✔
939
            {
940
                rc = 0;
2✔
941
            } else {
942
                lock_hack::guard guard;
1✔
943
                char scratch[32 * 1024];
944
                BZFILE* bz_file;
945
                file_off_t seek_to;
946
                int bzfd;
947

948
                /*
949
                 * Unfortunately, there is no bzseek, so we need to reopen the
950
                 * file every time we want to do a read.
951
                 */
952
                bzfd = dup(this->lb_fd);
1✔
953
                if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
1✔
954
                    close(bzfd);
×
955
                    throw error(errno);
×
956
                }
957
                if ((bz_file = BZ2_bzdopen(bzfd, "r")) == NULL) {
1✔
958
                    close(bzfd);
×
959
                    if (errno == 0) {
×
960
                        throw std::bad_alloc();
×
961
                    } else {
962
                        throw error(errno);
×
963
                    }
964
                }
965

966
                seek_to = this->lb_file_offset + this->lb_buffer.size();
1✔
967
                while (seek_to > 0) {
1✔
968
                    int count;
969

970
                    count = BZ2_bzread(
×
971
                        bz_file,
972
                        scratch,
973
                        std::min((size_t) seek_to, sizeof(scratch)));
×
974
                    seek_to -= count;
×
975
                }
976
                rc = BZ2_bzread(bz_file,
1✔
977
                                this->lb_buffer.end(),
1✔
978
                                this->lb_buffer.available());
1✔
979
                this->lb_compressed_offset = 0;
1✔
980
                BZ2_bzclose(bz_file);
1✔
981

982
                if (rc != -1 && (rc < (ssize_t) this->lb_buffer.available())) {
1✔
983
                    this->lb_file_size
984
                        = (this->lb_file_offset + this->lb_buffer.size() + rc);
1✔
985
                    log_info("fd(%d): set file size to %llu",
1✔
986
                             this->lb_fd.get(),
987
                             this->lb_file_size);
988
                }
989
            }
1✔
990
        }
991
#endif
992
        else if (this->lb_seekable)
3,064✔
993
        {
994
            this->lb_stats.s_preads += 1;
2,881✔
995
            if (false && this->lb_last_line_offset > 0) {
996
                this->lb_stats.s_hist[(this->lb_file_offset * 10)
997
                                      / this->lb_last_line_offset]
998
                    += 1;
999
            }
1000
#if 0
1001
            log_debug("%d: pread %lld",
1002
                      this->lb_fd.get(),
1003
                      this->lb_file_offset + this->lb_buffer.size());
1004
#endif
1005
            rc = pread(this->lb_cached_fd ? this->lb_cached_fd.value().get()
5,762✔
1006
                                          : this->lb_fd.get(),
2,881✔
1007
                       this->lb_buffer.end(),
2,881✔
1008
                       this->lb_buffer.available(),
1009
                       this->lb_file_offset + this->lb_buffer.size());
2,881✔
1010
            // log_debug("pread rc %d", rc);
1011
        } else {
1012
            rc = read(this->lb_fd,
366✔
1013
                      this->lb_buffer.end(),
183✔
1014
                      this->lb_buffer.available());
1015
        }
1016
        // XXX For some reason, cygwin is giving us a bogus return value when
1017
        // up to the end of the file.
1018
        if (rc > (ssize_t) this->lb_buffer.available()) {
3,082✔
1019
            rc = -1;
×
1020
#ifdef ENODATA
1021
            errno = ENODATA;
×
1022
#else
1023
            errno = EAGAIN;
1024
#endif
1025
        }
1026
        switch (rc) {
3,082✔
1027
            case 0:
1,413✔
1028
                if (!this->lb_seekable) {
1,413✔
1029
                    this->lb_file_size
1030
                        = this->lb_file_offset + this->lb_buffer.size();
133✔
1031
                }
1032
                if (start < (file_off_t) this->lb_file_size) {
1,413✔
1033
                    retval = true;
14✔
1034
                }
1035

1036
                if (this->lb_compressed) {
1,413✔
1037
                    /*
1038
                     * For compressed files, increase the buffer size so we
1039
                     * don't have to spend as much time uncompressing the data.
1040
                     */
1041
                    this->resize_buffer(MAX_COMPRESSED_BUFFER_SIZE);
11✔
1042
                }
1043
                break;
1,413✔
1044

1045
            case (ssize_t) -1:
2✔
1046
                switch (errno) {
2✔
1047
#ifdef ENODATA
1048
                    /* Cygwin seems to return this when pread reaches the end of
1049
                     * the */
1050
                    /* file. */
1051
                    case ENODATA:
2✔
1052
#endif
1053
                    case EINTR:
1054
                    case EAGAIN:
1055
                        break;
2✔
1056

1057
                    default:
×
1058
                        throw error(errno);
×
1059
                }
1060
                break;
2✔
1061

1062
            default:
1,667✔
1063
                this->lb_buffer.resize_by(rc);
1,667✔
1064
                retval = true;
1,667✔
1065
                break;
1,667✔
1066
        }
1067

1068
        if (this->lb_do_preloading && !lnav::pid::in_child && this->lb_seekable
2,277✔
1069
            && this->lb_buffer.full() && !this->lb_loader_file_offset)
5,359✔
1070
        {
1071
            // log_debug("loader available2 start=%d", start);
1072
            auto last_lf_iter = std::find(
1073
                this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n');
×
1074
            if (last_lf_iter != this->lb_buffer.rend()) {
×
1075
                auto usable_size
1076
                    = std::distance(last_lf_iter, this->lb_buffer.rend());
×
1077
                // log_debug("found linefeed %d", usable_size);
1078
                if (!this->lb_alt_buffer) {
×
1079
                    // log_debug("allocating new buffer!");
1080
                    this->lb_alt_buffer
1081
                        = auto_buffer::alloc(this->lb_buffer.capacity());
×
1082
                } else if (this->lb_alt_buffer->capacity()
×
1083
                           < this->lb_buffer.capacity())
×
1084
                {
1085
                    this->lb_alt_buffer->expand_to(this->lb_buffer.capacity());
×
1086
                }
1087
                this->lb_alt_buffer->resize(this->lb_buffer.size()
×
1088
                                            - usable_size);
×
1089
                memcpy(this->lb_alt_buffer->begin(),
×
1090
                       this->lb_buffer.at(usable_size),
×
1091
                       this->lb_alt_buffer->size());
1092
                this->lb_loader_file_offset
1093
                    = this->lb_file_offset + usable_size;
×
1094
#if 0
1095
                log_debug("load offset %d",
1096
                          this->lb_loader_file_offset.value());
1097
                log_debug("launch loader");
1098
#endif
1099
                auto prom = std::make_shared<std::promise<bool>>();
×
1100
                this->lb_loader_future = prom->get_future();
×
1101
                this->lb_stats.s_requested_preloads += 1;
×
1102
                isc::to<io_looper&, io_looper_tag>().send(
×
1103
                    [this, prom](auto& ioloop) mutable {
×
1104
                        prom->set_value(this->load_next_buffer());
×
1105
                    });
×
1106
            }
1107
        }
1108
        ensure(this->lb_buffer.size() <= this->lb_buffer.capacity());
3,082✔
1109
    }
3,082✔
1110

1111
    return retval;
3,144✔
1112
}
1113

1114
Result<line_info, std::string>
1115
line_buffer::load_next_line(file_range prev_line)
19,797✔
1116
{
1117
    const char* line_start = nullptr;
19,797✔
1118
    bool done = false;
19,797✔
1119
    line_info retval;
19,797✔
1120

1121
    require(this->lb_fd != -1);
19,797✔
1122

1123
    if (this->lb_line_metadata && prev_line.fr_offset == 0) {
19,797✔
1124
        prev_line.fr_offset = this->lb_piper_header_size;
128✔
1125
    }
1126

1127
    auto offset = prev_line.next_offset();
19,797✔
1128
    ssize_t request_size = INITIAL_REQUEST_SIZE;
19,797✔
1129
    retval.li_file_range.fr_offset = offset;
19,797✔
1130
    if (this->lb_buffer.empty() || !this->in_range(offset)) {
19,797✔
1131
        this->fill_range(offset, this->lb_buffer.capacity());
2,443✔
1132
    } else if (offset
17,354✔
1133
               == this->lb_file_offset + (ssize_t) this->lb_buffer.size())
17,354✔
1134
    {
1135
        if (!this->fill_range(offset, INITIAL_REQUEST_SIZE)) {
×
1136
            retval.li_file_range.fr_offset = offset;
×
1137
            retval.li_file_range.fr_size = 0;
×
1138
            if (this->is_pipe()) {
×
1139
                retval.li_partial = !this->is_pipe_closed();
×
1140
            } else {
1141
                retval.li_partial = true;
×
1142
            }
1143
            return Ok(retval);
×
1144
        }
1145
    }
1146
    if (prev_line.next_offset() == 0) {
19,797✔
1147
        auto is_utf_res = is_utf8(string_fragment::from_bytes(
3,300✔
1148
            this->lb_buffer.begin(), this->lb_buffer.size()));
1,650✔
1149
        this->lb_is_utf8 = is_utf_res.is_valid();
1,650✔
1150
        if (!this->lb_is_utf8) {
1,650✔
1151
            log_warning("fd(%d): input is not utf8 -- %s",
12✔
1152
                        this->lb_fd.get(),
1153
                        is_utf_res.usr_message);
1154
        }
1155
    }
1156
    while (!done) {
39,520✔
1157
        auto old_retval_size = retval.li_file_range.fr_size;
19,809✔
1158
        const char* lf = nullptr;
19,809✔
1159

1160
        /* Find the data in the cache and */
1161
        line_start = this->get_range(offset, retval.li_file_range.fr_size);
19,809✔
1162
        /* ... look for the end-of-line or end-of-file. */
1163
        ssize_t utf8_end = -1;
19,809✔
1164

1165
        if (!retval.li_utf8_scan_result.is_valid()) {
19,809✔
1166
            retval.li_utf8_scan_result = {};
2✔
1167
        }
1168
        auto found_in_cache = false;
19,809✔
1169
        auto has_ansi = false;
19,809✔
1170
        auto valid_utf8 = true;
19,809✔
1171
        if (!this->lb_line_starts.empty()) {
19,809✔
1172
            auto buffer_offset = offset - this->lb_file_offset;
3✔
1173

1174
            if (this->lb_next_buffer_offset == buffer_offset) {
3✔
1175
                require(this->lb_next_line_start_index
3✔
1176
                        < this->lb_line_starts.size());
1177
                auto start_iter = this->lb_line_starts.begin()
6✔
1178
                    + this->lb_next_line_start_index;
3✔
1179
                auto next_line_iter = start_iter + 1;
3✔
1180
                if (next_line_iter != this->lb_line_starts.end()) {
3✔
1181
                    utf8_end = *next_line_iter - 1 - *start_iter;
3✔
1182
                    found_in_cache = true;
3✔
1183
                    lf = line_start + utf8_end;
3✔
1184
                    has_ansi = this->lb_line_has_ansi
1185
                                   [this->lb_next_line_start_index];
3✔
1186
                    valid_utf8
1187
                        = this->lb_line_is_utf[this->lb_next_line_start_index];
3✔
1188

1189
                    // log_debug("hit cache");
1190
                    this->lb_next_buffer_offset = *next_line_iter;
3✔
1191
                    this->lb_next_line_start_index += 1;
3✔
1192
                } else {
1193
                    // log_debug("no next iter");
1194
                }
1195
            } else {
1196
                auto start_iter = std::lower_bound(this->lb_line_starts.begin(),
×
1197
                                                   this->lb_line_starts.end(),
1198
                                                   buffer_offset);
1199
                if (start_iter != this->lb_line_starts.end()) {
×
1200
                    auto next_line_iter = start_iter + 1;
×
1201

1202
                    // log_debug("found offset %d %d", buffer_offset,
1203
                    // *start_iter);
1204
                    if (next_line_iter != this->lb_line_starts.end()) {
×
1205
                        auto start_index = std::distance(
×
1206
                            this->lb_line_starts.begin(), start_iter);
1207
                        utf8_end = *next_line_iter - 1 - *start_iter;
×
1208
                        found_in_cache = true;
×
1209
                        lf = line_start + utf8_end;
×
1210
                        has_ansi = this->lb_line_has_ansi[start_index];
×
1211
                        valid_utf8 = this->lb_line_is_utf[start_index];
×
1212

1213
                        this->lb_next_line_start_index = start_index + 1;
×
1214
                        this->lb_next_buffer_offset = *next_line_iter;
×
1215
                    } else {
1216
                        // log_debug("no next iter");
1217
                    }
1218
                } else {
1219
                    // log_debug("no buffer_offset found");
1220
                }
1221
            }
1222
        }
1223

1224
        if (found_in_cache && valid_utf8) {
19,809✔
1225
            retval.li_utf8_scan_result.usr_has_ansi = has_ansi;
3✔
1226
        } else {
1227
            auto frag = string_fragment::from_bytes(
19,806✔
1228
                line_start, retval.li_file_range.fr_size);
19,806✔
1229
            auto scan_res = is_utf8(frag, '\n');
19,806✔
1230
            lf = scan_res.remaining_ptr();
19,806✔
1231
            if (lf != nullptr) {
19,806✔
1232
                lf -= 1;
19,093✔
1233
            }
1234
            retval.li_utf8_scan_result = scan_res;
19,806✔
1235
            if (!scan_res.is_valid()) {
19,806✔
1236
                log_warning("fd(%d): line is not utf8 -- %lld",
62✔
1237
                            this->lb_fd.get(),
1238
                            retval.li_file_range.fr_offset);
1239
            }
1240
        }
1241

1242
        auto got_new_data = old_retval_size != retval.li_file_range.fr_size;
19,809✔
1243
#if 0
1244
        log_debug("load next loop %p reqsize %d lsize %d",
1245
                  lf,
1246
                  request_size,
1247
                  retval.li_file_range.fr_size);
1248
#endif
1249
        if (lf != nullptr
19,809✔
1250
            || (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE)
713✔
1251
            || (request_size >= MAX_LINE_BUFFER_SIZE)
713✔
1252
            || (!got_new_data
21,201✔
1253
                && (!this->is_pipe() || request_size > DEFAULT_INCREMENT)))
679✔
1254
        {
1255
            if ((lf != nullptr)
19,711✔
1256
                && ((size_t) (lf - line_start) >= MAX_LINE_BUFFER_SIZE - 1))
19,096✔
1257
            {
1258
                lf = nullptr;
×
1259
            }
1260
            if (lf != nullptr) {
19,711✔
1261
                retval.li_partial = false;
19,096✔
1262
                retval.li_file_range.fr_size = lf - line_start;
19,096✔
1263
                // delim
1264
                retval.li_file_range.fr_size += 1;
19,096✔
1265
                if (offset >= this->lb_last_line_offset) {
19,096✔
1266
                    this->lb_last_line_offset
1267
                        = offset + retval.li_file_range.fr_size;
18,514✔
1268
                }
1269
            } else {
1270
                if (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) {
615✔
1271
                    log_warning("Line exceeded max size: offset=%zd", offset);
×
1272
                    retval.li_file_range.fr_size = MAX_LINE_BUFFER_SIZE - 1;
×
1273
                    retval.li_partial = false;
×
1274
                } else {
1275
                    retval.li_partial = true;
615✔
1276
                }
1277
                this->ensure_available(offset, retval.li_file_range.fr_size);
615✔
1278

1279
                if (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) {
615✔
1280
                    retval.li_file_range.fr_size = MAX_LINE_BUFFER_SIZE - 1;
×
1281
                }
1282
                if (retval.li_partial) {
615✔
1283
                    /*
1284
                     * Since no delimiter was seen, we need to remember the
1285
                     * offset of the last line in the file so we don't
1286
                     * mistakenly return two partial lines to the caller.
1287
                     *
1288
                     *   1. read_line() - returns partial line
1289
                     *   2. file is written
1290
                     *   3. read_line() - returns the middle of partial line.
1291
                     */
1292
                    this->lb_last_line_offset = offset;
615✔
1293
                } else if (offset >= this->lb_last_line_offset) {
×
1294
                    this->lb_last_line_offset
1295
                        = offset + retval.li_file_range.fr_size;
×
1296
                }
1297
            }
1298

1299
            offset += retval.li_file_range.fr_size;
19,711✔
1300

1301
            done = true;
19,711✔
1302
        } else {
1303
            if (!this->is_pipe() || !this->is_pipe_closed()) {
98✔
1304
                retval.li_partial = true;
35✔
1305
            }
1306
            request_size
1307
                = std::min<ssize_t>(this->lb_buffer.size() + DEFAULT_INCREMENT,
98✔
1308
                                    MAX_LINE_BUFFER_SIZE);
1309
        }
1310

1311
        if (!done
39,618✔
1312
            && !this->fill_range(
19,907✔
1313
                offset,
1314
                std::max(request_size, (ssize_t) this->lb_buffer.available())))
19,907✔
1315
        {
1316
            break;
86✔
1317
        }
1318
    }
1319

1320
    ensure(retval.li_file_range.fr_size <= (ssize_t) this->lb_buffer.size());
19,797✔
1321
    ensure(this->invariant());
19,797✔
1322
#if 0
1323
    log_debug("got line part %d %d",
1324
              retval.li_file_range.fr_offset,
1325
              (int) retval.li_partial);
1326
#endif
1327

1328
    retval.li_file_range.fr_metadata.m_has_ansi
1329
        = retval.li_utf8_scan_result.usr_has_ansi;
19,797✔
1330
    retval.li_file_range.fr_metadata.m_valid_utf
1331
        = retval.li_utf8_scan_result.is_valid();
19,797✔
1332

1333
    if (this->lb_line_metadata) {
19,797✔
1334
        auto sv = std::string_view{
1335
            line_start,
1336
            (size_t) retval.li_file_range.fr_size,
470✔
1337
        };
470✔
1338

1339
        auto scan_res = scn::scan<int64_t, int64_t, char>(sv, "{}.{}:{};");
470✔
1340
        if (scan_res) {
470✔
1341
            auto& [tv_sec, tv_usec, level] = scan_res->values();
388✔
1342
            retval.li_timestamp.tv_sec = tv_sec;
388✔
1343
            retval.li_timestamp.tv_usec = tv_usec;
388✔
1344
            retval.li_timestamp.tv_sec
1345
                = lnav::to_local_time(date::sys_seconds{std::chrono::seconds{
388✔
1346
                                          retval.li_timestamp.tv_sec}})
388✔
1347
                      .time_since_epoch()
388✔
1348
                      .count();
388✔
1349
            retval.li_level = abbrev2level(&level, 1);
388✔
1350
        }
1351
    }
1352

1353
    return Ok(retval);
19,797✔
1354
}
1355

1356
/**
 * Get a shared reference to the bytes covered by `fr`, reading them into the
 * buffer first if they are not fully cached.
 *
 * @param fr The offset and size of the range to return.
 * @param dir Passed through to fill_range() when data needs to be loaded.
 * @return A buffer reference carrying the range's metadata.  When line
 *   metadata is enabled, the reference starts after the first ';' found in
 *   the range.  An error is returned if the file could not be read or fewer
 *   bytes than requested are available.
 */
Result<shared_buffer_ref, std::string>
line_buffer::read_range(file_range fr, scan_direction dir)
{
#if 0
    if (this->lb_last_line_offset != -1
        && fr.fr_offset > this->lb_last_line_offset)
    {
        /*
         * Don't return anything past the last known line.  The caller needs
         * to try reading at the offset of the last line again.
         */
        return Err(
            fmt::format(FMT_STRING("attempt to read past the known end of the "
                                   "file: read-offset={}; last_line_offset={}"),
                        fr.fr_offset,
                        this->lb_last_line_offset));
    }
#endif

    /* Only go to the file if either end of the range is not cached. */
    const bool fully_cached = this->in_range(fr.fr_offset)
        && this->in_range(fr.fr_offset + fr.fr_size - 1);
    if (!fully_cached && !this->fill_range(fr.fr_offset, fr.fr_size, dir)) {
        return Err(std::string("unable to read file"));
    }

    file_ssize_t avail;
    const char* data = this->get_range(fr.fr_offset, avail);
    if (avail < fr.fr_size) {
        return Err(fmt::format(
            FMT_STRING("short-read (need: {}; avail: {})"), fr.fr_size, avail));
    }

    if (this->lb_line_metadata) {
        /* Drop the metadata prefix, which ends at the first semicolon. */
        const auto* semi
            = static_cast<const char*>(memchr(data, ';', fr.fr_size));
        if (semi != nullptr) {
            const auto prefix_len = semi - data + 1;

            data += prefix_len;
            fr.fr_size -= prefix_len;
        }
    }

    shared_buffer_ref retval;
    retval.share(this->lb_share_manager, data, fr.fr_size);
    retval.get_metadata() = fr.fr_metadata;

    return Ok(std::move(retval));
}
90,429✔
1406

1407
/**
 * Read the bytes covered by `fr` into a freshly allocated buffer instead of
 * the line buffer's own cache.
 *
 * The data comes from, in order of preference: the cached file descriptor,
 * the gzip index, the bzip2 stream (re-read from the start of the file) or
 * a plain pread(2) on the file descriptor.
 *
 * @param fr The offset and size of the range to read.
 * @return The buffer holding exactly `fr.fr_size` bytes, or an error message
 *   if the read failed or came up short.
 * @throws error If the bzip2 stream cannot be reopened.
 */
Result<auto_buffer, std::string>
line_buffer::peek_range(file_range fr)
{
    static const std::string SHORT_READ_MSG = "short read";

    require(this->lb_seekable);

    auto buf = auto_buffer::alloc(fr.fr_size);

    /*
     * Turn the return code of a read into the result for this function.  Must
     * be called right after the read so that errno is still meaningful.
     */
    const auto finish
        = [&buf, &fr](ssize_t rc) -> Result<auto_buffer, std::string> {
        if (rc == -1) {
            return Err(lnav::from_errno().message());
        }
        if (rc != fr.fr_size) {
            return Err(SHORT_READ_MSG);
        }
        buf.resize(rc);

        return Ok(std::move(buf));
    };

    if (this->lb_cached_fd) {
        return finish(pread(this->lb_cached_fd.value().get(),
                            buf.data(),
                            fr.fr_size,
                            fr.fr_offset));
    }

    if (this->lb_compressed) {
        safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);

        if (*gi) {
            return finish(gi->read(buf.data(), fr.fr_offset, fr.fr_size));
        }
#ifdef HAVE_BZLIB_H
        if (this->lb_bz_file) {
            lock_hack::guard guard;
            char scratch[32 * 1024];

            /*
             * Unfortunately, there is no bzseek, so we need to reopen the
             * file every time we want to do a read.
             */
            const int bzfd = dup(this->lb_fd);
            if (bzfd == -1) {
                throw error(errno);
            }
            if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
                // Save errno since close() is allowed to change it.
                const auto lseek_errno = errno;

                close(bzfd);
                throw error(lseek_errno);
            }

            BZFILE* bz_file = BZ2_bzdopen(bzfd, "r");
            if (bz_file == nullptr) {
                const auto open_errno = errno;

                close(bzfd);
                if (open_errno == 0) {
                    throw std::bad_alloc();
                }
                throw error(open_errno);
            }

            /*
             * Decompress and discard everything before the requested offset.
             * Bail out if the stream ends or fails to decode first, otherwise
             * a zero or negative count would keep this loop from terminating.
             */
            file_off_t seek_to = fr.fr_offset;
            while (seek_to > 0) {
                const int count
                    = BZ2_bzread(bz_file,
                                 scratch,
                                 std::min((size_t) seek_to, sizeof(scratch)));
                if (count <= 0) {
                    BZ2_bzclose(bz_file);
                    return Err(SHORT_READ_MSG);
                }
                seek_to -= count;
            }
            auto rc = BZ2_bzread(bz_file, buf.data(), fr.fr_size);
            // NOTE(review): this resets shared state from a "peek", which
            // mirrors what fill_range() does after a bzip2 read -- confirm
            // that it is intended here.
            this->lb_compressed_offset = 0;
            BZ2_bzclose(bz_file);

            return finish(rc);
        }
#endif
    }

    return finish(pread(this->lb_fd, buf.data(), fr.fr_size, fr.fr_offset));
}
1506

1507
/**
 * @return The range of the file that is currently held in the main buffer:
 *   it starts at the buffer's file offset and spans the buffer's size.
 */
file_range
line_buffer::get_available()
{
    const auto buffered_size
        = static_cast<file_ssize_t>(this->lb_buffer.size());

    return {this->lb_file_offset, buffered_size};
}
1515

1516
/**
 * Capture the state of an inflate stream at the end of a deflate block.
 *
 * @param s The stream to snapshot.  It must be at the end of a block, but
 *   not at the end of the file.
 * @param size The size of the output buffer, which must hold at least
 *   GZ_WINSIZE bytes of output before `s.next_out`.
 */
line_buffer::gz_indexed::indexDict::indexDict(const z_stream& s,
                                              const file_size_t size)
{
    assert((s.data_type & GZ_END_OF_BLOCK_MASK));
    assert(!(s.data_type & GZ_END_OF_FILE_MASK));
    assert(size >= s.avail_out + GZ_WINSIZE);

    // Positions in the compressed input and the uncompressed output.
    this->in = s.total_in;
    this->out = s.total_out;

    // Keep the top `bits` bits of the most recently consumed input byte.
    this->bits = s.data_type & GZ_BORROW_BITS_MASK;
    const auto prev_input_byte = s.next_in[-1];
    const auto discard_count = 8 - this->bits;
    this->in_bits = prev_input_byte >> discard_count;

    // Copy the last 32k uncompressed data (sliding window) to our
    // index
    const auto* window = s.next_out - GZ_WINSIZE;
    memcpy(this->index, window, GZ_WINSIZE);
}
1531

1532
int
1533
line_buffer::gz_indexed::indexDict::apply(z_streamp s)
×
1534
{
1535
    s->zalloc = Z_NULL;
×
1536
    s->zfree = Z_NULL;
×
1537
    s->opaque = Z_NULL;
×
1538
    s->avail_in = 0;
×
1539
    s->next_in = Z_NULL;
×
1540
    auto ret = inflateInit2(s, GZ_RAW_MODE);
×
1541
    if (ret != Z_OK) {
×
1542
        return ret;
×
1543
    }
1544
    if (this->bits) {
×
1545
        inflatePrime(s, this->bits, this->in_bits);
×
1546
    }
1547
    s->total_in = this->in;
×
1548
    s->total_out = this->out;
×
1549
    inflateSetDictionary(s, this->index, GZ_WINSIZE);
×
1550
    return ret;
×
1551
}
1552

1553
bool
1554
line_buffer::is_likely_to_flush(file_range prev_line)
×
1555
{
1556
    auto avail = this->get_available();
×
1557

1558
    if (prev_line.fr_offset < avail.fr_offset) {
×
1559
        return true;
×
1560
    }
1561
    auto prev_line_end = prev_line.fr_offset + prev_line.fr_size;
×
1562
    auto avail_end = avail.fr_offset + avail.fr_size;
×
1563
    if (avail_end < prev_line_end) {
×
1564
        return true;
×
1565
    }
1566
    auto remaining = avail_end - prev_line_end;
×
1567
    return remaining < INITIAL_REQUEST_SIZE;
×
1568
}
1569

1570
void
1571
line_buffer::quiesce()
42✔
1572
{
1573
    if (this->lb_loader_future.valid()) {
42✔
1574
        this->lb_loader_future.wait();
×
1575
    }
1576
}
42✔
1577

1578
static std::filesystem::path
1579
line_buffer_cache_path()
609✔
1580
{
1581
    return lnav::paths::workdir() / "buffer-cache";
1,218✔
1582
}
1583

1584
void
1585
line_buffer::enable_cache()
230✔
1586
{
1587
    if (!this->lb_compressed || this->lb_cached_fd) {
230✔
1588
        log_info("%d: skipping cache request (compressed=%d already-cached=%d)",
230✔
1589
                 this->lb_fd.get(),
1590
                 this->lb_compressed,
1591
                 (bool) this->lb_cached_fd);
1592
        return;
230✔
1593
    }
1594

1595
    struct stat st;
1596

1597
    if (fstat(this->lb_fd, &st) == -1) {
×
1598
        log_error("failed to fstat(%d) - %d", this->lb_fd.get(), errno);
×
1599
        return;
×
1600
    }
1601

1602
    auto cached_base_name = hasher()
×
1603
                                .update(st.st_dev)
×
1604
                                .update(st.st_ino)
×
1605
                                .update(st.st_size)
×
1606
                                .to_string();
×
1607
    auto cache_dir = line_buffer_cache_path() / cached_base_name.substr(0, 2);
×
1608

1609
    std::filesystem::create_directories(cache_dir);
×
1610

1611
    auto cached_file_name = fmt::format(FMT_STRING("{}.bin"), cached_base_name);
×
1612
    auto cached_file_path = cache_dir / cached_file_name;
×
1613
    auto cached_done_path
1614
        = cache_dir / fmt::format(FMT_STRING("{}.done"), cached_base_name);
×
1615

1616
    log_info(
×
1617
        "%d:cache file path: %s", this->lb_fd.get(), cached_file_path.c_str());
1618

1619
    auto fl = lnav::filesystem::file_lock(cached_file_path);
×
1620
    auto guard = lnav::filesystem::file_lock::guard(&fl);
×
1621

1622
    if (std::filesystem::exists(cached_done_path)) {
×
1623
        log_info("%d:using existing cache file", this->lb_fd.get());
×
1624
        auto open_res = lnav::filesystem::open_file(cached_file_path, O_RDWR);
×
1625
        if (open_res.isOk()) {
×
1626
            this->lb_cached_fd = open_res.unwrap();
×
1627
            return;
×
1628
        }
1629
        std::filesystem::remove(cached_done_path);
×
1630
    }
1631

1632
    auto create_res = lnav::filesystem::create_file(
1633
        cached_file_path, O_RDWR | O_TRUNC, 0600);
×
1634
    if (create_res.isErr()) {
×
1635
        log_error("failed to create cache file: %s -- %s",
×
1636
                  cached_file_path.c_str(),
1637
                  create_res.unwrapErr().c_str());
1638
        return;
×
1639
    }
1640

1641
    auto write_fd = create_res.unwrap();
×
1642
    auto done = false;
×
1643

1644
    static constexpr ssize_t FILL_LENGTH = 1024 * 1024;
1645
    auto off = file_off_t{0};
×
1646
    while (!done) {
×
1647
        log_debug("%d: caching file content at %lld", this->lb_fd.get(), off);
×
1648
        if (!this->fill_range(off, FILL_LENGTH)) {
×
1649
            log_debug("%d: caching finished", this->lb_fd.get());
×
1650
            done = true;
×
1651
        } else {
1652
            file_ssize_t avail;
1653

1654
            const auto* data = this->get_range(off, avail);
×
1655
            auto rc = write(write_fd, data, avail);
×
1656
            if (rc != avail) {
×
1657
                log_error("%d: short write!", this->lb_fd.get());
×
1658
                return;
×
1659
            }
1660

1661
            off += avail;
×
1662
        }
1663
    }
1664

1665
    lnav::filesystem::create_file(cached_done_path, O_WRONLY, 0600);
×
1666

1667
    this->lb_cached_fd = std::move(write_fd);
×
1668
}
1669

1670
std::future<void>
1671
line_buffer::cleanup_cache()
609✔
1672
{
1673
    return std::async(
1674
        std::launch::async, +[]() {
609✔
1675
            auto now = std::filesystem::file_time_type::clock::now();
609✔
1676
            auto cache_path = line_buffer_cache_path();
609✔
1677
            std::vector<std::filesystem::path> to_remove;
609✔
1678
            std::error_code ec;
609✔
1679

1680
            for (const auto& cache_subdir :
609✔
1681
                 std::filesystem::directory_iterator(cache_path, ec))
609✔
1682
            {
1683
                for (const auto& entry :
×
1684
                     std::filesystem::directory_iterator(cache_subdir, ec))
×
1685
                {
1686
                    auto mtime = std::filesystem::last_write_time(entry.path());
×
1687
                    auto exp_time = mtime + 1h;
×
1688
                    if (now < exp_time) {
×
1689
                        continue;
×
1690
                    }
1691

1692
                    to_remove.emplace_back(entry.path());
×
1693
                }
1694
            }
609✔
1695

1696
            for (auto& entry : to_remove) {
609✔
1697
                log_debug("removing compressed file cache: %s", entry.c_str());
×
1698
                std::filesystem::remove_all(entry, ec);
×
1699
            }
1700
        });
1,827✔
1701
}
1702

1703
void
1704
line_buffer::send_initial_load()
704✔
1705
{
1706
    if (!this->lb_seekable) {
704✔
1707
        log_warning("file is not seekable, not doing preload");
×
1708
        return;
×
1709
    }
1710

1711
    if (this->lb_loader_future.valid()) {
704✔
1712
        log_warning("preload is already active");
×
1713
        return;
×
1714
    }
1715

1716
    log_debug("sending initial load");
704✔
1717
    if (!this->lb_alt_buffer) {
704✔
1718
        // log_debug("allocating new buffer!");
1719
        this->lb_alt_buffer = auto_buffer::alloc(this->lb_buffer.capacity());
704✔
1720
    }
1721
    this->lb_loader_file_offset = 0;
704✔
1722
    auto prom = std::make_shared<std::promise<bool>>();
704✔
1723
    this->lb_loader_future = prom->get_future();
704✔
1724
    this->lb_stats.s_requested_preloads += 1;
704✔
1725
    isc::to<io_looper&, io_looper_tag>().send(
704✔
1726
        [this, prom](auto& ioloop) mutable {
1,408✔
1727
            prom->set_value(this->load_next_buffer());
704✔
1728
        });
704✔
1729
}
704✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc