• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scokmen / jpipe / 23056744476

13 Mar 2026 02:59PM UTC coverage: 73.258% (-0.2%) from 73.435%
23056744476

push

github

web-flow
perf(queue): implemented atomic fast-path for push and pop (#8)

26 of 26 new or added lines in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

389 of 531 relevant lines covered (73.26%)

1335.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.59
/src/queue.c
1
#include <jp_common.h>
2
#include <jp_queue.h>
3
#include <stdbool.h>
4
#include <stdlib.h>
5

6
/**
 * Allocates and initializes a bounded queue in one heap allocation.
 *
 * The allocation is laid out as the jp_queue_t header, followed by `capacity`
 * jp_block_t descriptors, followed by `capacity * chunk_size` bytes of payload
 * storage. Every descriptor's `data` pointer is wired to its own
 * `chunk_size`-byte slice of the payload area and its `length` is reset to 0.
 *
 * @param capacity   Number of blocks in the ring; must be non-zero.
 * @param chunk_size Payload bytes reserved for each block.
 * @param policy     Stored on the queue as-is.
 * @return The new queue, or NULL when `capacity` is zero, when the total size
 *         would overflow size_t, or when the mutex / condition variables
 *         cannot be initialized. Allocation failure is delegated to JP_ALLOC
 *         (presumably it returns its third argument, NULL, when malloc fails;
 *         confirm against jp_common.h).
 */
jp_queue_t* jp_queue_create(size_t capacity, size_t chunk_size, jp_queue_policy_t policy) {
    jp_queue_t* queue;
    const size_t size_max = (size_t) -1;

    /* The commit functions compute `% queue->capacity`; zero would divide by zero. */
    if (capacity == 0) {
        return NULL;
    }

    /* Reject sizes whose products or sums would wrap and under-allocate. */
    size_t blocks_offset = sizeof(jp_queue_t);
    if (capacity > (size_max - blocks_offset) / sizeof(jp_block_t)) {
        return NULL;
    }
    size_t area_offset = blocks_offset + (capacity * sizeof(jp_block_t));

    if (chunk_size != 0 && capacity > (size_max - area_offset) / chunk_size) {
        return NULL;
    }
    size_t total_size = area_offset + (capacity * chunk_size);

    JP_ALLOC(queue, malloc(total_size), NULL);

    queue->capacity   = capacity;
    queue->chunk_size = chunk_size;
    queue->policy     = policy;
    queue->head       = 0;
    queue->tail       = 0;
    queue->blocks     = (jp_block_t*) ((unsigned char*) queue + blocks_offset);
    queue->area       = (unsigned char*) queue + area_offset;
    atomic_store_explicit(&queue->active, true, memory_order_relaxed);
    atomic_store_explicit(&queue->length, 0, memory_order_relaxed);

    for (size_t i = 0; i < capacity; i++) {
        queue->blocks[i].data   = queue->area + i * chunk_size;
        queue->blocks[i].length = 0;
    }

    /* Unwind in reverse order so a half-initialized queue is never returned. */
    if (pthread_mutex_init(&queue->lock, NULL) != 0) {
        goto fail_free;
    }
    if (pthread_cond_init(&queue->not_empty, NULL) != 0) {
        goto fail_lock;
    }
    if (pthread_cond_init(&queue->not_full, NULL) != 0) {
        goto fail_not_empty;
    }

    return queue;

fail_not_empty:
    pthread_cond_destroy(&queue->not_empty);
fail_lock:
    pthread_mutex_destroy(&queue->lock);
fail_free:
    /* The memory came straight from malloc() above, so free() releases it. */
    free(queue);
    return NULL;
}
36

37
/**
 * Hands the caller the block at the current tail without publishing it.
 *
 * The function first tries a lock-free check: if the queue is active and its
 * length is below capacity, the tail block is returned immediately. Otherwise
 * it takes the queue lock and, under JP_QUEUE_POLICY_WAIT, sleeps on
 * `not_full` until space appears or the queue is deactivated.
 *
 * @param queue Queue to reserve a block from.
 * @param block Receives a pointer to `queue->blocks[queue->tail]` on success.
 * @return 0 on success; JP_ESHUTTING_DOWN when the queue is no longer active;
 *         JP_EMSG_SHOULD_DROP when the policy is JP_QUEUE_POLICY_DROP and the
 *         queue is still full.
 */
jp_errno_t jp_queue_push_uncommitted(jp_queue_t* queue, jp_block_t** block) {
    const bool accepting = atomic_load_explicit(&queue->active, memory_order_acquire);
    if (JP_ATTR_UNLIKELY(!accepting)) {
        return JP_ESHUTTING_DOWN;
    }

    /* Fast path: a free slot is visible without taking the lock. */
    if (atomic_load_explicit(&queue->length, memory_order_acquire) < queue->capacity) {
        *block = &queue->blocks[queue->tail];
        return 0;
    }

    /* Slow path: the queue looked full, so decide under the lock. */
    jp_errno_t status = 0;

    pthread_mutex_lock(&queue->lock);

    for (;;) {
        if (queue->policy != JP_QUEUE_POLICY_WAIT) {
            break;
        }
        if (!atomic_load_explicit(&queue->active, memory_order_relaxed)) {
            break;
        }
        if (atomic_load_explicit(&queue->length, memory_order_relaxed) < queue->capacity) {
            break;
        }
        pthread_cond_wait(&queue->not_full, &queue->lock);
    }

    if (!atomic_load_explicit(&queue->active, memory_order_relaxed)) {
        status = JP_ESHUTTING_DOWN;
    } else if (queue->policy == JP_QUEUE_POLICY_DROP &&
               atomic_load_explicit(&queue->length, memory_order_relaxed) >= queue->capacity) {
        status = JP_EMSG_SHOULD_DROP;
    } else {
        *block = &queue->blocks[queue->tail];
    }

    pthread_mutex_unlock(&queue->lock);
    return status;
}
68

69
/**
 * Publishes the block previously obtained from jp_queue_push_uncommitted().
 *
 * Under the queue lock: advances `tail` by one slot (wrapping at `capacity`),
 * increments the atomic `length` with release ordering, and signals one waiter
 * on `not_empty`.
 *
 * @param queue Queue whose tail block is being committed.
 */
void jp_queue_push_commit(jp_queue_t* queue) {
    pthread_mutex_t* const guard = &queue->lock;

    pthread_mutex_lock(guard);
    {
        queue->tail = (1 + queue->tail) % queue->capacity;
        atomic_fetch_add_explicit(&queue->length, 1, memory_order_release);
        pthread_cond_signal(&queue->not_empty);
    }
    pthread_mutex_unlock(guard);
}
27,016✔
76

77
/**
 * Hands the caller the block at the current head without releasing it.
 *
 * If the atomic length is already positive the head block is returned without
 * locking. Otherwise the function takes the queue lock and sleeps on
 * `not_empty` until an item is available or the queue is deactivated.
 *
 * @param queue Queue to read from.
 * @param block Receives a pointer to `queue->blocks[queue->head]` on success.
 * @return 0 on success; JP_ESHUTTING_DOWN when the queue is empty and no
 *         longer active.
 */
jp_errno_t jp_queue_pop_uncommitted(jp_queue_t* queue, jp_block_t** block) {
    /* Fast path: an item is visible without taking the lock. */
    if (atomic_load_explicit(&queue->length, memory_order_acquire) > 0) {
        *block = &queue->blocks[queue->head];
        return 0;
    }

    /* Slow path: the queue looked empty, so wait under the lock. */
    jp_errno_t status = 0;

    pthread_mutex_lock(&queue->lock);

    for (;;) {
        if (atomic_load_explicit(&queue->length, memory_order_relaxed) != 0) {
            break;
        }
        if (!atomic_load_explicit(&queue->active, memory_order_relaxed)) {
            break;
        }
        pthread_cond_wait(&queue->not_empty, &queue->lock);
    }

    /* Remaining items are still handed out after deactivation. */
    if (atomic_load_explicit(&queue->length, memory_order_relaxed) == 0 &&
        !atomic_load_explicit(&queue->active, memory_order_relaxed)) {
        status = JP_ESHUTTING_DOWN;
    } else {
        *block = &queue->blocks[queue->head];
    }

    pthread_mutex_unlock(&queue->lock);
    return status;
}
99

100
/**
 * Releases the block previously obtained from jp_queue_pop_uncommitted().
 *
 * Under the queue lock: advances `head` by one slot (wrapping at `capacity`),
 * decrements the atomic `length` with release ordering, and signals one waiter
 * on `not_full`.
 *
 * @param queue Queue whose head block is being released.
 */
void jp_queue_pop_commit(jp_queue_t* queue) {
    pthread_mutex_t* const guard = &queue->lock;

    pthread_mutex_lock(guard);
    {
        queue->head = (1 + queue->head) % queue->capacity;
        atomic_fetch_sub_explicit(&queue->length, 1, memory_order_release);
        pthread_cond_signal(&queue->not_full);
    }
    pthread_mutex_unlock(guard);
}
27,014✔
107

108
/**
 * Marks the queue inactive and wakes every thread blocked on it.
 *
 * Under the queue lock: stores `false` into the atomic `active` flag with
 * release ordering, then broadcasts on both `not_empty` and `not_full`.
 *
 * @param queue Queue to deactivate.
 */
void jp_queue_finalize(jp_queue_t* queue) {
    pthread_mutex_t* const guard = &queue->lock;

    pthread_mutex_lock(guard);
    {
        atomic_store_explicit(&queue->active, false, memory_order_release);
        pthread_cond_broadcast(&queue->not_empty);
        pthread_cond_broadcast(&queue->not_full);
    }
    pthread_mutex_unlock(guard);
}
6✔
115

116
/**
 * Destroys the queue's mutex and condition variables, then releases the queue
 * through JP_FREE. Passing NULL is a no-op.
 *
 * @param queue Queue to destroy, or NULL.
 */
void jp_queue_destroy(jp_queue_t* queue) {
    if (queue != NULL) {
        pthread_mutex_destroy(&queue->lock);
        pthread_cond_destroy(&queue->not_empty);
        pthread_cond_destroy(&queue->not_full);
        JP_FREE(queue);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc