saitoha / libsixel / build 20102559991 (push, via GitHub)

10 Dec 2025 02:45PM UTC — coverage: 41.331% (+0.1%) from 41.187%

Commit by saitoha: Merge branch 'refactor/lookup' into develop

10835 of 40936 branches covered (26.47%)
22 of 49 new or added lines in 4 files covered (44.9%)
2 existing lines in 2 files now uncovered
14876 of 35992 relevant lines covered (41.33%)
2727539.9 hits per line

Source File: /src/threadpool.c (0.0% of lines covered)
/*
 * SPDX-License-Identifier: MIT
 *
 * Copyright (c) 2025 libsixel developers. See `AUTHORS`.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is furnished to do
 * so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "config.h"

#include <errno.h>
#include <stdlib.h>
#include <string.h>

#include "threadpool.h"
#include "threading.h"

typedef struct threadpool_worker threadpool_worker_t;

struct threadpool_worker {
    threadpool_t *pool;
    sixel_thread_t thread;
    void *workspace;
    int started;
    int index;
    int pinned;
};

struct threadpool {
    int nthreads;
    int qsize;
    size_t workspace_size;
    tp_workspace_cleanup_fn workspace_cleanup;
    tp_worker_fn worker;
    void *userdata;
    tp_job_t *jobs;
    int head;
    int tail;
    int count;
    int running;
    int shutting_down;
    int joined;
    int error;
    int threads_started;
    int worker_capacity;
    int pin_threads;
    int hw_threads;
    sixel_mutex_t mutex;
    sixel_cond_t cond_not_empty;
    sixel_cond_t cond_not_full;
    sixel_cond_t cond_drained;
    int mutex_ready;
    int cond_not_empty_ready;
    int cond_not_full_ready;
    int cond_drained_ready;
    threadpool_worker_t **workers; /* owned worker slots (stable addresses) */
};

static void threadpool_free(threadpool_t *pool);
static int threadpool_worker_main(void *arg);
static int threadpool_spawn_worker(threadpool_t *pool,
                                   threadpool_worker_t *worker);

/*
 * Release every dynamically allocated component of the pool. Callers must
 * ensure that worker threads have already terminated before invoking this
 * helper; otherwise joining would operate on freed memory.
 */
static void
threadpool_free(threadpool_t *pool)
{
    int i;

    if (pool == NULL) {
        return;
    }
    if (pool->workers != NULL) {
        for (i = 0; i < pool->worker_capacity; ++i) {
            if (pool->workers[i] == NULL) {
                continue;
            }
            if (pool->workers[i]->workspace != NULL) {
                if (pool->workspace_cleanup != NULL) {
                    pool->workspace_cleanup(pool->workers[i]->workspace);
                }
                free(pool->workers[i]->workspace);
            }
            free(pool->workers[i]);
        }
        free(pool->workers);
    }
    if (pool->jobs != NULL) {
        free(pool->jobs);
    }
    if (pool->cond_drained_ready) {
        sixel_cond_destroy(&pool->cond_drained);
    }
    if (pool->cond_not_full_ready) {
        sixel_cond_destroy(&pool->cond_not_full);
    }
    if (pool->cond_not_empty_ready) {
        sixel_cond_destroy(&pool->cond_not_empty);
    }
    if (pool->mutex_ready) {
        sixel_mutex_destroy(&pool->mutex);
    }
    free(pool);
}

/*
 * Worker threads pull jobs from the ring buffer, execute the supplied callback
 * outside the critical section, and record the first failure code. All
 * synchronization is delegated to the mutex/condition helpers provided by the
 * threading abstraction.
 */
static int
threadpool_worker_main(void *arg)
{
    threadpool_worker_t *worker;
    threadpool_t *pool;
    tp_job_t job;
    int rc;

    worker = (threadpool_worker_t *)arg;
    pool = worker->pool;
    for (;;) {
        sixel_mutex_lock(&pool->mutex);
        while (pool->count == 0 && !pool->shutting_down) {
            sixel_cond_wait(&pool->cond_not_empty, &pool->mutex);
        }
        if (pool->count == 0 && pool->shutting_down) {
            sixel_mutex_unlock(&pool->mutex);
            break;
        }
        job = pool->jobs[pool->head];
        pool->head = (pool->head + 1) % pool->qsize;
        pool->count -= 1;
        pool->running += 1;
        sixel_cond_signal(&pool->cond_not_full);
        sixel_mutex_unlock(&pool->mutex);

        if (pool->pin_threads && !worker->pinned && pool->hw_threads > 0) {
            int cpu_index;

            cpu_index = worker->index % pool->hw_threads;
            (void)sixel_thread_pin_self(cpu_index);
            worker->pinned = 1;
        }

        rc = pool->worker(job, pool->userdata, worker->workspace);

        sixel_mutex_lock(&pool->mutex);
        pool->running -= 1;
        if (rc != SIXEL_OK && pool->error == SIXEL_OK) {
            pool->error = rc;
        }
        if (pool->count == 0 && pool->running == 0) {
            sixel_cond_broadcast(&pool->cond_drained);
        }
        sixel_mutex_unlock(&pool->mutex);
    }
    return SIXEL_OK;
}

SIXELAPI threadpool_t *
threadpool_create(int nthreads,
                  int qsize,
                  size_t workspace_size,
                  tp_worker_fn worker,
                  void *userdata,
                  tp_workspace_cleanup_fn workspace_cleanup)
{
    threadpool_t *pool;
    int i;
    int rc;

    if (nthreads <= 0 || qsize <= 0 || worker == NULL) {
        return NULL;
    }
    pool = (threadpool_t *)calloc(1, sizeof(threadpool_t));
    if (pool == NULL) {
        return NULL;
    }
    pool->nthreads = nthreads;
    pool->qsize = qsize;
    pool->workspace_size = workspace_size;
    pool->worker = worker;
    pool->userdata = userdata;
    pool->jobs = NULL;
    pool->head = 0;
    pool->tail = 0;
    pool->count = 0;
    pool->running = 0;
    pool->shutting_down = 0;
    pool->joined = 0;
    pool->error = SIXEL_OK;
    pool->threads_started = 0;
    pool->mutex_ready = 0;
    pool->cond_not_empty_ready = 0;
    pool->cond_not_full_ready = 0;
    pool->cond_drained_ready = 0;
    pool->pin_threads = 0;
    pool->hw_threads = 0;
    pool->workers = NULL;
    pool->workspace_cleanup = workspace_cleanup;

    rc = sixel_mutex_init(&pool->mutex);
    if (rc != SIXEL_OK) {
        errno = EINVAL;
        threadpool_free(pool);
        return NULL;
    }
    pool->mutex_ready = 1;

    rc = sixel_cond_init(&pool->cond_not_empty);
    if (rc != SIXEL_OK) {
        errno = EINVAL;
        threadpool_free(pool);
        return NULL;
    }
    pool->cond_not_empty_ready = 1;

    rc = sixel_cond_init(&pool->cond_not_full);
    if (rc != SIXEL_OK) {
        errno = EINVAL;
        threadpool_free(pool);
        return NULL;
    }
    pool->cond_not_full_ready = 1;

    rc = sixel_cond_init(&pool->cond_drained);
    if (rc != SIXEL_OK) {
        errno = EINVAL;
        threadpool_free(pool);
        return NULL;
    }
    pool->cond_drained_ready = 1;

    pool->jobs = (tp_job_t *)malloc(sizeof(tp_job_t) * (size_t)qsize);
    if (pool->jobs == NULL) {
        threadpool_free(pool);
        return NULL;
    }

    pool->worker_capacity = nthreads;
    pool->workers = (threadpool_worker_t **)calloc((size_t)nthreads,
            sizeof(threadpool_worker_t *));
    if (pool->workers == NULL) {
        threadpool_free(pool);
        return NULL;
    }

    for (i = 0; i < nthreads; ++i) {
        pool->workers[i] = (threadpool_worker_t *)
            calloc(1, sizeof(threadpool_worker_t));
        if (pool->workers[i] == NULL) {
            pool->shutting_down = 1;
            sixel_cond_broadcast(&pool->cond_not_empty);
            break;
        }
        pool->workers[i]->pool = pool;
        pool->workers[i]->workspace = NULL;
        pool->workers[i]->started = 0;
        pool->workers[i]->index = i;
        pool->workers[i]->pinned = 0;
        if (workspace_size > 0) {
            /*
             * Zero-initialize the per-thread workspace so that structures like
             * `sixel_parallel_worker_state_t` start with predictable values.
             * The worker initialization logic assumes fields such as
             * `initialized` are cleared before the first job.
             */
            pool->workers[i]->workspace = calloc(1, workspace_size);
            if (pool->workers[i]->workspace == NULL) {
                pool->shutting_down = 1;
                sixel_cond_broadcast(&pool->cond_not_empty);
                break;
            }
        }
        rc = threadpool_spawn_worker(pool, pool->workers[i]);
        if (rc != SIXEL_OK) {
            break;
        }
    }

    if (pool->threads_started != nthreads) {
        int started;

        started = pool->threads_started;
        for (i = 0; i < started; ++i) {
            sixel_cond_broadcast(&pool->cond_not_empty);
            sixel_thread_join(&pool->workers[i]->thread);
        }
        threadpool_free(pool);
        return NULL;
    }

    return pool;
}

SIXELAPI void
threadpool_set_affinity(threadpool_t *pool, int pin_threads)
{
    if (pool == NULL) {
        return;
    }

    sixel_mutex_lock(&pool->mutex);
    pool->pin_threads = (pin_threads != 0) ? 1 : 0;
    if (pool->pin_threads != 0) {
        pool->hw_threads = sixel_get_hw_threads();
        if (pool->hw_threads < 1) {
            pool->pin_threads = 0;
        }
    } else {
        pool->hw_threads = 0;
    }
    sixel_mutex_unlock(&pool->mutex);
}

static int
threadpool_spawn_worker(threadpool_t *pool, threadpool_worker_t *worker)
{
    int rc;

    if (pool == NULL || worker == NULL) {
        return SIXEL_BAD_ARGUMENT;
    }
    rc = sixel_thread_create(&worker->thread,
                             threadpool_worker_main,
                             worker);
    if (rc != SIXEL_OK) {
        sixel_mutex_lock(&pool->mutex);
        pool->shutting_down = 1;
        sixel_cond_broadcast(&pool->cond_not_empty);
        sixel_mutex_unlock(&pool->mutex);
        return rc;
    }
    worker->started = 1;
    pool->threads_started += 1;
    return SIXEL_OK;
}

SIXELAPI void
threadpool_destroy(threadpool_t *pool)
{
    if (pool == NULL) {
        return;
    }
    threadpool_finish(pool);
    threadpool_free(pool);
}

SIXELAPI void
threadpool_push(threadpool_t *pool, tp_job_t job)
{
    if (pool == NULL) {
        return;
    }
    sixel_mutex_lock(&pool->mutex);
    if (pool->shutting_down) {
        sixel_mutex_unlock(&pool->mutex);
        return;
    }
    while (pool->count == pool->qsize && !pool->shutting_down) {
        sixel_cond_wait(&pool->cond_not_full, &pool->mutex);
    }
    if (pool->shutting_down) {
        sixel_mutex_unlock(&pool->mutex);
        return;
    }
    pool->jobs[pool->tail] = job;
    pool->tail = (pool->tail + 1) % pool->qsize;
    pool->count += 1;
    sixel_cond_signal(&pool->cond_not_empty);
    sixel_mutex_unlock(&pool->mutex);
}

SIXELAPI void
threadpool_finish(threadpool_t *pool)
{
    int i;

    if (pool == NULL) {
        return;
    }
    sixel_mutex_lock(&pool->mutex);
    if (pool->joined) {
        sixel_mutex_unlock(&pool->mutex);
        return;
    }
    pool->shutting_down = 1;
    sixel_cond_broadcast(&pool->cond_not_empty);
    sixel_cond_broadcast(&pool->cond_not_full);
    while (pool->count > 0 || pool->running > 0) {
        sixel_cond_wait(&pool->cond_drained, &pool->mutex);
    }
    sixel_mutex_unlock(&pool->mutex);

    for (i = 0; i < pool->threads_started; ++i) {
        if (pool->workers[i] != NULL && pool->workers[i]->started) {
            sixel_thread_join(&pool->workers[i]->thread);
            pool->workers[i]->started = 0;
        }
    }

    sixel_mutex_lock(&pool->mutex);
    pool->joined = 1;
    sixel_mutex_unlock(&pool->mutex);
}

SIXELAPI int
threadpool_grow(threadpool_t *pool, int additional_threads)
{
    threadpool_worker_t **expanded;
    int new_target;
    int started_new;
    int i;
    int rc;

    if (pool == NULL || additional_threads <= 0) {
        return SIXEL_OK;
    }

    sixel_mutex_lock(&pool->mutex);
    if (pool->shutting_down) {
        sixel_mutex_unlock(&pool->mutex);
        return SIXEL_RUNTIME_ERROR;
    }
    new_target = pool->nthreads + additional_threads;
    /*
     * Worker structs stay heap-allocated per slot so pointer-table growth
     * never invalidates addresses already held by running threads.
     */
    if (new_target > pool->worker_capacity) {
        expanded = (threadpool_worker_t **)realloc(
            pool->workers,
            (size_t)new_target * sizeof(threadpool_worker_t *));
        if (expanded == NULL) {
            sixel_mutex_unlock(&pool->mutex);
            return SIXEL_BAD_ALLOCATION;
        }
        memset(expanded + pool->worker_capacity,
               0,
               (size_t)(new_target - pool->worker_capacity)
                   * sizeof(threadpool_worker_t *));
        pool->workers = expanded;
        pool->worker_capacity = new_target;
    }
    sixel_mutex_unlock(&pool->mutex);

    started_new = 0;
    rc = SIXEL_OK;
    for (i = pool->nthreads; i < new_target; ++i) {
        pool->workers[i] = (threadpool_worker_t *)
            calloc(1, sizeof(threadpool_worker_t));
        if (pool->workers[i] == NULL) {
            rc = SIXEL_BAD_ALLOCATION;
            break;
        }
        pool->workers[i]->pool = pool;
        pool->workers[i]->workspace = NULL;
        pool->workers[i]->started = 0;
        pool->workers[i]->index = i;
        pool->workers[i]->pinned = 0;
        if (pool->workspace_size > 0) {
            pool->workers[i]->workspace =
                calloc(1, pool->workspace_size);
            if (pool->workers[i]->workspace == NULL) {
                rc = SIXEL_BAD_ALLOCATION;
                break;
            }
        }

        rc = threadpool_spawn_worker(pool, pool->workers[i]);
        if (rc != SIXEL_OK) {
            break;
        }
        started_new += 1;
    }

    if (rc != SIXEL_OK) {
        int j;

        for (j = i; j < new_target; ++j) {
            if (pool->workers[j] != NULL) {
                if (pool->workers[j]->workspace != NULL) {
                    free(pool->workers[j]->workspace);
                }
                free(pool->workers[j]);
                pool->workers[j] = NULL;
            }
        }
    }

    sixel_mutex_lock(&pool->mutex);
    pool->nthreads = pool->nthreads + started_new;
    sixel_mutex_unlock(&pool->mutex);

    return rc;
}

SIXELAPI int
threadpool_get_error(threadpool_t *pool)
{
    int error;

    if (pool == NULL) {
        return SIXEL_BAD_ARGUMENT;
    }
    sixel_mutex_lock(&pool->mutex);
    error = pool->error;
    sixel_mutex_unlock(&pool->mutex);
    return error;
}

/* emacs Local Variables:      */
/* emacs mode: c               */
/* emacs tab-width: 4          */
/* emacs indent-tabs-mode: nil */
/* emacs c-basic-offset: 4     */
/* emacs End:                  */
/* vim: set expandtab ts=4 sts=4 sw=4 : */
/* EOF */
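
For context, the public entry points above compose into a simple submit-and-drain pattern: create the pool, push by-value tp_job_t jobs, call threadpool_finish to drain the ring buffer and join workers, read the first recorded error, then destroy. The sketch below is illustrative only and is not part of the measured source: the tp_job_t fields first_row/last_row and the render_rows_job callback are hypothetical placeholders (the real tp_job_t layout is declared in threadpool.h), and the callback signature simply mirrors the pool->worker(job, userdata, workspace) call in threadpool_worker_main.

/*
 * Illustrative usage sketch; not part of threadpool.c and not covered by
 * this report.  The tp_job_t fields and the callback name are hypothetical.
 */
#include "threadpool.h"

/* Per-job callback: runs outside the pool's critical section. */
static int
render_rows_job(tp_job_t job, void *userdata, void *workspace)
{
    (void)userdata;
    (void)workspace;
    /* ... process rows job.first_row .. job.last_row (hypothetical fields) ... */
    return SIXEL_OK;
}

static int
render_all_rows(int height, int nthreads)
{
    threadpool_t *pool;
    tp_job_t job;
    int y;
    int status;

    /* queue depth 4, no per-thread workspace, no workspace cleanup hook */
    pool = threadpool_create(nthreads, 4, 0, render_rows_job, NULL, NULL);
    if (pool == NULL) {
        return SIXEL_BAD_ALLOCATION;
    }
    for (y = 0; y < height; y += 64) {
        job.first_row = y;                    /* hypothetical field */
        job.last_row = y + 63;                /* hypothetical field */
        threadpool_push(pool, job);           /* blocks while the queue is full */
    }
    threadpool_finish(pool);                  /* drain the queue and join workers */
    status = threadpool_get_error(pool);      /* first failure reported by any job */
    threadpool_destroy(pool);
    return status;
}

Because threadpool_finish checks pool->joined and returns early on a second call, invoking it explicitly before threadpool_destroy, as above, is safe and lets the caller read the aggregated error code after the queue has drained.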