• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scokmen / jpipe / 23212974946

17 Mar 2026 07:36PM UTC coverage: 80.218% (+7.0%) from 73.258%
23212974946

push

github

web-flow
feat(poller): implemented linux/epoll support (#9)

107 of 132 new or added lines in 3 files covered. (81.06%)

2 existing lines in 1 file now uncovered.

442 of 551 relevant lines covered (80.22%)

7961.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.41
/src/worker.c
1
#include <errno.h>
2
#include <getopt.h>
3
#include <jp_command.h>
4
#include <jp_errno.h>
5
#include <jp_field.h>
6
#include <jp_queue.h>
7
#include <jp_reader.h>
8
#include <jp_worker.h>
9
#include <signal.h>
10
#include <stdint.h>
11
#include <stdio.h>
12
#include <stdlib.h>
13
#include <string.h>
14
#include <sys/stat.h>
15
#include <unistd.h>
16

17
/* Entry-point signature for a worker thread; matches the start routine expected by pthread_create(). */
typedef void* (*worker_func)(void*);
18

19
typedef struct {
20
    bool running;
21
    bool detached;
22
    pthread_t tid;
23
    worker_func func;
24
} worker_thread_t;
25

26
typedef struct {
27
    int input_stream;
28
    bool dry_run;
29
    size_t chunk_size;
30
    size_t buffer_size;
31
    jp_queue_policy_t policy;
32
    jp_queue_t* queue;
33
    jp_field_set_t* fields;
34
    const char* out_dir;
35
} worker_ctx_t;
36

37
/*
 * Prints the usage text of the `run` command through JP_LOG_INFO.
 *
 * Returns 0 unconditionally.
 */
static jp_errno_t display_help(void) {
    /* Synopsis. */
    JP_LOG_INFO("Usage: jpipe run [options]\n");
    JP_LOG_INFO("Execute the data processing engine with the following configurations:\n");

    /* Option overview. */
    JP_LOG_INFO("Options:");
    JP_LOG_INFO("  -c, --chunk-size  <size>     Chunk size (e.g., 16kb, 64kb). Range: 1kb-128kb  (default: 16kb).");
    JP_LOG_INFO("  -b, --buffer-size <count>    Max pending operations. Range: 1-1024 (default: 64).");
    JP_LOG_INFO("  -p, --policy      <type>     Overflow policy: 'wait' or 'drop' (default: wait).");
    JP_LOG_INFO("  -o, --output      <path>     Output directory (default: current dir).");
    JP_LOG_INFO("  -f, --field       key=value  Additional field to the JSON output. Can be used multiple times.");
    JP_LOG_INFO("  -n, --dry-run                Dry run.");
    JP_LOG_INFO("  -q, --quiet                  Display less output.");
    JP_LOG_INFO("  -h, --help                   Show this help message.\n");

    /* Detailed description of the --field option. */
    JP_LOG_INFO("Field Options:");
    JP_LOG_INFO("  -f, --field \"key=value\"   Add a field to the JSON output.\n");

    JP_LOG_INFO("  Key Rules:");
    JP_LOG_INFO("    - Must contain only: 'a-z', 'A-Z', '0-9', '_' and '-'.");
    JP_LOG_INFO("    - Maximum length: 64 characters.\n");

    JP_LOG_INFO("  Value Type Inference:");
    JP_LOG_INFO("    - key=123        -> Number  (no quotes in JSON).");
    JP_LOG_INFO("    - key=true|false -> Boolean (no quotes in JSON).");
    JP_LOG_INFO("    - key=string     -> String.");
    JP_LOG_INFO("    - key=\"string\"   -> String.");
    JP_LOG_INFO("    - key=\"123\"      -> Forced string.");
    JP_LOG_INFO("    - key=\"true\"     -> Forced string.");
    JP_LOG_INFO("    - key=\"str=ng\"   -> String.");
    JP_LOG_INFO("    - key=str=ng     -> Invalid field.\n");

    /* Worked example. */
    JP_LOG_INFO("  Example:");
    JP_LOG_INFO("    Input : jpipe -f \"id=101\" -f \"name=app\" -f \"active=true\" -f \"ver=1.2.0\"");
    JP_LOG_INFO("    Output: {\"id\": 101, \"name\": \"app\", \"active\": true, \"ver\": \"1.2.0\"}");

    return 0;
}
68

69
/*
 * Prints the effective runtime parameters and an estimated memory footprint
 * through JP_LOG_MSG.
 *
 * The estimate is chunk_size * buffer_size converted to megabytes; it does not
 * account for anything else the process allocates.
 *
 * ctx: fully initialised worker context; ctx->fields and ctx->out_dir must be non-NULL.
 */
static void display_summary(worker_ctx_t* ctx) {
    double estimated_mem_usage = (double) ctx->chunk_size * (double) ctx->buffer_size / (BYTES_IN_KB * BYTES_IN_KB);

    JP_LOG_MSG("Application is starting...\n");
    JP_LOG_MSG("[Runtime Parameters]");
    JP_LOG_MSG("• Chunk Size   (-c) : %zu KB", (ctx->chunk_size / BYTES_IN_KB));
    JP_LOG_MSG("• Buffer Size  (-b) : %zu", ctx->buffer_size);
    JP_LOG_MSG("• Output Dir   (-o) : %s", ctx->out_dir);
    JP_LOG_MSG("• Policy       (-p) : %s", ctx->policy == JP_QUEUE_POLICY_WAIT ? "WAIT" : "DROP");

    /* The field list is only printed when at least one -f option was supplied. */
    if (ctx->fields->len > 0) {
        JP_LOG_MSG("• Fields       (-f) :");
        for (size_t i = 0; i < ctx->fields->len; i++) {
            JP_LOG_MSG("     %zu. %-32s= %-32s", i + 1, ctx->fields->fields[i]->key, ctx->fields->fields[i]->val);
        }
    }

    JP_LOG_MSG("\n[Resource Utilization]");
    JP_LOG_MSG("• Memory Usage      :  ~%.2f MB", estimated_mem_usage);
    JP_LOG_MSG("\n* These values are based on user-provided parameters.");
    /* Adjacent literals are concatenated verbatim, so the separating space must be part of the first one. */
    JP_LOG_MSG(
        "* Memory usage is an approximation; "
        "operating system overhead and thread stack allocations are not included.\n");
}
25✔
91

92
/*
 * Handles the -o/--output option: replaces ctx->out_dir with a heap copy of arg.
 *
 * Any previously stored directory is released first, even when the new value
 * turns out to be invalid.
 *
 * Returns 0 on success, or the value of jp_errno_log_err_format(JP_EMISSING_CMD, ...)
 * when arg is NULL, empty, or longer than JP_PATH_MAX characters.
 */
static jp_errno_t set_out_dir(const char* arg, worker_ctx_t* ctx) {
    JP_FREE(ctx->out_dir);

    /* A missing argument and an empty argument are reported identically. */
    if (arg == NULL || *arg == '\0') {
        return jp_errno_log_err_format(JP_EMISSING_CMD, "Path is empty.");
    }

    const size_t arg_len = strlen(arg);
    if (arg_len > JP_PATH_MAX) {
        return jp_errno_log_err_format(JP_EMISSING_CMD, "Path is too long. Maximum allowed length: %d.", JP_PATH_MAX);
    }

    /* NOTE(review): JP_ALLOC_ERRNO presumably returns an error itself when strdup() yields NULL - confirm. */
    JP_ALLOC_ERRNO(ctx->out_dir, strdup(arg));
    return 0;
}
108

109
/*
 * Handles the -f/--field option: adds the "key=value" expression in arg to ctx->fields.
 *
 * Returns 0 on success; otherwise the value of jp_errno_log_err_format(), called with
 * JP_EINV_FIELD_KEY for a NULL argument or with the error reported by jp_field_set_add().
 */
static jp_errno_t set_field(const char* arg, worker_ctx_t* ctx) {
    if (arg == NULL) {
        return jp_errno_log_err_format(JP_EINV_FIELD_KEY, "Field is invalid.");
    }

    /* The field set is created by init_worker_args() before any option is parsed. */
    JP_ATTR_ASSUME(ctx->fields != NULL);

    const jp_errno_t add_result = jp_field_set_add(ctx->fields, arg);
    if (add_result != 0) {
        return jp_errno_log_err_format(add_result, "Field is invalid: '%s'", arg);
    }
    return 0;
}
122

123
/*
 * Handles the -c/--chunk-size option.
 *
 * Accepts a decimal number immediately followed by the literal suffix "kb"
 * (for example "16kb") and stores the size in bytes in ctx->chunk_size. The
 * resulting size must lie within [JP_WRK_CHUNK_SIZE_MIN, JP_WRK_CHUNK_SIZE_MAX].
 *
 * Returns 0 on success, or the value of jp_errno_log_err_format(JP_ECHUNK_SIZE, ...)
 * for any malformed or out-of-range input; ctx is left untouched in that case.
 */
static jp_errno_t set_chunk_size(const char* arg, worker_ctx_t* ctx) {
    char* suffix = NULL;

    errno                              = 0;
    const unsigned long long magnitude = strtoull(arg, &suffix, 10);

    /* The number must convert cleanly and at least one digit must have been consumed. */
    const bool parsed = errno != ERANGE && errno != EINVAL && suffix != arg;

    /* Only the exact unit "kb" is recognised; a bare number is rejected as well. */
    const bool has_kb_unit = parsed && strcmp(suffix, "kb") == 0;

    /* Bounding the magnitude first keeps the multiplication below from overflowing. */
    if (!has_kb_unit || magnitude > JP_WRK_CHUNK_SIZE_MAX / BYTES_IN_KB) {
        return jp_errno_log_err_format(JP_ECHUNK_SIZE, "Size is invalid: '%.32s'.", arg);
    }

    const size_t bytes = magnitude * BYTES_IN_KB;
    if (bytes < JP_WRK_CHUNK_SIZE_MIN || bytes > JP_WRK_CHUNK_SIZE_MAX) {
        return jp_errno_log_err_format(JP_ECHUNK_SIZE, "Size is invalid: '%.32s'.", arg);
    }

    ctx->chunk_size = bytes;
    return 0;
}
151

152
/*
 * Handles the -b/--buffer-size option.
 *
 * Accepts a plain decimal count within [JP_WRK_BUFFER_SIZE_MIN, JP_WRK_BUFFER_SIZE_MAX]
 * and stores it in ctx->buffer_size.
 *
 * Returns 0 on success, or the value of jp_errno_log_err_format(JP_EBUFFER_SIZE, ...)
 * when the argument is not a number, has trailing characters, or is out of range.
 */
static jp_errno_t set_buffer_size(const char* arg, worker_ctx_t* ctx) {
    char* tail = NULL;

    errno                          = 0;
    const unsigned long long count = strtoull(arg, &tail, 10);

    const bool conversion_failed = (errno == ERANGE) || (errno == EINVAL);
    /* At least one digit must be consumed and nothing may follow the number. */
    const bool fully_consumed = (tail != arg) && (*tail == '\0');

    if (conversion_failed || !fully_consumed) {
        return jp_errno_log_err_format(JP_EBUFFER_SIZE, "Size is invalid: '%.32s'.", arg);
    }

    if (count < JP_WRK_BUFFER_SIZE_MIN || count > JP_WRK_BUFFER_SIZE_MAX) {
        return jp_errno_log_err_format(JP_EBUFFER_SIZE, "Size is invalid: '%.32s'.", arg);
    }

    ctx->buffer_size = (size_t) count;
    return 0;
}
170

171
/*
 * Handles the -p/--policy option.
 *
 * Maps the exact, case-sensitive names "wait" and "drop" to the matching
 * queue policy and stores it in ctx->policy.
 *
 * Returns 0 on success, or the value of
 * jp_errno_log_err_format(JP_EOVERFLOW_POLICY, ...) for any other name.
 */
static jp_errno_t set_policy(const char* arg, worker_ctx_t* ctx) {
    static const struct {
        const char* name;
        jp_queue_policy_t policy;
    } known_policies[] = {
        {"wait", JP_QUEUE_POLICY_WAIT},
        {"drop", JP_QUEUE_POLICY_DROP},
    };

    for (size_t i = 0; i < sizeof(known_policies) / sizeof(known_policies[0]); i++) {
        if (strcmp(arg, known_policies[i].name) == 0) {
            ctx->policy = known_policies[i].policy;
            return 0;
        }
    }

    return jp_errno_log_err_format(JP_EOVERFLOW_POLICY, "Policy is invalid: '%.32s'.", arg);
}
184

185
static jp_errno_t handle_unknown_argument(const char* cmd) {
4✔
186
    return jp_errno_log_err_format(JP_EUNKNOWN_RUN_CMD, "Invalid or incomplete command: '%.32s'.", cmd);
4✔
187
}
188

189
/*
 * Makes sure the output directory exists and replaces ctx->out_dir with its
 * resolved absolute path.
 *
 * Steps:
 *   1. Fall back to JP_WRK_OUTDIR_DEF when no directory was configured.
 *   2. Strip trailing slashes and create every missing path component (mode 0755).
 *   3. Verify that the target is a directory and writable.
 *   4. Resolve it with realpath() and store the result in ctx->out_dir.
 *
 * Returns 0 on success, or the value of jp_errno_log_err_format(JP_EOUT_DIR, ...)
 * when the path is too long, cannot be created, is not a writable directory,
 * or cannot be resolved.
 */
static jp_errno_t create_and_normalize_out_dir(worker_ctx_t* ctx) {
    char tmp[JP_PATH_MAX] = {0};
    char* resolved        = NULL;
    struct stat st;

    if (ctx->out_dir == NULL) {
        /* NOTE(review): JP_ALLOC_ERRNO presumably returns an error itself on a NULL result - confirm. */
        JP_ALLOC_ERRNO(ctx->out_dir, strdup(JP_WRK_OUTDIR_DEF));
    }

    size_t path_len = strlen(ctx->out_dir);

    /*
     * The working copy needs room for the terminator. A bounded strncpy() would
     * silently leave `tmp` unterminated for a path of exactly sizeof(tmp)
     * characters and the scan below would then run past the buffer.
     */
    if (path_len >= sizeof(tmp)) {
        return jp_errno_log_err_format(
            JP_EOUT_DIR, "The output path is too long. Maximum allowed length: %zu.", sizeof(tmp) - 1);
    }
    memcpy(tmp, ctx->out_dir, path_len + 1);

    /* Drop trailing slashes, but keep a lone "/" intact. */
    while (path_len > 1 && tmp[path_len - 1] == '/') {
        tmp[path_len - 1] = '\0';
        path_len--;
    }

    /*
     * Create intermediate components: cut the path at each '/', create the
     * prefix, then restore the separator. Starting at tmp + 1 skips a leading '/'.
     */
    for (char* p = tmp + 1; *p; p++) {
        if (*p != '/') {
            continue;
        }
        *p = '\0';
        if (mkdir(tmp, 0755) != 0 && errno != EEXIST) {
            return jp_errno_log_err_format(JP_EOUT_DIR, "Could not create the output directory: '%s'", tmp);
        }
        *p = '/';
    }

    if (mkdir(tmp, 0755) != 0 && errno != EEXIST) {
        return jp_errno_log_err_format(JP_EOUT_DIR, "Could not create the directory: '%s'.", tmp);
    }

    /* EEXIST is also reported for non-directories, hence the explicit type check. */
    if (stat(tmp, &st) != 0 || !S_ISDIR(st.st_mode)) {
        return jp_errno_log_err_format(JP_EOUT_DIR, "The target is not a directory: '%s'.", tmp);
    }

    if (access(tmp, W_OK) != 0) {
        return jp_errno_log_err_format(JP_EOUT_DIR, "The target is inaccessible: '%s'.", tmp);
    }

    /*
     * Let realpath() allocate the result: a caller-supplied buffer must hold
     * PATH_MAX bytes, which a JP_PATH_MAX-sized array does not necessarily do.
     */
    resolved = realpath(tmp, NULL);
    if (resolved == NULL) {
        return jp_errno_log_err_format(JP_EOUT_DIR, "Could not resolve absolute path: '%s'.", tmp);
    }

    /* Keep the stored path within the same bound as every other path handled here. */
    if (strlen(resolved) >= sizeof(tmp)) {
        free(resolved);
        return jp_errno_log_err_format(JP_EOUT_DIR, "Could not resolve absolute path: '%s'.", tmp);
    }

    /* Ownership of `resolved` moves to the context; it is released with JP_FREE like the strdup() copies. */
    JP_FREE(ctx->out_dir);
    ctx->out_dir = resolved;
    return 0;
}
236

237
/*
 * Counts the argv entries that JP_CMD_EQ matches against "-f" / "--field".
 *
 * Every entry of argv, including argv[0], is inspected. The result is used to
 * size the field set before the options are actually parsed.
 */
static int get_field_args_count(int argc, char* argv[]) {
    int matches = 0;
    int idx     = 0;

    while (idx < argc) {
        if (JP_CMD_EQ(argv[idx], "-f", "--field")) {
            matches++;
        }
        idx++;
    }
    return matches;
}
246

247
/*
 * Allocates ctx->fields with room for every -f/--field option found in argv.
 *
 * Returns 0 on success, or the value of
 * jp_errno_log_err_format(JP_ETOO_MANY_FIELD, ...) when more than
 * JP_WRK_FIELDS_MAX field options were supplied.
 */
static jp_errno_t init_worker_args(int argc, char* argv[], worker_ctx_t* ctx) {
    const int requested = get_field_args_count(argc, argv);

    if (requested <= JP_WRK_FIELDS_MAX) {
        /* NOTE(review): JP_ALLOC_ERRNO presumably returns an error itself on a NULL result - confirm. */
        JP_ALLOC_ERRNO(ctx->fields, jp_field_set_create((size_t) requested));
        return 0;
    }

    return jp_errno_log_err_format(JP_ETOO_MANY_FIELD, "Too many fields specified: '%d'", requested);
}
255

256
/*
 * Parses the options of the `run` command with getopt_long() and stores the
 * results in ctx.
 *
 * getopt's own diagnostics are disabled (opterr = 0) and the scan is restarted
 * from argv[1] (optind = 1), so the function can be called more than once per
 * process. The leading ':' in the option string makes getopt report a missing
 * option argument as ':' instead of '?'.
 *
 * Returns 0 when every option was accepted and no positional argument is left.
 * NOTE(review): JP_VERIFY presumably returns the wrapped call's non-zero result
 * from this function - confirm against its definition.
 */
static jp_errno_t collect_cli_args(int argc, char* argv[], worker_ctx_t* ctx) {
    static const struct option run_options[] = {
        {"chunk-size", required_argument, 0, 'c'},
        {"buffer-size", required_argument, 0, 'b'},
        {"policy", required_argument, 0, 'p'},
        {"field", required_argument, 0, 'f'},
        {"output", required_argument, 0, 'o'},
        {"dry-run", no_argument, 0, 'n'},
        {"quiet", no_argument, 0, 'q'},
        {"help", no_argument, 0, 'h'},
        {0, 0, 0, 0},
    };
    int opt;

    opterr = 0;
    optind = 1;

    while ((opt = getopt_long(argc, argv, ":c:b:p:o:f:hnq", run_options, NULL)) != -1) {
        switch (opt) {
            case 'c':
                JP_VERIFY(set_chunk_size(optarg, ctx));
                break;
            case 'b':
                JP_VERIFY(set_buffer_size(optarg, ctx));
                break;
            case 'p':
                JP_VERIFY(set_policy(optarg, ctx));
                break;
            case 'o':
                JP_VERIFY(set_out_dir(optarg, ctx));
                break;
            case 'f':
                JP_VERIFY(set_field(optarg, ctx));
                break;
            case 'n':
                ctx->dry_run = true;
                break;
            case 'q':
                JP_CONF_SILENT_SET(true);
                break;
            case 'h':
                /* Accepted but ignored here; a lone -h/--help is handled by jp_wrk_exec(). */
                break;
            case ':': /* option is missing its argument */
            case '?': /* unknown option */
                JP_VERIFY(handle_unknown_argument((optind < argc) ? argv[optind] : argv[argc - 1]));
                break;
            default:
                break;
        }
    }

    /* Anything left after the options is a positional argument, which `run` does not take. */
    if (optind < argc) {
        JP_VERIFY(handle_unknown_argument(argv[optind]));
    }

    return 0;
}
312

313
/*
 * Completes the context after option parsing: prepares the output directory
 * and creates the queue from the configured buffer size, chunk size and policy.
 *
 * Returns 0 on success.
 * NOTE(review): failures are presumably returned from inside JP_VERIFY /
 * JP_ALLOC_ERRNO - confirm against the macro definitions.
 */
static jp_errno_t finalize_worker_args(worker_ctx_t* ctx) {
    /* The directory is validated first so no queue is allocated for an unusable target. */
    JP_VERIFY(create_and_normalize_out_dir(ctx));

    JP_ALLOC_ERRNO(ctx->queue, jp_queue_create(ctx->buffer_size, ctx->chunk_size, ctx->policy));

    return 0;
}
318

319
static void* producer_thread_init(void* data) {
×
NEW
320
    const worker_ctx_t* args   = data;
×
NEW
321
    jp_reader_ctx_t reader_ctx = {
×
NEW
322
        .chunk_size   = args->chunk_size,
×
NEW
323
        .queue        = args->queue,
×
NEW
324
        .input_stream = args->input_stream,
×
325
    };
NEW
326
    jp_errno_t err = jp_reader_consume(reader_ctx);
×
UNCOV
327
    pthread_exit((void*) (uintptr_t) err);  // NOLINT(performance-no-int-to-ptr)
×
328
}
329

330
static void* consumer_thread_init(void* data) {
×
331
    jp_errno_t err = 0;
×
332
    jp_block_t* block;
×
NEW
333
    const worker_ctx_t* args = data;
×
334
    unsigned char* buffer    = malloc(args->chunk_size);
×
335
    if (buffer == NULL) {
×
336
        err = JP_ENOMEMORY;
×
337
        goto clean_up;
×
338
    }
339

340
    while (true) {
×
341
        err = jp_queue_pop_uncommitted(args->queue, &block);
×
342
        if (err) {
×
343
            if (err == JP_ESHUTTING_DOWN) {
×
344
                err = 0;
345
            }
346
            break;
347
        }
348
        jp_queue_pop_commit(args->queue);
×
349
    }
350

351
clean_up:
352
    if (err) {
×
353
        jp_errno_log_err(err);
×
354
    }
355
    JP_FREE(buffer);
×
356
    jp_queue_finalize(args->queue);
×
357
    pthread_exit((void*) (uintptr_t) err);  // NOLINT(performance-no-int-to-ptr)
×
358
}
359

360
static void* watcher_thread_init(void* data) {
×
361
    int sig;
×
362
    sigset_t set;
×
NEW
363
    worker_ctx_t* args = data;
×
364

365
    sigemptyset(&set);
×
366
    sigaddset(&set, SIGINT);
×
367
    sigaddset(&set, SIGTERM);
×
368
    if (sigwait(&set, &sig) == 0) {
×
369
        JP_LOG_DEBUG("[WATCHER]: Termination signal (%s) was received. Shutting down...",
×
370
                     sig == SIGINT ? "SIGINT" : "SIGTERM");
371
        jp_queue_finalize(args->queue);
×
372
    }
373
    pthread_exit(NULL);
×
374
}
375

NEW
376
/*
 * Starts the producer, consumer and signal-watcher threads and waits for the
 * two joinable ones (producer and consumer) to finish.
 *
 * SIGTERM and SIGINT are blocked in the calling thread before any thread is
 * created, so the new threads inherit the mask and only the watcher picks the
 * signals up through sigwait().
 *
 * Returns 0 when every thread was started and every joined thread reported
 * success. Returns JP_ERUN_FAILED when a thread could not be created, detached
 * or joined, or when any joined thread exited with a non-zero status.
 *
 * NOTE(review): the signal mask is not restored on return, and the detached
 * watcher may still be blocked in sigwait() holding a pointer to `ctx` after
 * this function returns - confirm that callers tolerate both.
 */
static jp_errno_t orchestrate_threads(worker_ctx_t* ctx) {
    int err     = 0;
    bool failed = false;
    sigset_t set;
    worker_thread_t threads[3] = {{.func = producer_thread_init, .running = false, .detached = false},
                                  {.func = consumer_thread_init, .running = false, .detached = false},
                                  {.func = watcher_thread_init, .running = false, .detached = true}};
    const int t_size           = (int) (sizeof(threads) / sizeof(threads[0]));

    sigemptyset(&set);
    sigaddset(&set, SIGTERM);
    sigaddset(&set, SIGINT);
    pthread_sigmask(SIG_BLOCK, &set, NULL);

    for (int i = 0; i < t_size; i++) {
        err = pthread_create(&threads[i].tid, NULL, threads[i].func, ctx);
        if (err) {
            break;
        }
        threads[i].running = true;
        if (threads[i].detached) {
            err = pthread_detach(threads[i].tid);
            if (err) {
                break;
            }
        }
    }

    if (err) {
        jp_errno_log_err_format(JP_ERUN_FAILED, "%s", strerror(err));
        failed = true;
        /*
         * Some threads may already be running. Finalizing the queue first asks
         * them to shut down; joining them before that could block forever
         * because their counterpart was never started.
         */
        jp_queue_finalize(ctx->queue);
    }

    for (int i = 0; i < t_size; i++) {
        if (!threads[i].running || threads[i].detached) {
            continue;
        }

        void* thread_result = NULL;
        const int join_err  = pthread_join(threads[i].tid, &thread_result);
        threads[i].running  = false;

        if (join_err) {
            /* thread_result is not meaningful when the join itself failed. */
            jp_errno_log_err_format(JP_ERUN_FAILED, "%s", strerror(join_err));
            failed = true;
            continue;
        }

        /* Accumulate instead of overwrite: one failing thread must not be masked by a later success. */
        if ((uintptr_t) thread_result != 0) {
            failed = true;
        }
    }

    jp_queue_finalize(ctx->queue);
    return failed ? JP_ERUN_FAILED : 0;
}
424

425
jp_errno_t jp_wrk_exec(int argc, char* argv[]) {
76✔
426
    jp_errno_t err   = 0;
76✔
427
    worker_ctx_t ctx = {
76✔
428
        .input_stream = STDIN_FILENO,
429
        .buffer_size  = JP_WRK_BUFFER_SIZE_DEF,
430
        .chunk_size   = JP_WRK_CHUNK_SIZE_DEF,
431
        .out_dir      = NULL,
432
    };
433

434
    if (argc == 2 && JP_CMD_EQ(argv[1], "-h", "--help")) {
76✔
435
        return display_help();
2✔
436
    }
437

438
    err = init_worker_args(argc, argv, &ctx);
74✔
439
    if (err) {
74✔
440
        goto clean_up;
1✔
441
    }
442

443
    err = collect_cli_args(argc, argv, &ctx);
73✔
444
    if (err) {
73✔
445
        goto clean_up;
44✔
446
    }
447

448
    err = finalize_worker_args(&ctx);
29✔
449
    if (err) {
29✔
450
        goto clean_up;
4✔
451
    }
452

453
    display_summary(&ctx);
25✔
454
    if (!ctx.dry_run) {
25✔
NEW
455
        err = orchestrate_threads(&ctx);
×
456
    }
457

458
clean_up:
25✔
459
    JP_FREE(ctx.out_dir);
74✔
460
    jp_field_set_destroy(ctx.fields);
74✔
461
    jp_queue_destroy(ctx.queue);
74✔
462
    return err;
74✔
463
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc