saitoha / libsixel / build 19550705690

20 Nov 2025 08:40PM UTC, coverage: 42.352% (+1.6%) from 40.773%
Triggered by a push (via github), committed by saitoha:
"perf: boost encoder workers after dithering completes"

9753 of 34088 branches covered (28.61%)
25 of 124 new or added lines in 2 files covered (20.16%)
1896 existing lines in 12 files now uncovered
13481 of 31831 relevant lines covered (42.35%)
933434.44 hits per line
Source file: /src/threadpool.c (60.98% of lines covered)

/*
 * SPDX-License-Identifier: MIT
 *
 * Copyright (c) 2025 libsixel developers. See `AUTHORS`.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is furnished to do
 * so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "config.h"

#include <errno.h>
#include <stdlib.h>
#include <string.h>

#include "threadpool.h"

typedef struct threadpool_worker threadpool_worker_t;

struct threadpool_worker {
    threadpool_t *pool;
    sixel_thread_t thread;
    void *workspace;
    int started;
};

struct threadpool {
    int nthreads;
    int qsize;
    size_t workspace_size;
    tp_worker_fn worker;
    void *userdata;
    tp_job_t *jobs;
    int head;
    int tail;
    int count;
    int running;
    int shutting_down;
    int joined;
    int error;
    int threads_started;
    int worker_capacity;
    sixel_mutex_t mutex;
    sixel_cond_t cond_not_empty;
    sixel_cond_t cond_not_full;
    sixel_cond_t cond_drained;
    int mutex_ready;
    int cond_not_empty_ready;
    int cond_not_full_ready;
    int cond_drained_ready;
    threadpool_worker_t **workers; /* owned worker slots (stable addresses) */
};

static void threadpool_free(threadpool_t *pool);
static int threadpool_worker_main(void *arg);
static int threadpool_spawn_worker(threadpool_t *pool,
                                   threadpool_worker_t *worker);

/*
 * Release every dynamically allocated component of the pool. Callers must
 * ensure that worker threads have already terminated before invoking this
 * helper; otherwise joining would operate on freed memory.
 */
static void
threadpool_free(threadpool_t *pool)
{
    int i;

    if (pool == NULL) {
        return;
    }
    if (pool->workers != NULL) {
        for (i = 0; i < pool->worker_capacity; ++i) {
            if (pool->workers[i] == NULL) {
                continue;
            }
            if (pool->workers[i]->workspace != NULL) {
                free(pool->workers[i]->workspace);
            }
            free(pool->workers[i]);
        }
        free(pool->workers);
    }
    if (pool->jobs != NULL) {
        free(pool->jobs);
    }
    if (pool->cond_drained_ready) {
        sixel_cond_destroy(&pool->cond_drained);
    }
    if (pool->cond_not_full_ready) {
        sixel_cond_destroy(&pool->cond_not_full);
    }
    if (pool->cond_not_empty_ready) {
        sixel_cond_destroy(&pool->cond_not_empty);
    }
    if (pool->mutex_ready) {
        sixel_mutex_destroy(&pool->mutex);
    }
    free(pool);
}

/*
 * Worker threads pull jobs from the ring buffer, execute the supplied callback
 * outside the critical section, and record the first failure code. All
 * synchronization is delegated to the mutex/condition helpers provided by the
 * threading abstraction.
 */
static int
threadpool_worker_main(void *arg)
{
    threadpool_worker_t *worker;
    threadpool_t *pool;
    tp_job_t job;
    int rc;

    worker = (threadpool_worker_t *)arg;
    pool = worker->pool;
    for (;;) {
        sixel_mutex_lock(&pool->mutex);
        while (pool->count == 0 && !pool->shutting_down) {
            sixel_cond_wait(&pool->cond_not_empty, &pool->mutex);
        }
        if (pool->count == 0 && pool->shutting_down) {
            sixel_mutex_unlock(&pool->mutex);
            break;
        }
        job = pool->jobs[pool->head];
        pool->head = (pool->head + 1) % pool->qsize;
        pool->count -= 1;
        pool->running += 1;
        sixel_cond_signal(&pool->cond_not_full);
        sixel_mutex_unlock(&pool->mutex);

        rc = pool->worker(job, pool->userdata, worker->workspace);

        sixel_mutex_lock(&pool->mutex);
        pool->running -= 1;
        if (rc != SIXEL_OK && pool->error == SIXEL_OK) {
            pool->error = rc;
        }
        if (pool->count == 0 && pool->running == 0) {
            sixel_cond_broadcast(&pool->cond_drained);
        }
        sixel_mutex_unlock(&pool->mutex);
    }
    return SIXEL_OK;
}
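
/*
 * Illustration: a minimal callback matching the contract above. The name
 * `example_worker` is hypothetical and not part of libsixel; the signature
 * is inferred from the dispatch
 * pool->worker(job, pool->userdata, worker->workspace).
 */
static int
example_worker(tp_job_t job, void *userdata, void *workspace)
{
    (void)job;       /* job payload, copied out of the ring by value     */
    (void)userdata;  /* shared pointer given to threadpool_create()      */
    (void)workspace; /* per-thread scratch, zero-initialized on creation */
    /* perform one unit of work here */
    return SIXEL_OK; /* a non-SIXEL_OK value is latched as pool->error */
}
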
SIXELAPI threadpool_t *
threadpool_create(int nthreads,
                  int qsize,
                  size_t workspace_size,
                  tp_worker_fn worker,
                  void *userdata)
{
    threadpool_t *pool;
    int i;
    int rc;

    if (nthreads <= 0 || qsize <= 0 || worker == NULL) {
        return NULL;
    }
    pool = (threadpool_t *)calloc(1, sizeof(threadpool_t));
    if (pool == NULL) {
        return NULL;
    }
    pool->nthreads = nthreads;
    pool->qsize = qsize;
    pool->workspace_size = workspace_size;
    pool->worker = worker;
    pool->userdata = userdata;
    pool->jobs = NULL;
    pool->head = 0;
    pool->tail = 0;
    pool->count = 0;
    pool->running = 0;
    pool->shutting_down = 0;
    pool->joined = 0;
    pool->error = SIXEL_OK;
    pool->threads_started = 0;
    pool->mutex_ready = 0;
    pool->cond_not_empty_ready = 0;
    pool->cond_not_full_ready = 0;
    pool->cond_drained_ready = 0;
    pool->workers = NULL;

    rc = sixel_mutex_init(&pool->mutex);
    if (rc != SIXEL_OK) {
        errno = EINVAL;
        threadpool_free(pool);
        return NULL;
    }
    pool->mutex_ready = 1;

    rc = sixel_cond_init(&pool->cond_not_empty);
    if (rc != SIXEL_OK) {
        errno = EINVAL;
        threadpool_free(pool);
        return NULL;
    }
    pool->cond_not_empty_ready = 1;

    rc = sixel_cond_init(&pool->cond_not_full);
    if (rc != SIXEL_OK) {
        errno = EINVAL;
        threadpool_free(pool);
        return NULL;
    }
    pool->cond_not_full_ready = 1;

    rc = sixel_cond_init(&pool->cond_drained);
    if (rc != SIXEL_OK) {
        errno = EINVAL;
        threadpool_free(pool);
        return NULL;
    }
    pool->cond_drained_ready = 1;

    pool->jobs = (tp_job_t *)malloc(sizeof(tp_job_t) * (size_t)qsize);
    if (pool->jobs == NULL) {
        threadpool_free(pool);
        return NULL;
    }

    pool->worker_capacity = nthreads;
    pool->workers = (threadpool_worker_t **)calloc((size_t)nthreads,
            sizeof(threadpool_worker_t *));
    if (pool->workers == NULL) {
        threadpool_free(pool);
        return NULL;
    }

    for (i = 0; i < nthreads; ++i) {
        pool->workers[i] = (threadpool_worker_t *)
            calloc(1, sizeof(threadpool_worker_t));
        if (pool->workers[i] == NULL) {
            pool->shutting_down = 1;
            sixel_cond_broadcast(&pool->cond_not_empty);
            break;
        }
        pool->workers[i]->pool = pool;
        pool->workers[i]->workspace = NULL;
        pool->workers[i]->started = 0;
        if (workspace_size > 0) {
            /*
             * Zero-initialize the per-thread workspace so that structures like
             * `sixel_parallel_worker_state_t` start with predictable values.
             * The worker initialization logic assumes fields such as
             * `initialized` are cleared before the first job.
             */
            pool->workers[i]->workspace = calloc(1, workspace_size);
            if (pool->workers[i]->workspace == NULL) {
                pool->shutting_down = 1;
                sixel_cond_broadcast(&pool->cond_not_empty);
                break;
            }
        }
        rc = threadpool_spawn_worker(pool, pool->workers[i]);
        if (rc != SIXEL_OK) {
            break;
        }
    }

    if (pool->threads_started != nthreads) {
        int started;

        started = pool->threads_started;
        for (i = 0; i < started; ++i) {
            sixel_cond_broadcast(&pool->cond_not_empty);
            sixel_thread_join(&pool->workers[i]->thread);
        }
        threadpool_free(pool);
        return NULL;
    }

    return pool;
}

static int
threadpool_spawn_worker(threadpool_t *pool, threadpool_worker_t *worker)
{
    int rc;

    if (pool == NULL || worker == NULL) {
        return SIXEL_BAD_ARGUMENT;
    }
    rc = sixel_thread_create(&worker->thread,
                             threadpool_worker_main,
                             worker);
    if (rc != SIXEL_OK) {
        sixel_mutex_lock(&pool->mutex);
        pool->shutting_down = 1;
        sixel_cond_broadcast(&pool->cond_not_empty);
        sixel_mutex_unlock(&pool->mutex);
        return rc;
    }
    worker->started = 1;
    pool->threads_started += 1;
    return SIXEL_OK;
}

SIXELAPI void
threadpool_destroy(threadpool_t *pool)
{
    if (pool == NULL) {
        return;
    }
    threadpool_finish(pool);
    threadpool_free(pool);
}

SIXELAPI void
threadpool_push(threadpool_t *pool, tp_job_t job)
{
    if (pool == NULL) {
        return;
    }
    sixel_mutex_lock(&pool->mutex);
    if (pool->shutting_down) {
        sixel_mutex_unlock(&pool->mutex);
        return;
    }
    while (pool->count == pool->qsize && !pool->shutting_down) {
        sixel_cond_wait(&pool->cond_not_full, &pool->mutex);
    }
    if (pool->shutting_down) {
        sixel_mutex_unlock(&pool->mutex);
        return;
    }
    pool->jobs[pool->tail] = job;
    pool->tail = (pool->tail + 1) % pool->qsize;
    pool->count += 1;
    sixel_cond_signal(&pool->cond_not_empty);
    sixel_mutex_unlock(&pool->mutex);
}

SIXELAPI void
threadpool_finish(threadpool_t *pool)
{
    int i;

    if (pool == NULL) {
        return;
    }
    sixel_mutex_lock(&pool->mutex);
    if (pool->joined) {
        sixel_mutex_unlock(&pool->mutex);
        return;
    }
    pool->shutting_down = 1;
    sixel_cond_broadcast(&pool->cond_not_empty);
    sixel_cond_broadcast(&pool->cond_not_full);
    while (pool->count > 0 || pool->running > 0) {
        sixel_cond_wait(&pool->cond_drained, &pool->mutex);
    }
    sixel_mutex_unlock(&pool->mutex);

    for (i = 0; i < pool->threads_started; ++i) {
        if (pool->workers[i] != NULL && pool->workers[i]->started) {
            sixel_thread_join(&pool->workers[i]->thread);
            pool->workers[i]->started = 0;
        }
    }

    sixel_mutex_lock(&pool->mutex);
    pool->joined = 1;
    sixel_mutex_unlock(&pool->mutex);
}

SIXELAPI int
threadpool_grow(threadpool_t *pool, int additional_threads)
{
    threadpool_worker_t **expanded;
    int new_target;
    int started_new;
    int i;
    int rc;

    if (pool == NULL || additional_threads <= 0) {
        return SIXEL_OK;
    }

    sixel_mutex_lock(&pool->mutex);
    if (pool->shutting_down) {
        sixel_mutex_unlock(&pool->mutex);
        return SIXEL_RUNTIME_ERROR;
    }
    new_target = pool->nthreads + additional_threads;
    /*
     * Worker structs stay heap-allocated per slot so pointer-table growth
     * never invalidates addresses already held by running threads.
     */
    if (new_target > pool->worker_capacity) {
        expanded = (threadpool_worker_t **)realloc(
            pool->workers,
            (size_t)new_target * sizeof(threadpool_worker_t *));
        if (expanded == NULL) {
            sixel_mutex_unlock(&pool->mutex);
            return SIXEL_BAD_ALLOCATION;
        }
        memset(expanded + pool->worker_capacity,
               0,
               (size_t)(new_target - pool->worker_capacity)
                   * sizeof(threadpool_worker_t *));
        pool->workers = expanded;
        pool->worker_capacity = new_target;
    }
    sixel_mutex_unlock(&pool->mutex);

    started_new = 0;
    rc = SIXEL_OK;
    for (i = pool->nthreads; i < new_target; ++i) {
        pool->workers[i] = (threadpool_worker_t *)
            calloc(1, sizeof(threadpool_worker_t));
        if (pool->workers[i] == NULL) {
            rc = SIXEL_BAD_ALLOCATION;
            break;
        }
        pool->workers[i]->pool = pool;
        pool->workers[i]->workspace = NULL;
        pool->workers[i]->started = 0;
        if (pool->workspace_size > 0) {
            pool->workers[i]->workspace =
                calloc(1, pool->workspace_size);
            if (pool->workers[i]->workspace == NULL) {
                rc = SIXEL_BAD_ALLOCATION;
                break;
            }
        }

        rc = threadpool_spawn_worker(pool, pool->workers[i]);
        if (rc != SIXEL_OK) {
            break;
        }
        started_new += 1;
    }

    if (rc != SIXEL_OK) {
        int j;

        for (j = i; j < new_target; ++j) {
            if (pool->workers[j] != NULL) {
                if (pool->workers[j]->workspace != NULL) {
                    free(pool->workers[j]->workspace);
                }
                free(pool->workers[j]);
                pool->workers[j] = NULL;
            }
        }
    }

    sixel_mutex_lock(&pool->mutex);
    pool->nthreads = pool->nthreads + started_new;
    sixel_mutex_unlock(&pool->mutex);

    return rc;
}

SIXELAPI int
threadpool_get_error(threadpool_t *pool)
{
    int error;

    if (pool == NULL) {
        return SIXEL_BAD_ARGUMENT;
    }
    sixel_mutex_lock(&pool->mutex);
    error = pool->error;
    sixel_mutex_unlock(&pool->mutex);
    return error;
}

/* emacs Local Variables:      */
/* emacs mode: c               */
/* emacs tab-width: 4          */
/* emacs indent-tabs-mode: nil */
/* emacs c-basic-offset: 4     */
/* emacs End:                  */
/* vim: set expandtab ts=4 sts=4 sw=4 : */
/* EOF */
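
For reference, a minimal sketch of the lifecycle this file implements: create a pool, push jobs, drain and join, read the latched error, destroy. This is illustrative code, not part of the repository; the worker signature is inferred from the dispatch in threadpool_worker_main, `do_one_job` is a hypothetical placeholder, and `tp_job_t` is treated as an opaque value type (the pool copies jobs by value), zero-filled here only for the sake of the example.

#include <string.h>

#include "threadpool.h"

/* hypothetical worker: does one unit of work per dequeued job */
static int
do_one_job(tp_job_t job, void *userdata, void *workspace)
{
    (void)job;
    (void)userdata;
    (void)workspace;
    return SIXEL_OK;                 /* any other code becomes pool->error */
}

int
main(void)
{
    threadpool_t *pool;
    tp_job_t job;
    int i;

    memset(&job, 0, sizeof(job));    /* placeholder payload */

    /* four workers, a sixteen-slot ring, no per-thread workspace */
    pool = threadpool_create(4, 16, 0, do_one_job, NULL);
    if (pool == NULL) {
        return 1;
    }
    for (i = 0; i < 100; ++i) {
        threadpool_push(pool, job);  /* blocks while the ring is full */
    }
    (void)threadpool_grow(pool, 2);  /* optionally add two more workers */
    threadpool_finish(pool);         /* drain the ring and join all workers */
    if (threadpool_get_error(pool) != SIXEL_OK) {
        threadpool_destroy(pool);    /* finish() inside destroy is a no-op now */
        return 1;
    }
    threadpool_destroy(pool);
    return 0;
}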