• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scokmen / jpipe / 22623805297

03 Mar 2026 12:43PM UTC coverage: 79.587% (-11.6%) from 91.143%
22623805297

push

github

scokmen
feat: added an orchestrator for managing consumer and producer threads

101 of 167 new or added lines in 3 files covered. (60.48%)

5 existing lines in 1 file now uncovered.

347 of 436 relevant lines covered (79.59%)

131353.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.88
/src/queue.c
1
#include <stdlib.h>
2
#include <string.h>
3
#include <stdbool.h>
4
#include <jp_common.h>
5
#include <jp_queue.h>
6

7
jp_queue_t *jp_queue_create(size_t capacity, size_t chunk_size) {
20✔
8
    jp_queue_t *queue;
20✔
9

10
    size_t blocks_offset = sizeof(jp_queue_t);
20✔
11
    size_t area_offset = blocks_offset + (capacity * sizeof(jp_block_t));
20✔
12
    size_t total_size = area_offset + (capacity * chunk_size);
20✔
13

14
    JP_ALLOC_OR_RET(queue, malloc(total_size), NULL);
20✔
15

16
    queue->active = true;
20✔
17
    queue->capacity = capacity;
20✔
18
    queue->chunk_size = chunk_size;
20✔
19
    queue->head = 0;
20✔
20
    queue->tail = 0;
20✔
21
    queue->length = 0;
20✔
22
    queue->blocks = (jp_block_t *) ((unsigned char *) queue + blocks_offset);
20✔
23
    queue->area = (unsigned char *) queue + area_offset;
20✔
24

25
    for (size_t i = 0; i < capacity; i++) {
3,022✔
26
        queue->blocks[i].data = queue->area + (i * chunk_size);
3,002✔
27
        queue->blocks[i].length = 0;
3,002✔
28
    }
29

30
    pthread_mutex_init(&queue->lock, NULL);
20✔
31
    pthread_cond_init(&queue->not_empty, NULL);
20✔
32
    pthread_cond_init(&queue->not_full, NULL);
20✔
33

34
    return queue;
20✔
35
}
36

37
jp_errno_t jp_queue_push(jp_queue_t *queue, const void *src, size_t len) {
2,000,004✔
38
    pthread_mutex_lock(&queue->lock);
2,000,004✔
39

40
    while (queue->length == queue->capacity && queue->active) {
2,125,040✔
41
        JP_DEBUG("[PUSH]: Queue is full. (len: %zu, cap: %zu", queue->length, queue->capacity);
125,036✔
42
        pthread_cond_wait(&queue->not_full, &queue->lock);
125,036✔
43
    }
44

45
    if (!queue->active) {
2,000,004✔
NEW
46
        JP_DEBUG("[PUSH]: Queue is not active.");
×
NEW
47
        pthread_mutex_unlock(&queue->lock);
×
NEW
48
        return JP_ESHUTTING_DOWN;
×
49
    }
50

51
    size_t block_size = MIN(len, queue->chunk_size);
2,000,004✔
52
    memcpy(queue->blocks[queue->tail].data, src, block_size);
2,000,004✔
53
    queue->blocks[queue->tail].length = block_size;
2,000,004✔
54
    queue->tail = (queue->tail + 1) % queue->capacity;
2,000,004✔
55
    queue->length++;
2,000,004✔
56

57
    pthread_cond_signal(&queue->not_empty);
2,000,004✔
58
    pthread_mutex_unlock(&queue->lock);
2,000,004✔
59
    return 0;
2,000,004✔
60
}
61

62
jp_errno_t jp_queue_pop(jp_queue_t *queue, unsigned char *dest_buffer, size_t max_len, size_t *out_len) {
2,000,004✔
63
    pthread_mutex_lock(&queue->lock);
2,000,004✔
64

65
    while (queue->length == 0 && queue->active) {
2,125,019✔
66
        JP_DEBUG("[POP]: Queue is empty. (len: %zu, cap: %zu)", queue->length, queue->capacity);
125,015✔
67
        pthread_cond_wait(&queue->not_empty, &queue->lock);
125,015✔
68
    }
69

70
    if (queue->length == 0 && !queue->active) {
2,000,004✔
NEW
71
        JP_DEBUG("[POP]: Queue is not active.");
×
NEW
72
        pthread_mutex_unlock(&queue->lock);
×
NEW
73
        return JP_ESHUTTING_DOWN;
×
74
    }
75

76
    size_t block_size = MIN(max_len, queue->blocks[queue->head].length);
2,000,004✔
77
    memcpy(dest_buffer, queue->blocks[queue->head].data, block_size);
2,000,004✔
78
    *out_len = block_size;
2,000,004✔
79

80
    queue->head = (queue->head + 1) % queue->capacity;
2,000,004✔
81
    queue->length--;
2,000,004✔
82

83
    pthread_cond_signal(&queue->not_full);
2,000,004✔
84
    pthread_mutex_unlock(&queue->lock);
2,000,004✔
85
    return 0;
2,000,004✔
86
}
87

NEW
88
void jp_queue_finalize(jp_queue_t *queue) {
×
NEW
89
    pthread_mutex_lock(&queue->lock);
×
NEW
90
    queue->active = false;
×
NEW
91
    pthread_cond_broadcast(&queue->not_empty);
×
NEW
92
    pthread_cond_broadcast(&queue->not_full);
×
NEW
93
    pthread_mutex_unlock(&queue->lock);
×
NEW
94
}
×
95

96
void jp_queue_destroy(jp_queue_t *queue) {
59✔
97
    if (queue == NULL) {
59✔
98
        return;
99
    }
100

101
    pthread_mutex_destroy(&queue->lock);
20✔
102
    pthread_cond_destroy(&queue->not_empty);
20✔
103
    pthread_cond_destroy(&queue->not_full);
20✔
104
    JP_FREE(queue);
20✔
105
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc