• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scokmen / jpipe / 24248365958

10 Apr 2026 02:38PM UTC coverage: 82.804% (+0.3%) from 82.48%
24248365958

push

github

scokmen
perf: improved wait/mutex perf using algorithm similar to neglo's

24 of 25 new or added lines in 2 files covered. (96.0%)

626 of 756 relevant lines covered (82.8%)

956.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.45
/src/reader.c
1
#include <jp_memory.h>
2
#include <jp_poller.h>
3
#include <jp_queue.h>
4
#include <jp_reader.h>
5
#include <stdlib.h>
6
#include <unistd.h>
7

8
JP_ATTR_WEAK
9
jp_errno_t jp_reader_consume(jp_reader_ctx_t ctx) {
3✔
10
    jp_errno_t err = 0, queue_err = 0;
3✔
11
    ssize_t read_len      = 0;
3✔
12
    jp_block_t* block     = NULL;
3✔
13
    unsigned char* buffer = NULL;
3✔
14
    jp_poller_t* poller   = jp_poller_create(100, &err);
3✔
15

16
    if (err) {
3✔
17
        goto clean_up;
×
18
    }
19

20
    buffer = jp_mem_calloc(1, ctx.chunk_size);
3✔
21
    err    = jp_poller_poll(poller, ctx.input_stream);
3✔
22
    if (err) {
3✔
23
        goto clean_up;
×
24
    }
25

26
    while (true) {
33✔
27
        err = jp_poller_wait(poller);
33✔
28
        if (JP_ATTR_UNLIKELY(err == JP_EREAD_FAILED)) {
33✔
29
            break;
30
        }
31
        if (err == JP_ETRYAGAIN) {
33✔
32
            continue;
×
33
        }
34
        while (true) {
257✔
35
            if (block == NULL) {
257✔
36
                queue_err = jp_queue_push_uncommitted(ctx.queue, &block);
49✔
37
                if (queue_err == 0) {
49✔
38
                    block->length = 0;
24✔
39
                }
40
                if (JP_ATTR_UNLIKELY(queue_err == JP_ESHUTTING_DOWN)) {
49✔
NEW
41
                    goto clean_up;
×
42
                }
43
            }
44
            if (queue_err == JP_EMSG_SHOULD_DROP) {
257✔
45
                read_len = read(ctx.input_stream, buffer, ctx.chunk_size);
25✔
46
                if (read_len > 0) {
25✔
47
                    continue;
12✔
48
                }
49
            } else {
50
                read_len = read(ctx.input_stream, block->data + block->length, ctx.chunk_size - block->length);
232✔
51
                if (JP_ATTR_LIKELY(read_len > 0)) {
232✔
52
                    block->length += (size_t) read_len;
212✔
53
                    if (block->length == ctx.chunk_size) {
212✔
54
                        jp_queue_push_commit(ctx.queue);
4✔
55
                        block = NULL;
4✔
56
                        continue;
4✔
57
                    }
58
                }
59
            }
60

61
            if (read_len == 0) {
241✔
62
                if (queue_err == 0 && block->length > 0) {
3✔
63
                    jp_queue_push_commit(ctx.queue);
2✔
64
                }
65
                goto clean_up;
3✔
66
            }
67
            if (read_len < 0) {
238✔
68
                if (errno == EINTR) {
30✔
69
                    continue;
×
70
                }
71
                if (JP_ERRNO_EAGAIN(errno)) {
30✔
72
                    if (queue_err == 0 && block->length > 0) {
30✔
73
                        jp_queue_push_commit(ctx.queue);
18✔
74
                        block = NULL;
18✔
75
                    }
76
                    break;
77
                }
78
                err = JP_ERRNO_RAISE_POSIX(JP_EREAD_FAILED, errno);
×
79
                goto clean_up;
×
80
            }
81
        }
82
    }
83

84
clean_up:
×
85
    if (err == 0 || JP_QUEUE_IS_GRACEFUL_ERR(err)) {
3✔
86
        err = 0;
3✔
87
    }
88
    JP_FREE(buffer);
3✔
89
    jp_poller_destroy(poller);
3✔
90
    jp_queue_finalize(ctx.queue);
3✔
91
    return err;
3✔
92
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc