nickg / nvc, build 21458432965

28 Jan 2026 10:49PM UTC coverage: 92.632% (+0.02% from 92.616%)

Pull Request #1384: Random signal initialization plugin.
Merge a7f274965 into 71ebda0ca (github, web-flow)

17 of 34 new or added lines in 1 file covered (50.0%).
1096 existing lines in 29 files now uncovered.
76515 of 82601 relevant lines covered (92.63%).
444886.01 hits per line.

Source File: /src/thread.c (74.65% of lines covered)

//
//  Copyright (C) 2021-2023  Nick Gasson
//
//  This program is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
//

#include "util.h"
#include "array.h"
#include "cpustate.h"
#include "rt/mspace.h"
#include "thread.h"

#include <assert.h>
#include <errno.h>
#include <math.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <unistd.h>
#include <semaphore.h>

#ifdef HAVE_PTHREAD
#include <pthread.h>
#elif !defined __MINGW32__
#error missing pthread support
#endif

#ifdef __SANITIZE_THREAD__
#include <sanitizer/tsan_interface.h>
#endif

#if defined __MINGW32__
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#elif defined __APPLE__
#define task_t __task_t
#include <mach/thread_act.h>
#include <mach/machine.h>
#undef task_t
#endif

#define LOCK_SPINS      64
#define YIELD_SPINS     32
#define MIN_TAKE        8
#define PARKING_BAYS    64
#define SUSPEND_TIMEOUT 1
#define THREAD_NAME_LEN 16

#if !defined __MINGW32__ && !defined __APPLE__
#define POSIX_SUSPEND 1
#define SIGSUSPEND    SIGRTMIN
#define SIGRESUME     (SIGRTMIN + 1)
#endif

typedef struct {
   int64_t locks;
   int64_t spins;
   int64_t contended;
   int64_t parks;
   int64_t spurious;
   int64_t retries;
} __attribute__((aligned(64))) lock_stats_t;

STATIC_ASSERT(sizeof(lock_stats_t) == 64)

#ifdef DEBUG
#define LOCK_EVENT(what, n) do {                  \
      if (likely(my_thread != NULL))              \
         lock_stats[my_thread->id].what += (n);   \
   } while (0)

#define WORKQ_EVENT(what, n) do {                 \
      if (likely(my_thread != NULL))              \
         workq_stats[my_thread->id].what += (n);  \
   } while (0)
#else
#define LOCK_EVENT(what, n) (void)(n)
#define WORKQ_EVENT(what, n) (void)(n)
#endif

#ifdef __SANITIZE_THREAD__
#define TSAN_PRE_LOCK(addr) \
   __tsan_mutex_pre_lock(addr, __tsan_mutex_linker_init);
#define TSAN_POST_LOCK(addr) \
   __tsan_mutex_post_lock(addr, __tsan_mutex_linker_init, 0);
#define TSAN_PRE_UNLOCK(addr) \
   __tsan_mutex_pre_unlock(addr, __tsan_mutex_linker_init);
#define TSAN_POST_UNLOCK(addr) \
   __tsan_mutex_post_unlock(addr, __tsan_mutex_linker_init);
#else
#define TSAN_PRE_LOCK(addr)
#define TSAN_POST_LOCK(addr)
#define TSAN_PRE_UNLOCK(addr)
#define TSAN_POST_UNLOCK(addr)
#endif

#ifndef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
#define PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP PTHREAD_MUTEX_INITIALIZER
#endif

#define PTHREAD_CHECK(op, ...) do {             \
      if (unlikely(op(__VA_ARGS__) != 0))       \
         fatal_errno(#op);                      \
   } while (0)

// Lock implementation based on WTF::Lock in WebKit
//   https://webkit.org/blog/6161/locking-in-webkit/

typedef enum {
   IS_LOCKED  = (1 << 0),
   HAS_PARKED = (1 << 1)
} lock_bits_t;

typedef struct {
#ifdef __MINGW32__
   CRITICAL_SECTION   mutex;
   CONDITION_VARIABLE cond;
#else
   pthread_mutex_t    mutex;
   pthread_cond_t     cond;
#endif
   int                parked;
} __attribute__((aligned(64))) parking_bay_t;

typedef bool (*park_fn_t)(parking_bay_t *, void *);
typedef void (*unpark_fn_t)(parking_bay_t *, void *);

typedef struct {
   task_fn_t  fn;
   void      *context;
   void      *arg;
   workq_t   *workq;
} task_t;

// Work stealing task queue is based on:
//   Arora, N. S., Blumofe, R. D., and Plaxton, C. G.
//   Thread scheduling for multiprogrammed multiprocessors.
//   Theory of Computing Systems 34, 2 (2001), 115-144.
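
// The deque types below pack the thief-side index ("top") and an ABA tag
// into a single 64-bit word (abp_age_t) so that a steal can update both
// with one compare-and-swap, while "bot" is only ever written by the
// owning thread (see push_bot/pop_bot/pop_top further down).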

typedef uint32_t abp_idx_t;
typedef uint32_t abp_tag_t;

typedef union {
   struct {
      abp_idx_t top;
      abp_tag_t tag;
   };
   uint64_t bits;
} abp_age_t;

STATIC_ASSERT(sizeof(abp_age_t) <= 8);

#define THREADQ_SIZE 256

typedef struct {
   task_t    deque[THREADQ_SIZE];
   abp_age_t age;
   abp_idx_t bot;
} threadq_t;

typedef enum { IDLE, START } workq_state_t;

typedef union {
   struct {
      uint32_t count;
      uint32_t epoch;
   };
   uint64_t bits;
} entryq_ptr_t;

STATIC_ASSERT(sizeof(entryq_ptr_t) <= 8);

typedef struct {
   task_t       *tasks;
   unsigned      queuesz;
   entryq_ptr_t  wptr;
   entryq_ptr_t  comp;
} __attribute__((aligned(64))) entryq_t;

STATIC_ASSERT(sizeof(entryq_t) == 64);

struct _workq {
   void          *context;
   workq_state_t  state;
   unsigned       epoch;
   unsigned       maxthread;
   bool           parallel;
   entryq_t       entryqs[MAX_THREADS];
};

typedef struct {
   int64_t comp;
   int64_t steals;
   int64_t wakes;
} __attribute__((aligned(64))) workq_stats_t;

STATIC_ASSERT(sizeof(workq_stats_t) == 64);

typedef enum {
   MAIN_THREAD,
   USER_THREAD,
   WORKER_THREAD,
} thread_kind_t;

typedef char thread_name_t[THREAD_NAME_LEN];

struct _nvc_thread {
   unsigned        id;
   thread_kind_t   kind;
   unsigned        spins;
   threadq_t       queue;
   thread_name_t   name;
   thread_fn_t     fn;
   void           *arg;
   int             victim;
   uint32_t        rngstate;
#ifdef __MINGW32__
   HANDLE          handle;
   void           *retval;
#else
   pthread_t       handle;
#endif
#ifdef __APPLE__
   thread_port_t   port;
#endif
};

typedef struct {
   nvc_lock_t   lock;
   task_t      *tasks;
   unsigned     wptr;
   unsigned     rptr;
   unsigned     max;
} globalq_t;

typedef struct _barrier {
   unsigned count;
   unsigned reached;
   unsigned passed;
} __attribute__((aligned(64))) barrier_t;

static parking_bay_t parking_bays[PARKING_BAYS] = {
#ifndef __MINGW32__
   [0 ... PARKING_BAYS - 1] = {
      PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP,
      PTHREAD_COND_INITIALIZER
   }
#endif
};

static nvc_thread_t    *threads[MAX_THREADS];
static unsigned         max_workers = 0;
static int              running_threads = 0;
static unsigned         max_thread_id = 0;
static bool             should_stop = false;
static globalq_t        globalq __attribute__((aligned(64)));
static int              async_pending __attribute__((aligned(64))) = 0;
static nvc_lock_t       stop_lock = 0;
static stop_world_fn_t  stop_callback = NULL;
static void            *stop_arg = NULL;

#ifdef __MINGW32__
static CONDITION_VARIABLE wake_workers = CONDITION_VARIABLE_INIT;
static CRITICAL_SECTION   wakelock;
#else
static pthread_cond_t     wake_workers = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t    wakelock = PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;
#endif

#ifdef POSIX_SUSPEND
static sem_t stop_sem;
#endif

#ifdef DEBUG
static lock_stats_t  lock_stats[MAX_THREADS];
static workq_stats_t workq_stats[MAX_THREADS];
#endif

static __thread nvc_thread_t *my_thread = NULL;

static parking_bay_t *parking_bay_for(void *cookie);

#ifdef POSIX_SUSPEND
static void suspend_handler(int sig, siginfo_t *info, void *context);
#endif

#ifdef DEBUG
static void print_lock_stats(void)
{
   lock_stats_t s = {};
   workq_stats_t t = {};
   for (int i = 0; i < MAX_THREADS; i++) {
      s.locks     += relaxed_load(&(lock_stats[i].locks));
      s.contended += relaxed_load(&(lock_stats[i].contended));
      s.parks     += relaxed_load(&(lock_stats[i].parks));
      s.spins     += relaxed_load(&(lock_stats[i].spins));
      s.spurious  += relaxed_load(&(lock_stats[i].spurious));
      s.retries   += relaxed_load(&(lock_stats[i].retries));

      t.comp   += relaxed_load(&(workq_stats[i].comp));
      t.steals += relaxed_load(&(workq_stats[i].steals));
      t.wakes  += relaxed_load(&(workq_stats[i].wakes));
   }

   printf("\nLock statistics:\n");
   printf("\tTotal locks      : %"PRIi64"\n", s.locks);
   printf("\tContended        : %"PRIi64" (%.1f%%)\n",
          s.contended, 100.0 * ((double)s.contended / (double)s.locks));
   printf("\tParked           : %"PRIi64" (%.1f%%)\n",
          s.parks, 100.0 * ((double)s.parks / (double)s.locks));
   printf("\tAverage spins    : %.1f\n", (double)s.spins / (double)s.retries);
   printf("\tSpurious wakeups : %"PRIi64"\n", s.spurious);

   printf("\nWork queue statistics:\n");
   printf("\tCompleted tasks  : %"PRIi64"\n", t.comp);
   printf("\tSteals           : %"PRIi64"\n", t.steals);
   printf("\tWakeups          : %"PRIi64"\n", t.wakes);
}
#endif

#ifdef __MINGW32__
static inline void platform_mutex_lock(LPCRITICAL_SECTION lpcs)
{
   EnterCriticalSection(lpcs);
}

static inline void platform_mutex_unlock(LPCRITICAL_SECTION lpcs)
{
   LeaveCriticalSection(lpcs);
}

static inline void platform_cond_broadcast(PCONDITION_VARIABLE pcv)
{
   WakeAllConditionVariable(pcv);
}

static inline void platform_cond_wait(PCONDITION_VARIABLE pcv,
                                      LPCRITICAL_SECTION lpcs)
{
   SleepConditionVariableCS(pcv, lpcs, INFINITE);
}
#else
static inline void platform_mutex_lock(pthread_mutex_t *mtx)
{
   PTHREAD_CHECK(pthread_mutex_lock, mtx);
}

static inline void platform_mutex_unlock(pthread_mutex_t *mtx)
{
   PTHREAD_CHECK(pthread_mutex_unlock, mtx);
}

static inline void platform_cond_broadcast(pthread_cond_t *cond)
{
   PTHREAD_CHECK(pthread_cond_broadcast, cond);
}

static inline void platform_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mtx)
{
   PTHREAD_CHECK(pthread_cond_wait, cond, mtx);
}
#endif

static void join_worker_threads(void)
{
   SCOPED_A(nvc_thread_t *) join_list = AINIT;

   for (int i = 1; i < MAX_THREADS; i++) {
      nvc_thread_t *t = atomic_load(&threads[i]);
      if (t != NULL)
         APUSH(join_list, t);
   }

   // Lock the wake mutex here to avoid races with workers sleeping
   platform_mutex_lock(&wakelock);
   {
      atomic_store(&should_stop, true);
      platform_cond_broadcast(&wake_workers);
   }
   platform_mutex_unlock(&wakelock);

   for (int i = 0; i < join_list.count; i++) {
      nvc_thread_t *t = join_list.items[i];

      switch (relaxed_load(&t->kind)) {
      case WORKER_THREAD:
         thread_join(t);
         continue;  // Freed thread struct
      case USER_THREAD:
      case MAIN_THREAD:
         fatal_trace("leaked a user thread: %s", t->name);
      }
   }

   assert(atomic_load(&running_threads) == 1);

   for (int i = 0; i < join_list.count; i++)
      free(join_list.items[i]);

#ifdef DEBUG
   for (int i = 0; i < PARKING_BAYS; i++)
      assert(parking_bays[i].parked == 0);
#endif
}

static nvc_thread_t *thread_new(thread_fn_t fn, void *arg, thread_kind_t kind,
                                const thread_name_t name)
{
   nvc_thread_t *thread = xcalloc(sizeof(nvc_thread_t));
   thread->fn       = fn;
   thread->arg      = arg;
   thread->kind     = kind;
   thread->rngstate = rand();
   memcpy(thread->name, name, THREAD_NAME_LEN);

   atomic_store(&thread->queue.age.bits, 0);
   atomic_store(&thread->queue.bot, 0);

   int id = 0;
   for (; id < MAX_THREADS; id++) {
      if (relaxed_load(&(threads[id])) != NULL)
         continue;
      else if (atomic_cas(&(threads[id]), NULL, thread))
         break;
   }

   if (id == MAX_THREADS)
      fatal_trace("cannot create more than %d threads", MAX_THREADS);

   thread->id = id;

   unsigned max = relaxed_load(&max_thread_id);
   while (max < id) {
      if (__atomic_cas(&max_thread_id, &max, id))
         break;
   }

   atomic_add(&running_threads, 1);
   return thread;
}

#ifdef POSIX_SUSPEND
static void unmask_fatal_signals(sigset_t *mask)
{
   sigdelset(mask, SIGQUIT);
   sigdelset(mask, SIGABRT);
   sigdelset(mask, SIGTERM);
}
#endif

#ifdef __APPLE__
static void reset_mach_ports(void)
{
   for (int i = 0; i < MAX_THREADS; i++) {
      nvc_thread_t *t = atomic_load(&(threads[i]));
      if (t == NULL)
         continue;

      // Mach ports are not valid after fork
      t->port = pthread_mach_thread_np(t->handle);
   }
}
#endif

void thread_init(void)
{
   assert(my_thread == NULL);

   const thread_name_t name = "main thread";
   my_thread = thread_new(NULL, NULL, MAIN_THREAD, name);

#ifdef __MINGW32__
   my_thread->handle = GetCurrentThread();
#else
   my_thread->handle = pthread_self();
#endif

#ifdef __APPLE__
   my_thread->port = pthread_mach_thread_np(my_thread->handle);
   pthread_atfork(NULL, NULL, reset_mach_ports);
#endif

#ifdef __MINGW32__
   InitializeCriticalSectionAndSpinCount(&wakelock, LOCK_SPINS);
   InitializeConditionVariable(&wake_workers);

   for (int i = 0; i < PARKING_BAYS; i++) {
      parking_bay_t *bay = &(parking_bays[i]);
      InitializeCriticalSectionAndSpinCount(&(bay->mutex), LOCK_SPINS);
      InitializeConditionVariable(&(bay->cond));
   }
#endif

   assert(my_thread->id == 0);

   const char *max_env = getenv("NVC_MAX_THREADS");
   if (max_env != NULL)
      max_workers = MAX(1, MIN(atoi(max_env), MAX_THREADS));
   else
      max_workers = DEFAULT_THREADS;

   const int num_cpus = nvc_nprocs();
   max_workers = MIN(num_cpus, max_workers);

   const char *jobs_env = getenv("NVC_CONCURRENT_JOBS");
   if (jobs_env != NULL) {
      const int num_jobs = MAX(1, atoi(jobs_env));
      const int limit = (int)round((double)num_cpus / (double)num_jobs);
      max_workers = MAX(1, MIN(max_workers, limit));
   }

   assert(max_workers > 0);

#ifdef DEBUG
   if (getenv("NVC_THREAD_VERBOSE") != NULL)
      atexit(print_lock_stats);
#endif

   atexit(join_worker_threads);

#ifdef POSIX_SUSPEND
   sem_init(&stop_sem, 0, 0);

   struct sigaction sa = {
      .sa_sigaction = suspend_handler,
      .sa_flags = SA_RESTART | SA_SIGINFO
   };
   sigfillset(&sa.sa_mask);
   unmask_fatal_signals(&sa.sa_mask);

   sigaction(SIGSUSPEND, &sa, NULL);
   sigaction(SIGRESUME, &sa, NULL);

   sigset_t mask;
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, NULL, &mask);

   sigdelset(&mask, SIGSUSPEND);
   sigaddset(&mask, SIGRESUME);

   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, &mask, NULL);
#endif
}

int thread_id(void)
{
   assert(my_thread != NULL);
   return my_thread->id;
}

bool thread_attached(void)
{
   return my_thread != NULL;
}

void thread_sleep(int usec)
{
   usleep(usec);
}

static void *thread_wrapper(void *arg)
{
   assert(my_thread == NULL);
   my_thread = arg;

   void *result = (*my_thread->fn)(my_thread->arg);

   // Avoid races with stop_world
   SCOPED_LOCK(stop_lock);

   assert(threads[my_thread->id] == my_thread);
   atomic_store(&(threads[my_thread->id]),  NULL);

   atomic_add(&running_threads, -1);
   return result;
}

#ifdef __MINGW32__
static DWORD win32_thread_wrapper(LPVOID param)
{
   void *ret = thread_wrapper(param);
   atomic_store(&(my_thread->retval), ret);
   return 0;
}
#endif

static void thread_start(nvc_thread_t *thread)
{
   assert_lock_held(&stop_lock);

#ifdef __MINGW32__
   if ((thread->handle = CreateThread(NULL, 0, win32_thread_wrapper,
                                      thread, 0, NULL)) == NULL)
      fatal_errno("CreateThread");
#else
   PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
                 thread_wrapper, thread);
#endif

#ifdef __APPLE__
   thread->port = pthread_mach_thread_np(thread->handle);
#endif
}

nvc_thread_t *thread_create(thread_fn_t fn, void *arg, const char *fmt, ...)
{
   va_list ap;
   va_start(ap, fmt);

   thread_name_t name;
   checked_vsprintf(name, THREAD_NAME_LEN, fmt, ap);

   va_end(ap);

   // Avoid races with stop_world
   SCOPED_LOCK(stop_lock);

   nvc_thread_t *thread = thread_new(fn, arg, USER_THREAD, name);
   thread_start(thread);

   return thread;
}

void *thread_join(nvc_thread_t *thread)
{
   if (thread == my_thread || thread->kind == MAIN_THREAD)
      fatal_trace("cannot join self or main thread");

   void *retval = NULL;
#ifdef __MINGW32__
   if (WaitForSingleObject(thread->handle, INFINITE) == WAIT_FAILED)
      fatal_errno("WaitForSingleObject failed for thread %s", thread->name);

   retval = atomic_load(&(thread->retval));
#else
   PTHREAD_CHECK(pthread_join, thread->handle, &retval);
#endif

   async_free(thread->name);
   async_free(thread);

   return retval;
}

nvc_thread_t *get_thread(int id)
{
   assert(id >= 0 && id < MAX_THREADS);
   return atomic_load(&threads[id]);
}

static inline parking_bay_t *parking_bay_for(void *cookie)
{
   return &(parking_bays[mix_bits_64(cookie) % PARKING_BAYS]);
}

static void thread_park(void *cookie, park_fn_t fn)
{
   parking_bay_t *bay = parking_bay_for(cookie);

   platform_mutex_lock(&(bay->mutex));
   {
      if ((*fn)(bay, cookie)) {
         bay->parked++;
         platform_cond_wait(&(bay->cond), &(bay->mutex));
         assert(bay->parked > 0);
         bay->parked--;
      }
   }
   platform_mutex_unlock(&(bay->mutex));
}

static void thread_unpark(void *cookie, unpark_fn_t fn)
{
   parking_bay_t *bay = parking_bay_for(cookie);

   if (fn != NULL) {
      platform_mutex_lock(&(bay->mutex));
      {
         (*fn)(bay, cookie);
      }
      platform_mutex_unlock(&(bay->mutex));
   }

   // Do not use pthread_cond_signal here as multiple threads parked in
   // this bay may be waiting on different cookies
   platform_cond_broadcast(&(bay->cond));
}

void spin_wait(void)
{
#if defined ARCH_X86_64
   __asm__ volatile ("pause");
#elif defined ARCH_ARM64
   // YIELD is a no-op on most AArch64 cores so also do an ISB to stall
   // the pipeline for a bit
   __asm__ volatile ("yield; isb");
#endif
}

static bool lock_park_cb(parking_bay_t *bay, void *cookie)
{
   nvc_lock_t *lock = cookie;

   // This is called with the park mutex held: check the lock is still
   // owned by someone and the park bit is still set
   return relaxed_load(lock) == (IS_LOCKED | HAS_PARKED);
}

static void lock_unpark_cb(parking_bay_t *bay, void *cookie)
{
   nvc_lock_t *lock = cookie;

   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));

   // Unlock must have release semantics
   atomic_store(lock, (bay->parked > 0 ? HAS_PARKED : 0));
}

void nvc_lock(nvc_lock_t *lock)
{
   LOCK_EVENT(locks, 1);
   TSAN_PRE_LOCK(lock);

   int8_t state = 0;
   if (likely(__atomic_cas(lock, &state, state | IS_LOCKED)))
      goto locked;  // Fast path: acquired the lock without contention

   LOCK_EVENT(contended, 1);

   for (;;) {
      LOCK_EVENT(retries, 1);

      // Spin a few times waiting for the owner to release the lock
      // before parking
      int spins = 0;
      for (; (state & IS_LOCKED) && spins < LOCK_SPINS;
           spins++, state = relaxed_load(lock))
         spin_wait();

      if (state & IS_LOCKED) {
         // Ignore failures here as we will check the lock state again
         // in the callback with the park mutex held
         atomic_cas(lock, IS_LOCKED, IS_LOCKED | HAS_PARKED);

         LOCK_EVENT(parks, 1);
         thread_park(lock, lock_park_cb);

         if ((state = relaxed_load(lock)) & IS_LOCKED) {
            // Someone else grabbed the lock before our thread was unparked
            LOCK_EVENT(spurious, 1);
            continue;
         }
      }
      else
         LOCK_EVENT(spins, spins);

      assert(!(state & IS_LOCKED));

      // If we get here then we've seen the lock in an unowned state:
      // attempt to grab it with a CAS
      if (__atomic_cas(lock, &state, state | IS_LOCKED))
         goto locked;
   }

 locked:
   TSAN_POST_LOCK(lock);
}

void nvc_unlock(nvc_lock_t *lock)
{
   TSAN_PRE_UNLOCK(lock);

   // Fast path: unlock assuming no parked waiters
   if (likely(atomic_cas(lock, IS_LOCKED, 0)))
      goto unlocked;

   // If we get here then we must own the lock with at least one parked
   // waiting thread
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));

   // Lock released in callback
   thread_unpark(lock, lock_unpark_cb);

 unlocked:
   TSAN_POST_UNLOCK(lock);
}

#ifdef DEBUG
void assert_lock_held(nvc_lock_t *lock)
{
   int8_t state = relaxed_load(lock);
   if (unlikely(!(state & IS_LOCKED)))
      fatal_trace("expected lock at %p to be held", lock);
}
#endif

void __scoped_unlock(nvc_lock_t **plock)
{
   nvc_unlock(*plock);
}

static void push_bot(threadq_t *tq, const task_t *tasks, size_t count)
{
   const abp_idx_t bot = relaxed_load(&tq->bot);
   assert(bot + count <= THREADQ_SIZE);

   memcpy(tq->deque + bot, tasks, count * sizeof(task_t));
   store_release(&tq->bot, bot + count);
}

static bool pop_bot(threadq_t *tq, task_t *task)
{
   const abp_idx_t old_bot = relaxed_load(&tq->bot);
   if (old_bot == 0)
      return false;

   const abp_idx_t new_bot = old_bot - 1;
   atomic_store(&tq->bot, new_bot);

   *task = tq->deque[new_bot];

   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
   if (new_bot > old_age.top)
      return true;

   atomic_store(&tq->bot, 0);

   const abp_age_t new_age = { .top = 0, .tag = old_age.tag + 1 };
   if (new_bot == old_age.top) {
      if (atomic_cas(&tq->age.bits, old_age.bits, new_age.bits))
         return true;
   }

   atomic_store(&tq->age.bits, new_age.bits);
   return false;
}

__attribute__((no_sanitize("thread")))
static bool pop_top(threadq_t *tq, task_t *task)
{
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
   const abp_idx_t bot = atomic_load(&tq->bot);

   if (bot <= old_age.top)
      return false;

   // This triggers a TSAN false-positive: we will never read from *task
   // if the CAS fails below, so it's safe to ignore
   *task = tq->deque[old_age.top];

   const abp_age_t new_age = {
      .tag = old_age.tag,
      .top = old_age.top + 1
   };

   return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
}

static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
{
   assert_lock_held(&gq->lock);

   if (gq->wptr == gq->rptr)
      gq->wptr = gq->rptr = 0;

   if (gq->wptr + count > gq->max) {
      gq->max = next_power_of_2(gq->wptr + count);
      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
   }

   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
   gq->wptr += count;
}

__attribute__((no_sanitize("thread")))
static bool globalq_unlocked_empty(globalq_t *gq)
{
   return relaxed_load(&gq->wptr) == relaxed_load(&gq->rptr);
}

static size_t globalq_take(globalq_t *gq, threadq_t *tq)
{
   if (globalq_unlocked_empty(gq))
      return 0;

   const int nthreads = relaxed_load(&running_threads);

   SCOPED_LOCK(gq->lock);

   if (gq->wptr == gq->rptr)
      return 0;

   const int remain = gq->wptr - gq->rptr;
   const int share = gq->wptr / nthreads;
   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
   const int from = gq->rptr;

   gq->rptr += take;

   push_bot(tq, gq->tasks + from, take);
   return take;
}

static void execute_task(task_t *task)
{
   (*task->fn)(task->context, task->arg);

   if (task->workq != NULL) {
      entryq_t *eq = &(task->workq->entryqs[my_thread->id]);

      const entryq_ptr_t cur = { .bits = relaxed_load(&eq->comp.bits) };
      const int epoch = atomic_load(&task->workq->epoch);
      const int count = cur.epoch == epoch ? cur.count : 0;
      const entryq_ptr_t next = { .count = count + 1, .epoch = epoch };
      store_release(&eq->comp.bits, next.bits);
   }
   else
      atomic_add(&async_pending, -1);
}

static bool globalq_poll(globalq_t *gq, threadq_t *tq)
{
   int ntasks;
   if ((ntasks = globalq_take(gq, tq))) {
      task_t task;
      int comp = 0;
      for (; pop_bot(tq, &task); comp++)
         execute_task(&task);

      WORKQ_EVENT(comp, comp);
      return true;
   }
   else
      return false;
}

workq_t *workq_new(void *context)
{
   if (my_thread->kind != MAIN_THREAD)
      fatal_trace("work queues can only be created by the main thread");

   workq_t *wq = xcalloc(sizeof(workq_t));
   wq->state    = IDLE;
   wq->context  = context;
   wq->parallel = max_workers > 1;
   wq->epoch    = 1;

   return wq;
}

void workq_not_thread_safe(workq_t *wq)
{
   wq->parallel = false;
}

void workq_free(workq_t *wq)
{
   if (my_thread->kind != MAIN_THREAD)
      fatal_trace("work queues can only be freed by the main thread");

   assert(wq->state == IDLE);

   for (int i = 0; i < MAX_THREADS; i++)
      free(wq->entryqs[i].tasks);

   free(wq);
}

void workq_do(workq_t *wq, task_fn_t fn, void *arg)
{
   assert(wq->state == IDLE);

   entryq_t *eq = &(wq->entryqs[my_thread->id]);

   const entryq_ptr_t cur = { .bits = relaxed_load(&eq->wptr.bits) };
   const int epoch = atomic_load(&wq->epoch);
   const int wptr = cur.epoch == epoch ? cur.count : 0;

   if (wptr == eq->queuesz) {
      eq->queuesz = MAX(eq->queuesz * 2, 64);
      eq->tasks = xrealloc_array(eq->tasks, eq->queuesz, sizeof(task_t));
   }

   eq->tasks[wptr] = (task_t){ fn, wq->context, arg, wq };

   const entryq_ptr_t next = { .count = wptr + 1, .epoch = epoch };
   store_release(&eq->wptr.bits, next.bits);

   unsigned maxthread = relaxed_load(&wq->maxthread);
   while (maxthread < my_thread->id) {
      if (__atomic_cas(&wq->maxthread, &maxthread, my_thread->id))
         break;
   }
}

void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
{
   const int maxthread = relaxed_load(&wq->maxthread);
   for (int i = 0; i <= maxthread; i++) {
      entryq_t *eq = &(wq->entryqs[my_thread->id]);
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
      for (int j = 0; j < wptr.count; j++)
         (*fn)(wq->context, eq->tasks[j].arg, arg);
   }
}

static int estimate_depth(threadq_t *tq)
{
   const abp_age_t age = { .bits = relaxed_load(&tq->age.bits) };
   const abp_idx_t bot = relaxed_load(&tq->bot);

   return bot <= age.top ? 0 : bot - age.top;
}

static threadq_t *get_thread_queue(int id)
{
   assert(id < MAX_THREADS);

   nvc_thread_t *t = atomic_load(&(threads[id]));
   if (t == NULL)
      return NULL;

   return &(t->queue);
}

static uint32_t fast_rand(void)
{
   uint32_t state = my_thread->rngstate;
   state ^= (state << 13);
   state ^= (state >> 17);
   state ^= (state << 5);
   return (my_thread->rngstate = state);
}

static threadq_t *find_victim(void)
{
   threadq_t *last = get_thread_queue(my_thread->victim);
   if (last != NULL && estimate_depth(last) > 0)
      return last;

   const int maxthread = relaxed_load(&max_thread_id);
   const int start = fast_rand() % (maxthread + 1);
   int idx = start;
   do {
      if (idx != my_thread->id) {
         threadq_t *q = get_thread_queue(idx);
         if (q != NULL && estimate_depth(q) > 0) {
            my_thread->victim = idx;
            return q;
         }
      }
   } while ((idx = (idx + 1) % (maxthread + 1)) != start);

   return NULL;
}

static bool steal_task(void)
{
   threadq_t *tq = find_victim();
   if (tq == NULL)
      return false;

   task_t task;
   if (pop_top(tq, &task)) {
      WORKQ_EVENT(steals, 1);
      execute_task(&task);
      WORKQ_EVENT(comp, 1);
      return true;
   }

   return false;
}

static void progressive_backoff(void)
{
   if (my_thread->spins++ < YIELD_SPINS)
      spin_wait();
   else {
#ifdef __MINGW32__
      SwitchToThread();
#else
      sched_yield();
#endif
      my_thread->spins = 0;
   }
}

static void *worker_thread(void *arg)
{
   mspace_stack_limit(MSPACE_CURRENT_FRAME);

   do {
      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
         my_thread->spins = 0;  // Did work
      else if (my_thread->spins++ < 2)
         spin_wait();
      else {
         platform_mutex_lock(&wakelock);
         {
            if (!relaxed_load(&should_stop))
               platform_cond_wait(&wake_workers, &wakelock);
         }
         platform_mutex_unlock(&wakelock);
      }
   } while (likely(!relaxed_load(&should_stop)));

   return NULL;
}

static void create_workers(int needed)
{
   assert(my_thread->kind == MAIN_THREAD);

   if (relaxed_load(&should_stop))
      return;

   while (relaxed_load(&running_threads) < MIN(max_workers, needed)) {
      static int counter = 0;
      thread_name_t name;
      checked_sprintf(name, THREAD_NAME_LEN, "worker thread %d",
                      atomic_add(&counter, 1));
      SCOPED_LOCK(stop_lock);   // Avoid races with stop_world
      nvc_thread_t *thread =
         thread_new(worker_thread, NULL, WORKER_THREAD, name);
      thread_start(thread);
   }

   platform_cond_broadcast(&wake_workers);
}

void workq_start(workq_t *wq)
{
   assert(my_thread->kind == MAIN_THREAD);

   const int epoch = relaxed_load(&wq->epoch);
   const int maxthread = relaxed_load(&wq->maxthread);

   assert(wq->state == IDLE);
   wq->state = START;

   int nserial = 0, nparallel = 0;
   for (int i = 0; i <= maxthread; i++) {
      entryq_t *eq = &wq->entryqs[i];
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
      if (wptr.epoch == epoch) {
         // Only bump epoch if there are tasks to run
         if (nserial + nparallel == 0)
            atomic_add(&wq->epoch, 1);

         assert(wptr.count > 0);

         if (i == maxthread && nserial + nparallel == 0 && wptr.count == 1) {
            execute_task(&(eq->tasks[0]));   // Only one task in total
            nserial++;
         }
         else if (wq->parallel) {
            if (nparallel == 0)
               nvc_lock(&globalq.lock);   // Lazily acquire lock
            globalq_put(&globalq, eq->tasks, wptr.count);
            nparallel += wptr.count;
         }
         else {
            for (int j = 0; j < wptr.count; j++)
               execute_task(&(eq->tasks[j]));
            nserial += wptr.count;
         }
      }
   }

   if (wq->parallel && nparallel > 0) {
      nvc_unlock(&globalq.lock);
      create_workers(nparallel);
   }
}

static int workq_outstanding(workq_t *wq)
{
   assert(wq->state == START);

   const int epoch = atomic_load(&wq->epoch);
   const int maxthread = relaxed_load(&max_thread_id);

   int pending = 0;
   for (int i = 0; i <= maxthread; i++) {
      entryq_t *eq = &(wq->entryqs[i]);

      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
      if (wptr.epoch == epoch - 1)
         pending += wptr.count;

      const entryq_ptr_t comp = { .bits = load_acquire(&eq->comp.bits) };
      if (comp.epoch == epoch)
         pending -= comp.count;
   }

   assert(pending >= 0);
   return pending;
}

static void workq_parallel_drain(workq_t *wq)
{
   if (workq_outstanding(wq) > 0) {
      while (globalq_poll(&globalq, &(my_thread->queue)));
      while (steal_task());

      while (workq_outstanding(wq) > 0)
         progressive_backoff();
   }

   wq->state = IDLE;
}

void workq_drain(workq_t *wq)
{
   if (my_thread->kind != MAIN_THREAD)
      fatal_trace("workq_drain can only be called from the main thread");

   if (wq->parallel)
      workq_parallel_drain(wq);
   else {
      assert(wq->state == START);
      wq->state = IDLE;
   }
}

void async_do(task_fn_t fn, void *context, void *arg)
{
   if (max_workers == 1)
      (*fn)(context, arg);   // Single CPU
   else {
      const int npending = atomic_add(&async_pending, 1);
      create_workers(npending + 1 /* Do not count main thread */);

      task_t tasks[1] = {
         { fn, context, arg, NULL }
      };
      SCOPED_LOCK(globalq.lock);
      globalq_put(&globalq, tasks, 1);
   }
}

void async_barrier(void)
{
   while (atomic_load(&async_pending) > 0) {
      if (!globalq_poll(&globalq, &(my_thread->queue)))
         progressive_backoff();
   }
}

void async_free(void *ptr)
{
   if (ptr == NULL)
      return;

   // TODO: free when all threads in quiescent state
}

#ifdef POSIX_SUSPEND
static void suspend_handler(int sig, siginfo_t *info, void *context)
{
   if (info->si_pid != getpid())
      return;   // Not sent by us, ignore it
   else if (sig == SIGRESUME)
      return;

   const int olderrno = errno;

   if (my_thread != NULL) {
      struct cpu_state cpu;
      fill_cpu_state(&cpu, (ucontext_t *)context);

      stop_world_fn_t callback = atomic_load(&stop_callback);
      void *arg = atomic_load(&stop_arg);

      (*callback)(my_thread->id, &cpu, arg);
   }

   sem_post(&stop_sem);

   sigset_t mask;
   sigfillset(&mask);
   unmask_fatal_signals(&mask);
   sigdelset(&mask, SIGRESUME);

   sigsuspend(&mask);

   sem_post(&stop_sem);

   errno = olderrno;
}
#endif

void stop_world(stop_world_fn_t callback, void *arg)
{
   nvc_lock(&stop_lock);

   atomic_store(&stop_callback, callback);
   atomic_store(&stop_arg, arg);

#ifdef __MINGW32__
   const int maxthread = relaxed_load(&max_thread_id);
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      if (SuspendThread(thread->handle) != 0)
         fatal_errno("SuspendThread");

      CONTEXT context;
      context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;
      if (!GetThreadContext(thread->handle, &context))
         fatal_errno("GetThreadContext");

      struct cpu_state cpu;
      fill_cpu_state(&cpu, &context);

      (*callback)(thread->id, &cpu, arg);
   }
#elif defined __APPLE__
   const int maxthread = relaxed_load(&max_thread_id);
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      assert(thread->port != MACH_PORT_NULL);

      kern_return_t kern_result;
      do {
         kern_result = thread_suspend(thread->port);
      } while (kern_result == KERN_ABORTED);

      if (kern_result != KERN_SUCCESS)
         fatal_trace("failed to suspend thread %d (%d)", i, kern_result);

#ifdef ARCH_ARM64
      arm_thread_state64_t state;
      mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
      thread_state_flavor_t flavor = ARM_THREAD_STATE64;
#else
      x86_thread_state64_t state;
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
      thread_state_flavor_t flavor = x86_THREAD_STATE64;
#endif
      kern_result = thread_get_state(thread->port, flavor,
                                     (natural_t *)&state, &count);
      if (kern_result != KERN_SUCCESS)
         fatal_trace("failed to get thread %d state (%d)", i, kern_result);

      // Fake a ucontext_t that we can pass to fill_cpu_state
      ucontext_t uc;
      typeof(*uc.uc_mcontext) mc;
      uc.uc_mcontext = &mc;
      mc.__ss = state;

      struct cpu_state cpu;
      fill_cpu_state(&cpu, &uc);

      (*callback)(thread->id, &cpu, arg);
   }
#elif defined __SANITIZE_THREAD__
   // https://github.com/google/sanitizers/issues/1179
   fatal_trace("stop_world is not supported with tsan");
#else
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);

   int signalled = 0;
   const int maxthread = relaxed_load(&max_thread_id);
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      PTHREAD_CHECK(pthread_kill, thread->handle, SIGSUSPEND);
      signalled++;
   }

   struct timespec ts;
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
      fatal_errno("clock_gettime");

   ts.tv_sec += SUSPEND_TIMEOUT;

   for (; signalled > 0; signalled--) {
      if (sem_timedwait(&stop_sem, &ts) != 0)
         fatal_trace("timeout waiting for %d threads to suspend", signalled);
   }

   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
#endif

   struct cpu_state cpu;
   capture_registers(&cpu);

   (*callback)(my_thread->id, &cpu, arg);
}

void start_world(void)
{
   assert_lock_held(&stop_lock);

   const int maxthread = relaxed_load(&max_thread_id);

#ifdef __MINGW32__
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      if (ResumeThread(thread->handle) != 1)
         fatal_errno("ResumeThread");
   }
#elif defined __APPLE__
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      kern_return_t kern_result;
      do {
         kern_result = thread_resume(thread->port);
      } while (kern_result == KERN_ABORTED);

      if (kern_result != KERN_SUCCESS)
         fatal_trace("failed to resume thread %d (%d)", i, kern_result);
   }
#else
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);

   int signalled = 0;
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      PTHREAD_CHECK(pthread_kill, thread->handle, SIGRESUME);
      signalled++;
   }

   struct timespec ts;
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
      fatal_errno("clock_gettime");

   ts.tv_sec += SUSPEND_TIMEOUT;

   for (; signalled > 0; signalled--) {
      if (sem_timedwait(&stop_sem, &ts) != 0)
         fatal_trace("timeout waiting for %d threads to resume", signalled);
   }

   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
#endif

   nvc_unlock(&stop_lock);
}

void thread_wx_mode(wx_mode_t mode)
{
#ifdef __APPLE__
   pthread_jit_write_protect_np(mode == WX_EXECUTE);
#else
   // Could use Intel memory protection keys here
#endif
}

barrier_t *barrier_new(int count)
{
   barrier_t *b = xcalloc(sizeof(barrier_t));
   b->count = count;
   return b;
}

void barrier_free(barrier_t *b)
{
   free(b);
}

void barrier_wait(barrier_t *b)
{
   const int count = relaxed_load(&b->count);
   const int passed = relaxed_load(&b->passed);

   if (atomic_fetch_add(&b->reached, 1) == count - 1) {
      // Last thread to pass barrier
      relaxed_store(&b->reached, 0);
      store_release(&b->passed, passed + 1);
   }
   else {
      while (load_acquire(&b->passed) == passed)
         progressive_backoff();
   }
}
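
Usage example

The listing above only shows thread.c itself. The program below is a minimal
sketch (not part of the nvc source) of how the public API defined in this
file is typically driven. It assumes the declarations in thread.h match the
definitions above (thread_init, thread_create, thread_join, async_do,
async_barrier, nvc_lock/nvc_unlock), and that thread_fn_t returns a void *
while task_fn_t takes (context, arg), as the calls in thread_wrapper() and
execute_task() suggest.

#include "thread.h"

#include <stdint.h>
#include <stdio.h>

static nvc_lock_t counter_lock = 0;   // Same zero-initialisation as stop_lock above
static int counter = 0;

// Matches how execute_task() invokes task_fn_t: (*fn)(context, arg)
static void count_task(void *context, void *arg)
{
   (void)context;
   nvc_lock(&counter_lock);
   counter += (int)(intptr_t)arg;
   nvc_unlock(&counter_lock);
}

// Matches how thread_wrapper() invokes thread_fn_t: returns a void * result
static void *hello_thread(void *arg)
{
   printf("hello from %s\n", (const char *)arg);
   return NULL;
}

int main(void)
{
   thread_init();   // Registers the main thread and the atexit cleanup hooks

   // A named user thread, joined explicitly for its result
   nvc_thread_t *t = thread_create(hello_thread, "demo", "user thread %d", 1);
   thread_join(t);

   // Fire-and-forget tasks on the worker pool; async_barrier() polls the
   // global queue until async_pending drops back to zero
   for (int i = 0; i < 8; i++)
      async_do(count_task, NULL, (void *)(intptr_t)1);
   async_barrier();

   printf("counter = %d\n", counter);
   return 0;
}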