• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scokmen / jpipe / 24243026519

10 Apr 2026 12:31PM UTC coverage: 82.48% (-2.5%) from 85.021%
24243026519

push

github

scokmen
ci: fixed ubuntu based analyzer errors

8 of 28 new or added lines in 6 files covered. (28.57%)

67 existing lines in 7 files now uncovered.

612 of 742 relevant lines covered (82.48%)

5562.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.0
/src/queue.c
1
#include <jp_common.h>
#include <jp_memory.h>
#include <jp_queue.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
6

7
/**
 * Creates a bounded queue backed by one contiguous allocation laid out as:
 *   [jp_queue_t header][capacity * jp_block_t descriptors][capacity * chunk_size data area]
 * Each block's `data` pointer is wired to its slice of the trailing data area.
 *
 * @param capacity   number of ring slots.
 * @param chunk_size byte size of each slot's data area.
 * @param policy     full-queue behavior (wait vs. drop) consulted by producers.
 * @param err        out-parameter, populated only on failure.
 * @return the queue on success; NULL on failure with *err set.
 *
 * Ownership: the caller releases the result with jp_queue_destroy().
 */
jp_queue_t* jp_queue_create(size_t capacity, size_t chunk_size, jp_queue_policy_t policy, jp_errno_t* err) {
    int status                 = 0;
    const size_t blocks_offset = sizeof(jp_queue_t);

    /* Guard the layout arithmetic against size_t overflow: an overflowed
     * total_size would produce an undersized allocation and out-of-bounds
     * writes in the initialization loop below. */
    if (capacity != 0 &&
        (capacity > (SIZE_MAX - blocks_offset) / sizeof(jp_block_t) ||
         chunk_size > (SIZE_MAX - blocks_offset - capacity * sizeof(jp_block_t)) / capacity)) {
        *err = JP_ERRNO_RAISE_POSIX(JP_ESYS_ERR, ENOMEM);
        return NULL;
    }

    const size_t area_offset = blocks_offset + capacity * sizeof(jp_block_t);
    const size_t total_size  = area_offset + capacity * chunk_size;
    jp_queue_t* queue        = jp_mem_malloc(total_size);

    /* The original dereferenced the allocation unchecked. NOTE(review):
     * assumes jp_mem_malloc can return NULL on OOM — confirm; if it aborts
     * instead, this check is harmless. */
    if (queue == NULL) {
        *err = JP_ERRNO_RAISE_POSIX(JP_ESYS_ERR, ENOMEM);
        return NULL;
    }

    queue->capacity   = capacity;
    queue->chunk_size = chunk_size;
    queue->policy     = policy;
    queue->head       = 0;
    queue->tail       = 0;
    queue->blocks     = (jp_block_t*) ((unsigned char*) queue + blocks_offset);
    queue->area       = (unsigned char*) queue + area_offset;
    /* No concurrent readers can exist yet, so relaxed stores suffice here. */
    atomic_store_explicit(&queue->active, true, memory_order_relaxed);
    atomic_store_explicit(&queue->length, 0, memory_order_relaxed);

    /* Point every block descriptor at its slice of the trailing data area. */
    for (size_t i = 0; i < capacity; i++) {
        queue->blocks[i].data   = queue->area + i * chunk_size;
        queue->blocks[i].length = 0;
    }

    status = pthread_mutex_init(&queue->lock, NULL);
    if (status != 0) {
        *err = JP_ERRNO_RAISE_POSIX(JP_ESYS_ERR, status);
        goto clean_up;
    }

    status = pthread_cond_init(&queue->not_empty, NULL);
    if (status != 0) {
        *err = JP_ERRNO_RAISE_POSIX(JP_ESYS_ERR, status);
        goto clean_up_lock;
    }

    status = pthread_cond_init(&queue->not_full, NULL);
    if (status != 0) {
        *err = JP_ERRNO_RAISE_POSIX(JP_ESYS_ERR, status);
        goto clean_up_cond;
    }

    return queue;

/* goto-based unwind: tear down only what was successfully initialized. */
clean_up_cond:
    pthread_cond_destroy(&queue->not_empty);

clean_up_lock:
    pthread_mutex_destroy(&queue->lock);

clean_up:
    JP_FREE(queue);
    return NULL;
}
59

60
/**
 * Reserves the block at the current tail for a producer to fill, without
 * advancing the tail (the producer commits via jp_queue_push_commit()).
 *
 * @param queue the queue to push into.
 * @param block out-parameter set to the reserved tail block on success.
 * @return 0 on success, JP_ESHUTTING_DOWN if the queue was finalized,
 *         JP_EMSG_SHOULD_DROP if full under JP_QUEUE_POLICY_DROP.
 *
 * NOTE(review): the fast path below reads queue->tail without the lock, so
 * this appears to assume a single producer (or external producer
 * serialization) — confirm against callers.
 */
jp_errno_t jp_queue_push_uncommitted(jp_queue_t* queue, jp_block_t** block) {
    /* Refuse new work once finalize has flipped the active flag. */
    if (JP_ATTR_UNLIKELY(!atomic_load_explicit(&queue->active, memory_order_acquire))) {
        return JP_ESHUTTING_DOWN;
    }

    /* Fast path: if the queue is observably not full, hand out the tail
     * block without taking the mutex. */
    if (atomic_load_explicit(&queue->length, memory_order_acquire) < queue->capacity) {
        *block = &queue->blocks[queue->tail];
        return 0;
    }

    /* Slow path: queue looked full; recheck everything under the lock. */
    pthread_mutex_lock(&queue->lock);
    /* Under WAIT policy, block until a consumer frees a slot or the queue
     * shuts down. The loop guards against spurious wakeups. */
    while (queue->policy == JP_QUEUE_POLICY_WAIT && atomic_load_explicit(&queue->active, memory_order_relaxed) &&
           atomic_load_explicit(&queue->length, memory_order_relaxed) >= queue->capacity) {
        pthread_cond_wait(&queue->not_full, &queue->lock);
    }

    /* Woken (or never waited) because of shutdown: bail out. */
    if (!atomic_load_explicit(&queue->active, memory_order_relaxed)) {
        pthread_mutex_unlock(&queue->lock);
        return JP_ESHUTTING_DOWN;
    }

    /* Under DROP policy a still-full queue means the caller should discard
     * this message rather than wait. */
    if (queue->policy == JP_QUEUE_POLICY_DROP &&
        atomic_load_explicit(&queue->length, memory_order_relaxed) >= queue->capacity) {
        pthread_mutex_unlock(&queue->lock);
        return JP_EMSG_SHOULD_DROP;
    }

    *block = &queue->blocks[queue->tail];
    pthread_mutex_unlock(&queue->lock);
    return 0;
}
91

92
/**
 * Publishes a block previously reserved with jp_queue_push_uncommitted():
 * advances the tail, bumps the length, and wakes one waiting consumer.
 *
 * @param queue the queue whose reserved tail block is now filled.
 */
void jp_queue_push_commit(jp_queue_t* queue) {
    pthread_mutex_lock(&queue->lock);

    /* Advance the tail around the ring (equivalent to modulo capacity,
     * since tail is always < capacity here). */
    const size_t next_tail = queue->tail + 1;
    queue->tail            = (next_tail == queue->capacity) ? 0 : next_tail;

    atomic_fetch_add_explicit(&queue->length, 1, memory_order_release);
    pthread_cond_signal(&queue->not_empty);

    pthread_mutex_unlock(&queue->lock);
}
99

100
/**
 * Hands out the block at the current head for a consumer to read, without
 * advancing the head (the consumer commits via jp_queue_pop_commit()).
 * Blocks until data arrives or the queue shuts down.
 *
 * @param queue the queue to pop from.
 * @param block out-parameter set to the current head block on success.
 * @return 0 on success; JP_ESHUTTING_DOWN only once the queue is both
 *         finalized AND drained (remaining messages are still delivered
 *         after finalize).
 *
 * NOTE(review): the fast path below reads queue->head without the lock, so
 * this appears to assume a single consumer (or external consumer
 * serialization) — confirm against callers.
 */
jp_errno_t jp_queue_pop_uncommitted(jp_queue_t* queue, jp_block_t** block) {
    /* Fast path: data is observably available; hand out the head block
     * without taking the mutex. */
    if (atomic_load_explicit(&queue->length, memory_order_acquire) > 0) {
        *block = &queue->blocks[queue->head];
        return 0;
    }

    /* Slow path: queue looked empty; wait under the lock for data or
     * shutdown. The loop guards against spurious wakeups. */
    pthread_mutex_lock(&queue->lock);
    while (atomic_load_explicit(&queue->length, memory_order_relaxed) == 0 &&
           atomic_load_explicit(&queue->active, memory_order_relaxed)) {
        pthread_cond_wait(&queue->not_empty, &queue->lock);
    }

    /* Empty AND finalized: the queue is fully drained; report shutdown.
     * (Finalized but non-empty still delivers the remaining blocks.) */
    if (atomic_load_explicit(&queue->length, memory_order_relaxed) == 0 &&
        !atomic_load_explicit(&queue->active, memory_order_relaxed)) {
        pthread_mutex_unlock(&queue->lock);
        return JP_ESHUTTING_DOWN;
    }

    *block = &queue->blocks[queue->head];
    pthread_mutex_unlock(&queue->lock);
    return 0;
}
122

123
/**
 * Releases a block previously obtained with jp_queue_pop_uncommitted():
 * advances the head, decrements the length, and wakes one waiting producer.
 *
 * @param queue the queue whose head block has been fully consumed.
 */
void jp_queue_pop_commit(jp_queue_t* queue) {
    pthread_mutex_lock(&queue->lock);

    /* Advance the head around the ring (equivalent to modulo capacity,
     * since head is always < capacity here). */
    const size_t next_head = queue->head + 1;
    queue->head            = (next_head == queue->capacity) ? 0 : next_head;

    atomic_fetch_sub_explicit(&queue->length, 1, memory_order_release);
    pthread_cond_signal(&queue->not_full);

    pthread_mutex_unlock(&queue->lock);
}
130

131
/**
 * Marks the queue as shutting down and wakes every blocked producer and
 * consumer so they can observe the inactive flag. Idempotent: a second
 * call only logs and returns.
 *
 * @param queue the queue to finalize.
 */
void jp_queue_finalize(jp_queue_t* queue) {
    const bool still_active = atomic_load_explicit(&queue->active, memory_order_acquire);

    if (!still_active) {
        JP_LOG_DEBUG("[QUEUE]: Queue was already finalized.");
        return;
    }

    JP_LOG_DEBUG("[QUEUE]: Queue is being finalized.");

    /* Flip the flag under the lock so waiters cannot miss the wakeup, then
     * broadcast to both condition variables. */
    pthread_mutex_lock(&queue->lock);
    atomic_store_explicit(&queue->active, false, memory_order_release);
    pthread_cond_broadcast(&queue->not_empty);
    pthread_cond_broadcast(&queue->not_full);
    pthread_mutex_unlock(&queue->lock);
}
143

144
/**
 * Destroys the synchronization primitives and frees the queue's single
 * backing allocation. Safe to call with NULL.
 *
 * @param queue the queue to destroy (may be NULL).
 */
void jp_queue_destroy(jp_queue_t* queue) {
    if (queue != NULL) {
        pthread_mutex_destroy(&queue->lock);
        pthread_cond_destroy(&queue->not_empty);
        pthread_cond_destroy(&queue->not_full);
        JP_FREE(queue);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc