• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scokmen / jpipe / 22961152515

11 Mar 2026 03:44PM UTC coverage: 73.41% (+0.1%) from 73.308%
22961152515

push

github

web-flow
perf(queue): implemented zero-copy push (#6)

10 of 28 new or added lines in 2 files covered. (35.71%)

1 existing line in 1 file now uncovered.

381 of 519 relevant lines covered (73.41%)

1186.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.54
/src/worker.c
1
#include <errno.h>
2
#include <getopt.h>
3
#include <jp_command.h>
4
#include <jp_errno.h>
5
#include <jp_poller.h>
6
#include <jp_queue.h>
7
#include <jp_worker.h>
8
#include <signal.h>
9
#include <stdint.h>
10
#include <stdio.h>
11
#include <stdlib.h>
12
#include <string.h>
13
#include <sys/stat.h>
14
#include <unistd.h>
15

16
typedef void* (*worker_func)(void*);
17

18
typedef struct {
19
    bool running;
20
    bool detached;
21
    pthread_t tid;
22
    worker_func func;
23
} worker_thread_t;
24

25
typedef struct {
26
    size_t chunk_size;
27
    size_t buffer_size;
28
    bool dry_run;
29
    jp_queue_policy_t policy;
30
    const char* out_dir;
31
    jp_queue_t* queue;
32
    jp_field_set_t* fields;
33
} worker_arg_t;
34

35
static jp_errno_t display_help(void) {
2✔
36
    JP_LOG("Usage: jpipe run [options]\n");
2✔
37
    JP_LOG("Execute the data processing engine with the following configurations:\n");
2✔
38
    JP_LOG("Options:");
2✔
39
    JP_LOG("  -c, --chunk-size  <size>     Chunk size (e.g., 16kb, 64kb). Range: 1kb-128kb  (default: 16kb).");
2✔
40
    JP_LOG("  -b, --buffer-size <count>    Max pending operations. Range: 1-1024 (default: 64).");
2✔
41
    JP_LOG("  -p, --policy      <type>     Overflow policy: 'wait' or 'drop' (default: wait).");
2✔
42
    JP_LOG("  -o, --out-dir     <path>     Output directory (default: current dir).");
2✔
43
    JP_LOG("  -f, --field       key=value  Additional field to the JSON output. Can be used multiple times.");
2✔
44
    JP_LOG("  -n, --dry-run                Dry run.");
2✔
45
    JP_LOG("  -h, --help                   Show this help message.\n");
2✔
46
    JP_LOG("Field Options:");
2✔
47
    JP_LOG("  -f, --field \"key=value\"   Add a field to the JSON output.\n");
2✔
48
    JP_LOG("  Key Rules:");
2✔
49
    JP_LOG("    - Must contain only: 'a-z', 'A-Z', '0-9', '_' and '-'.");
2✔
50
    JP_LOG("    - Maximum length: 64 characters.\n");
2✔
51
    JP_LOG("  Value Type Inference:");
2✔
52
    JP_LOG("    - key=123        -> Number  (no quotes in JSON).");
2✔
53
    JP_LOG("    - key=true|false -> Boolean (no quotes in JSON).");
2✔
54
    JP_LOG("    - key=string     -> String.");
2✔
55
    JP_LOG("    - key=\"string\"   -> String.");
2✔
56
    JP_LOG("    - key=\"123\"      -> Forced string.");
2✔
57
    JP_LOG("    - key=\"true\"     -> Forced string.");
2✔
58
    JP_LOG("    - key=\"str=ng\"   -> String.");
2✔
59
    JP_LOG("    - key=str=ng     -> Invalid field.\n");
2✔
60
    JP_LOG("  Example:");
2✔
61
    JP_LOG("    Input : jpipe -f \"id=101\" -f \"name=app\" -f \"active=true\" -f \"ver=1.2.0\"");
2✔
62
    JP_LOG("    Output: {\"id\": 101, \"name\": \"app\", \"active\": true, \"ver\": \"1.2.0\"}");
2✔
63
    return 0;
2✔
64
}
65

66
static void display_summary(worker_arg_t* args) {
23✔
67
    double estimated_mem_usage = ((double) args->chunk_size * (double) args->buffer_size) / (BYTES_IN_KB * BYTES_IN_KB);
23✔
68

69
    JP_LOG("[jpipe]: Configuration Summary\n");
23✔
70
    JP_LOG("[Runtime Parameters]");
23✔
71
    JP_LOG("• Chunk Size   (-c) : %zu KB", (args->chunk_size / BYTES_IN_KB));
23✔
72
    JP_LOG("• Buffer Size  (-b) : %zu", args->buffer_size);
23✔
73
    JP_LOG("• Output Dir   (-o) : %s", args->out_dir);
23✔
74
    JP_LOG("• Policy       (-p) : %s", args->policy == JP_QUEUE_POLICY_WAIT ? "WAIT" : "DROP");
25✔
75
    if (args->fields->len > 0) {
23✔
76
        JP_LOG("• Fields       (-f) :");
1✔
77
        for (size_t i = 0; i < args->fields->len; i++) {
2✔
78
            JP_LOG("     %zu. %-32s= %-32s", i + 1, args->fields->fields[i]->key, args->fields->fields[i]->val);
1✔
79
        }
80
    }
81
    JP_LOG("\n[Resource Utilization]");
23✔
82
    JP_LOG("• Memory Usage      :  ~%.2f MB", estimated_mem_usage);
23✔
83
    JP_LOG("\nThese values are based on user-provided parameters.");
23✔
84
    JP_LOG("Memory usage is an approximation; operating system overhead and thread stack allocations are not included.");
23✔
85
}
23✔
86

87
static jp_errno_t set_out_dir(const char* arg, worker_arg_t* args) {
9✔
88
    size_t len = 0;
9✔
89
    JP_FREE(args->out_dir);
9✔
90
    if (arg == NULL) {
9✔
91
        return jp_errno_log_err_format(JP_EMISSING_CMD, "Output path is empty.");
×
92
    }
93
    len = strlen(arg);
9✔
94
    if (len == 0) {
9✔
95
        return jp_errno_log_err_format(JP_EMISSING_CMD, "Output path is empty.");
2✔
96
    }
97
    if (len > JP_PATH_MAX) {
7✔
98
        return jp_errno_log_err_format(JP_EMISSING_CMD, "Output path is too long. Maximum allowed path size: %d.", JP_PATH_MAX);
×
99
    }
100
    JP_ALLOC_OR_LOG(args->out_dir, strdup(arg));
7✔
101
    return 0;
102
}
103

104
static jp_errno_t set_field(const char* arg, worker_arg_t* args) {
32✔
105
    jp_errno_t err;
32✔
106
    if (arg == NULL) {
32✔
107
        return jp_errno_log_err_format(JP_EINV_FIELD_KEY, "Field key/value is empty");
×
108
    }
109

110
    JP_ASSUME(args->fields != NULL);
32✔
111
    err = jp_field_set_add(args->fields, arg);
32✔
112
    if (err) {
32✔
113
        return jp_errno_log_err_format(err, "Field key/value is invalid: '%s'", arg);
×
114
    }
115
    return 0;
116
}
117

118
static jp_errno_t set_chunk_size(const char* arg, worker_arg_t* args) {
27✔
119
    char* end_ptr;
27✔
120
    size_t chunk_size        = 0;
27✔
121
    unsigned long long param = 0;
27✔
122

123
    errno = 0;
27✔
124
    param = strtoull(arg, &end_ptr, 10);
27✔
125

126
    if (errno == ERANGE) {
27✔
127
        return jp_errno_log_err_format(JP_ECHUNK_SIZE, "Chunk size format is incorrect: '%.32s'.", arg);
2✔
128
    }
129

130
    if (end_ptr == arg) {
25✔
131
        return jp_errno_log_err_format(JP_ECHUNK_SIZE, "Chunk size is empty: '%.32s'.", arg);
4✔
132
    }
133

134
    if (*end_ptr != '\0' && !strcmp(end_ptr, "kb") && param <= (JP_WRK_CHUNK_SIZE_MAX / BYTES_IN_KB)) {
21✔
135
        chunk_size += (param * BYTES_IN_KB);
9✔
136
    } else {
137
        return jp_errno_log_err_format(JP_ECHUNK_SIZE, "Chunk size value is invalid: '%.32s'.", arg);
12✔
138
    }
139

140
    if (chunk_size < JP_WRK_CHUNK_SIZE_MIN || chunk_size > JP_WRK_CHUNK_SIZE_MAX) {
9✔
141
        return jp_errno_log_err_format(JP_ECHUNK_SIZE, "Chunk size value is invalid: '%.32s'.", arg);
2✔
142
    }
143
    args->chunk_size = chunk_size;
7✔
144
    return 0;
7✔
145
}
146

147
static jp_errno_t set_buffer_size(const char* arg, worker_arg_t* args) {
19✔
148
    char* end_ptr;
19✔
149
    unsigned long long param = 0;
19✔
150

151
    errno = 0;
19✔
152
    param = strtoull(arg, &end_ptr, 10);
19✔
153

154
    if (errno == ERANGE || end_ptr == arg || *end_ptr != '\0') {
19✔
155
        return jp_errno_log_err_format(JP_EBUFFER_SIZE, "Buffer size format is incorrect: '%.32s'.", arg);
6✔
156
    }
157

158
    if (param < JP_WRK_BUFFER_SIZE_MIN || param > JP_WRK_BUFFER_SIZE_MAX) {
13✔
159
        return jp_errno_log_err_format(JP_EBUFFER_SIZE, "Buffer size format invalid: '%.32s'.", arg);
6✔
160
    }
161

162
    args->buffer_size = (size_t) param;
7✔
163
    return 0;
7✔
164
}
165

166
static jp_errno_t set_policy(const char* arg, worker_arg_t* args) {
10✔
167
    if (!strcmp(arg, "wait")) {
10✔
168
        args->policy = JP_QUEUE_POLICY_WAIT;
2✔
169
        return 0;
2✔
170
    }
171

172
    if (!strcmp(arg, "drop")) {
8✔
173
        args->policy = JP_QUEUE_POLICY_DROP;
2✔
174
        return 0;
2✔
175
    }
176

177
    return jp_errno_log_err_format(JP_EOVERFLOW_POLICY, "Overflow policy is invalid: '%.32s'.", arg);
6✔
178
}
179

180
static jp_errno_t handle_unknown_argument(const char* cmd) {
4✔
181
    return jp_errno_log_err_format(JP_EUNKNOWN_RUN_CMD, "Invalid or incomplete [run] command: '%.32s'.", cmd);
4✔
182
}
183

184
static jp_errno_t create_and_normalize_out_dir(worker_arg_t* args) {
27✔
185
    char tmp[JP_PATH_MAX]           = {0};
27✔
186
    char absolute_path[JP_PATH_MAX] = {0};
27✔
187
    char* p                         = NULL;
27✔
188
    struct stat st;
27✔
189

190
    if (args->out_dir == NULL) {
27✔
191
        JP_ALLOC_OR_LOG(args->out_dir, strdup(JP_WRK_OUTDIR_DEF));
20✔
192
    }
193
    size_t path_len = strlen(args->out_dir);
27✔
194
    strncpy(tmp, args->out_dir, sizeof(tmp));
27✔
195

196
    while (path_len > 1 && tmp[path_len - 1] == '/') {
27✔
197
        tmp[path_len - 1] = '\0';
×
198
        path_len--;
×
199
    }
200

201
    for (p = tmp + 1; *p; p++) {
205✔
202
        if (*p == '/') {
178✔
203
            *p = '\0';
13✔
204
            if (mkdir(tmp, 0755) != 0 && errno != EEXIST) {
13✔
205
                return jp_errno_log_err_format(JP_EOUT_DIR, "Could not create the output directory: '%s'", tmp);
×
206
            }
207
            *p = '/';
13✔
208
        }
209
    }
210

211
    if (mkdir(tmp, 0755) != 0 && errno != EEXIST) {
27✔
212
        return jp_errno_log_err_format(JP_EOUT_DIR, "Could not create the output directory: '%s'.", tmp);
3✔
213
    }
214

215
    if (stat(tmp, &st) != 0 || !S_ISDIR(st.st_mode)) {
24✔
216
        return jp_errno_log_err_format(JP_EOUT_DIR, "The target path is not a directory: '%s'.", tmp);
1✔
217
    }
218

219
    if (access(tmp, W_OK) != 0) {
23✔
220
        return jp_errno_log_err_format(JP_EOUT_DIR, "The target path is inaccessible: '%s'.", tmp);
×
221
    }
222

223
    if (realpath(tmp, absolute_path) == NULL) {
23✔
224
        return jp_errno_log_err_format(JP_EOUT_DIR, "Could not resolve absolute path: '%s'.", tmp);
×
225
    }
226

227
    JP_FREE(args->out_dir);
23✔
228
    JP_ALLOC_OR_LOG(args->out_dir, strdup(absolute_path));
23✔
229
    return 0;
230
}
231

232
static int get_field_args_count(int argc, char* argv[]) {
72✔
233
    int c = 0;
72✔
234
    for (int i = 0; i < argc; i++) {
479✔
235
        if (JP_CMD_EQ(argv[i], "-f", "--field")) {
407✔
236
            c++;
65✔
237
        }
238
    }
239
    return c;
72✔
240
}
241

242
static jp_errno_t init_worker_args(int argc, char* argv[], worker_arg_t* args) {
72✔
243
    int fields = get_field_args_count(argc, argv);
72✔
244
    if (fields > JP_WRK_FIELDS_MAX) {
72✔
245
        return jp_errno_log_err_format(JP_ETOO_MANY_FIELD, "Too many 'fields' specified: '%d'", fields);
1✔
246
    }
247
    JP_ALLOC_OR_LOG(args->fields, jp_field_set_create((size_t) fields));
71✔
248
    return 0;
249
}
250

251
static jp_errno_t collect_cli_args(int argc, char* argv[], worker_arg_t* args) {
71✔
252
    int option;
71✔
253
    opterr            = 0;
71✔
254
    optind            = 1;
71✔
255
    args->chunk_size  = JP_WRK_CHUNK_SIZE_DEF;
71✔
256
    args->buffer_size = JP_WRK_BUFFER_SIZE_DEF;
71✔
257
    args->out_dir     = NULL;
71✔
258

259
    static struct option long_options[] = {{"chunk-size", required_argument, 0, 'c'},
71✔
260
                                           {"buffer-size", required_argument, 0, 'b'},
261
                                           {"policy", required_argument, 0, 'p'},
262
                                           {"field", required_argument, 0, 'f'},
263
                                           {"out-dir", required_argument, 0, 'o'},
264
                                           {"dry-run", required_argument, 0, 'n'},
265
                                           {"help", no_argument, 0, 'h'},
266
                                           {0, 0, 0, 0}};
267

268
    while ((option = getopt_long(argc, argv, ":c:b:p:o:f:hn", long_options, NULL)) != -1) {
270✔
269
        switch (option) {
171✔
270
            case 'c':
27✔
271
                JP_OK_OR_RET(set_chunk_size(optarg, args));
27✔
272
                break;
273
            case 'b':
19✔
274
                JP_OK_OR_RET(set_buffer_size(optarg, args));
19✔
275
                break;
276
            case 'p':
10✔
277
                JP_OK_OR_RET(set_policy(optarg, args));
10✔
278
                break;
279
            case 'o':
9✔
280
                JP_OK_OR_RET(set_out_dir(optarg, args));
9✔
281
                break;
282
            case 'n':
71✔
283
                args->dry_run = true;
71✔
284
                break;
71✔
285
            case 'f':
32✔
286
                JP_OK_OR_RET(set_field(optarg, args));
32✔
287
                break;
288
            case 'h':
289
                break;
290
            case ':':
3✔
291
                JP_FALLTHROUGH;
3✔
292
            case '?':
293
                JP_OK_OR_RET(handle_unknown_argument(argv[optind - 1]));
3✔
294
                break;
295
            default: {
128✔
296
            }
199✔
297
        }
298
    }
299

300
    if (optind < argc) {
28✔
301
        JP_OK_OR_RET(handle_unknown_argument(argv[optind]));
1✔
302
    }
303

304
    return 0;
305
}
306

307
static jp_errno_t finalize_worker_args(worker_arg_t* args) {
27✔
308
    JP_OK_OR_RET(create_and_normalize_out_dir(args));
27✔
309
    JP_ALLOC_OR_LOG(args->queue, jp_queue_create(args->buffer_size, args->chunk_size, args->policy));
23✔
310
    return 0;
311
}
312

313
static void* producer_thread_init(void* data) {
×
314
    jp_errno_t err = 0;
×
NEW
315
    ssize_t read_len;
×
NEW
316
    jp_block_t* block;
×
NEW
317
    const worker_arg_t* args = data;
×
NEW
318
    const size_t chunk_size  = args->chunk_size;
×
NEW
319
    unsigned char* buffer    = malloc(chunk_size);
×
NEW
320
    jp_poller_t* poller      = jp_poller_create(100);
×
321
    if (buffer == NULL || poller == NULL) {
×
322
        err = JP_ENOMEMORY;
×
323
        goto clean_up;
×
324
    }
325

326
    err = jp_poller_poll(poller, STDIN_FILENO);
×
327
    if (err) {
×
328
        goto clean_up;
×
329
    }
330

331
    while (true) {
×
332
        err = jp_poller_wait(poller);
×
333
        if (err == JP_EREAD_FAILED) {
×
334
            break;
335
        }
336
        if (err == JP_ETRYAGAIN) {
×
337
            continue;
×
338
        }
339
        while (true) {
×
NEW
340
            err = jp_queue_reserve(args->queue, &block);
×
NEW
341
            if (err == JP_ESHUTTING_DOWN) {
×
NEW
342
                err = 0;
×
NEW
343
                goto clean_up;
×
344
            }
NEW
345
            read_len = err == JP_EMSG_SHOULD_DROP ? read(STDIN_FILENO, buffer, chunk_size) : read(STDIN_FILENO, block->data, chunk_size);
×
NEW
346
            if (read_len == 0) {
×
UNCOV
347
                goto clean_up;
×
348
            }
NEW
349
            if (read_len < 0) {
×
350
                if (errno == EINTR) {
×
351
                    continue;
×
352
                }
353
                if (JP_IS_EAGAIN(errno)) {
×
354
                    break;
355
                }
356
                err = JP_EREAD_FAILED;
×
357
                goto clean_up;
×
358
            }
NEW
359
            if (err == 0) {
×
NEW
360
                block->length = (size_t) read_len;
×
NEW
361
                jp_queue_commit(args->queue);
×
362
            }
363
        }
364
    }
365

366
clean_up:
×
367
    if (err) {
×
368
        jp_errno_log_err(err);
×
369
    }
370
    JP_FREE(buffer);
×
371
    jp_poller_destroy(poller);
×
372
    jp_queue_finalize(args->queue);
×
373
    pthread_exit((void*) (uintptr_t) err);  // NOLINT(performance-no-int-to-ptr)
×
374
}
375

376
static void* consumer_thread_init(void* data) {
×
377
    jp_errno_t err = 0;
×
378
    size_t max_len, read_len;
×
NEW
379
    const worker_arg_t* args = data;
×
NEW
380
    unsigned char* buffer    = malloc(args->chunk_size);
×
381
    if (buffer == NULL) {
×
382
        err = JP_ENOMEMORY;
×
383
        goto clean_up;
×
384
    }
385

386
    max_len = args->chunk_size;
387
    while (true) {
×
388
        err = jp_queue_pop(args->queue, buffer, max_len, &read_len);
×
389
        if (err) {
×
390
            if (err == JP_ESHUTTING_DOWN) {
×
391
                err = 0;
392
            }
393
            break;
394
        }
395
    }
396

397
clean_up:
398
    if (err) {
×
399
        jp_errno_log_err(err);
×
400
    }
401
    JP_FREE(buffer);
×
402
    jp_queue_finalize(args->queue);
×
403
    pthread_exit((void*) (uintptr_t) err);  // NOLINT(performance-no-int-to-ptr)
×
404
}
405

406
static void* watcher_thread_init(void* data) {
×
407
    int sig;
×
408
    sigset_t set;
×
409
    worker_arg_t* args = data;
×
410

411
    sigemptyset(&set);
×
412
    sigaddset(&set, SIGINT);
×
413
    sigaddset(&set, SIGTERM);
×
414
    if (sigwait(&set, &sig) == 0) {
×
415
        JP_DEBUG("[WATCHER]: Termination signal (%s) was received. Shutting down...", sig == SIGINT ? "SIGINT" : "SIGTERM");
×
416
        jp_queue_finalize(args->queue);
×
417
    }
418
    pthread_exit(NULL);
×
419
}
420

421
static jp_errno_t orchestrate_threads(worker_arg_t* args) {
×
422
    int err = 0, join_err = 0, t_size = 0;
×
423
    void* thread_result;
×
424
    sigset_t set;
×
425
    worker_thread_t threads[3] = {{.func = producer_thread_init, .running = false, .detached = false},
×
426
                                  {.func = consumer_thread_init, .running = false, .detached = false},
427
                                  {.func = watcher_thread_init, .running = false, .detached = true}};
428

429
    sigemptyset(&set);
×
430
    sigaddset(&set, SIGTERM);
×
431
    sigaddset(&set, SIGINT);
×
432
    pthread_sigmask(SIG_BLOCK, &set, NULL);
×
433

434
    t_size = (sizeof(threads) / sizeof(threads[0]));
×
435
    for (int i = 0; i < t_size; i++) {
×
436
        err = pthread_create(&threads[i].tid, NULL, threads[i].func, args);
×
437
        if (err) {
×
438
            goto clean_up;
×
439
        }
440
        threads[i].running = true;
×
441
        if (!threads[i].detached) {
×
442
            continue;
×
443
        }
444
        err = pthread_detach(threads[i].tid);
×
445
        if (err) {
×
446
            goto clean_up;
×
447
        }
448
    }
449

450
clean_up:
×
451
    if (err) {
×
452
        jp_errno_log_err_format(JP_ERUN_FAILED, "%s", strerror(err));
×
453
    }
454

455
    for (int i = 0; i < t_size; i++) {
×
456
        if (threads[i].running && !threads[i].detached) {
×
457
            join_err = pthread_join(threads[i].tid, &thread_result);
×
458
            if (join_err) {
×
459
                jp_errno_log_err_format(JP_ERUN_FAILED, "%s", strerror(join_err));
×
460
            }
461
            err                = (int) (uintptr_t) thread_result;
×
462
            threads[i].running = false;
×
463
        }
464
    }
465

466
    return err ? JP_ERUN_FAILED : 0;
×
467
}
468

469
jp_errno_t jp_wrk_exec(int argc, char* argv[]) {
74✔
470
    jp_errno_t err    = 0;
74✔
471
    worker_arg_t args = {0};
74✔
472

473
    if (argc == 2 && JP_CMD_EQ(argv[1], "-h", "--help")) {
74✔
474
        return display_help();
2✔
475
    }
476

477
    err = init_worker_args(argc, argv, &args);
72✔
478
    if (err) {
72✔
479
        goto clean_up;
1✔
480
    }
481

482
    err = collect_cli_args(argc, argv, &args);
71✔
483
    if (err) {
71✔
484
        goto clean_up;
44✔
485
    }
486

487
    err = finalize_worker_args(&args);
27✔
488
    if (err) {
27✔
489
        goto clean_up;
4✔
490
    }
491

492
    display_summary(&args);
23✔
493
    if (!args.dry_run) {
23✔
494
        err = orchestrate_threads(&args);
×
495
    }
496

497
clean_up:
23✔
498
    JP_FREE(args.out_dir);
72✔
499
    jp_field_set_destroy(args.fields);
72✔
500
    jp_queue_destroy(args.queue);
72✔
501
    return err;
72✔
502
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc