
nickg / nvc / 9730737038

30 Jun 2024 08:35AM UTC, coverage: 91.564% (-0.004%) from 91.568%
Build 9730737038 (push, github, nickg)
Commit: Add NVC_CONCURRENT_JOBS environment variable

7 of 11 new or added lines in 1 file covered (63.64%).
69 existing lines in 1 file now uncovered.
56823 of 62058 relevant lines covered (91.56%).
661466.61 hits per line.
Source File

/src/thread.c (88.66% covered)
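The commit under test adds the NVC_CONCURRENT_JOBS environment variable, which thread_init() in the listing below reads alongside NVC_MAX_THREADS to cap the worker pool when several nvc processes run concurrently, for example under a parallel build. As a quick illustration of that arithmetic, here is a minimal sketch; worker_limit() is an illustrative name and is not part of thread.c:

#include <math.h>

static int worker_limit(int num_cpus, int max_workers, int num_jobs)
{
   // Give each concurrent job a roughly equal share of the CPUs,
   // but keep at least one worker thread per process
   const int share = (int)round((double)num_cpus / (double)num_jobs);
   const int limit = share < max_workers ? share : max_workers;
   return limit < 1 ? 1 : limit;
}

For example, with 16 CPUs and NVC_CONCURRENT_JOBS=4 each process limits itself to at most four workers, while NVC_CONCURRENT_JOBS=32 still leaves one worker per process.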
1
//
2
//  Copyright (C) 2021-2023  Nick Gasson
3
//
4
//  This program is free software: you can redistribute it and/or modify
5
//  it under the terms of the GNU General Public License as published by
6
//  the Free Software Foundation, either version 3 of the License, or
7
//  (at your option) any later version.
8
//
9
//  This program is distributed in the hope that it will be useful,
10
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
//  GNU General Public License for more details.
13
//
14
//  You should have received a copy of the GNU General Public License
15
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
//
17

18
#include "util.h"
19
#include "array.h"
20
#include "cpustate.h"
21
#include "rt/mspace.h"
22
#include "thread.h"
23

24
#include <assert.h>
25
#include <errno.h>
26
#include <math.h>
27
#include <signal.h>
28
#include <stdlib.h>
29
#include <string.h>
30
#include <inttypes.h>
31
#include <unistd.h>
32
#include <semaphore.h>
33

34
#ifdef HAVE_PTHREAD
35
#include <pthread.h>
36
#elif !defined __MINGW32__
37
#error missing pthread support
38
#endif
39

40
#ifdef __SANITIZE_THREAD__
41
#include <sanitizer/tsan_interface.h>
42
#endif
43

44
#if defined __MINGW32__
45
#define WIN32_LEAN_AND_MEAN
46
#include <windows.h>
47
#elif defined __APPLE__
48
#define task_t __task_t
49
#include <mach/thread_act.h>
50
#include <mach/machine.h>
51
#undef task_t
52
#endif
53

54
#define LOCK_SPINS      64
55
#define YIELD_SPINS     32
56
#define MIN_TAKE        8
57
#define PARKING_BAYS    64
58
#define SUSPEND_TIMEOUT 1
59

60
#if !defined __MINGW32__ && !defined __APPLE__
61
#define POSIX_SUSPEND 1
62
#define SIGSUSPEND    SIGRTMIN
63
#define SIGRESUME     (SIGRTMIN + 1)
64
#endif
65

66
typedef struct {
67
   int64_t locks;
68
   int64_t spins;
69
   int64_t contended;
70
   int64_t parks;
71
   int64_t spurious;
72
   int64_t retries;
73
} __attribute__((aligned(64))) lock_stats_t;
74

75
STATIC_ASSERT(sizeof(lock_stats_t) == 64)
76

77
#ifdef DEBUG
78
#define LOCK_EVENT(what, n) do {                  \
79
      if (likely(my_thread != NULL))              \
80
         lock_stats[my_thread->id].what += (n);   \
81
   } while (0)
82

83
#define WORKQ_EVENT(what, n) do {                 \
84
      if (likely(my_thread != NULL))              \
85
         workq_stats[my_thread->id].what += (n);  \
86
   } while (0)
87
#else
88
#define LOCK_EVENT(what, n) (void)(n)
89
#define WORKQ_EVENT(what, n) (void)(n)
90
#endif
91

92
#ifdef __SANITIZE_THREAD__
93
#define TSAN_PRE_LOCK(addr) \
94
   __tsan_mutex_pre_lock(addr, __tsan_mutex_linker_init);
95
#define TSAN_POST_LOCK(addr) \
96
   __tsan_mutex_post_lock(addr, __tsan_mutex_linker_init, 0);
97
#define TSAN_PRE_UNLOCK(addr) \
98
   __tsan_mutex_pre_unlock(addr, __tsan_mutex_linker_init);
99
#define TSAN_POST_UNLOCK(addr) \
100
   __tsan_mutex_post_unlock(addr, __tsan_mutex_linker_init);
101
#else
102
#define TSAN_PRE_LOCK(addr)
103
#define TSAN_POST_LOCK(addr)
104
#define TSAN_PRE_UNLOCK(addr)
105
#define TSAN_POST_UNLOCK(addr)
106
#endif
107

108
#ifndef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
109
#define PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP PTHREAD_MUTEX_INITIALIZER
110
#endif
111

112
#define PTHREAD_CHECK(op, ...) do {             \
113
      if (unlikely(op(__VA_ARGS__) != 0))       \
114
         fatal_errno(#op);                      \
115
   } while (0)
116

117
// Lock implementation based on WTF::Lock in WebKit
118
//   https://webkit.org/blog/6161/locking-in-webkit/
119

120
typedef enum {
121
   IS_LOCKED  = (1 << 0),
122
   HAS_PARKED = (1 << 1)
123
} lock_bits_t;
124

125
typedef struct {
126
#ifdef __MINGW32__
127
   CRITICAL_SECTION   mutex;
128
   CONDITION_VARIABLE cond;
129
#else
130
   pthread_mutex_t    mutex;
131
   pthread_cond_t     cond;
132
#endif
133
   int                parked;
134
} __attribute__((aligned(64))) parking_bay_t;
135

136
typedef bool (*park_fn_t)(parking_bay_t *, void *);
137
typedef void (*unpark_fn_t)(parking_bay_t *, void *);
138

139
typedef struct {
140
   task_fn_t  fn;
141
   void      *context;
142
   void      *arg;
143
   workq_t   *workq;
144
} task_t;
145

146
// Work sealing task queue is based on:
147
//   Arora, N. S., Blumofe, R. D., and Plaxton, C. G.
148
//   Thread scheduling for multiprogrammed multiprocessors.
149
//   Theory of Computing Systems 34, 2 (2001), 115-144.
150

151
typedef uint32_t abp_idx_t;
152
typedef uint32_t abp_tag_t;
153

154
typedef union {
155
   struct {
156
      abp_idx_t top;
157
      abp_tag_t tag;
158
   };
159
   uint64_t bits;
160
} abp_age_t;
161

162
STATIC_ASSERT(sizeof(abp_age_t) <= 8);
163

164
#define THREADQ_SIZE 256
165

166
typedef struct {
167
   task_t    deque[THREADQ_SIZE];
168
   abp_age_t age;
169
   abp_idx_t bot;
170
} __attribute__((aligned(64))) threadq_t;
171

172
typedef enum { IDLE, START } workq_state_t;
173

174
typedef union {
175
   struct {
176
      uint32_t count;
177
      uint32_t epoch;
178
   };
179
   uint64_t bits;
180
} entryq_ptr_t;
181

182
STATIC_ASSERT(sizeof(entryq_ptr_t) <= 8);
183

184
typedef struct {
185
   task_t       *tasks;
186
   unsigned      queuesz;
187
   entryq_ptr_t  wptr;
188
   entryq_ptr_t  comp;
189
} __attribute__((aligned(64))) entryq_t;
190

191
STATIC_ASSERT(sizeof(entryq_t) == 64);
192

193
struct _workq {
194
   void          *context;
195
   workq_state_t  state;
196
   unsigned       epoch;
197
   unsigned       maxthread;
198
   bool           parallel;
199
   entryq_t       entryqs[MAX_THREADS];
200
};
201

202
typedef struct {
203
   int64_t comp;
204
   int64_t steals;
205
   int64_t wakes;
206
} __attribute__((aligned(64))) workq_stats_t;
207

208
STATIC_ASSERT(sizeof(workq_stats_t) == 64);
209

210
typedef enum {
211
   MAIN_THREAD,
212
   USER_THREAD,
213
   WORKER_THREAD,
214
} thread_kind_t;
215

216
struct _nvc_thread {
217
   unsigned        id;
218
   thread_kind_t   kind;
219
   unsigned        spins;
220
   threadq_t       queue;
221
   char           *name;
222
   thread_fn_t     fn;
223
   void           *arg;
224
   int             victim;
225
   uint32_t        rngstate;
226
#ifdef __MINGW32__
227
   HANDLE          handle;
228
   void           *retval;
229
#else
230
   pthread_t       handle;
231
#endif
232
#ifdef __APPLE__
233
   thread_port_t   port;
234
#endif
235
};
236

237
typedef struct {
238
   nvc_lock_t   lock;
239
   task_t      *tasks;
240
   unsigned     wptr;
241
   unsigned     rptr;
242
   unsigned     max;
243
} globalq_t;
244

245
typedef struct _barrier {
246
   unsigned count;
247
   unsigned reached;
248
   unsigned passed;
249
} __attribute__((aligned(64))) barrier_t;
250

251
static parking_bay_t parking_bays[PARKING_BAYS] = {
252
#ifndef __MINGW32__
253
   [0 ... PARKING_BAYS - 1] = {
254
      PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP,
255
      PTHREAD_COND_INITIALIZER
256
   }
257
#endif
258
};
259

260
static nvc_thread_t    *threads[MAX_THREADS];
261
static unsigned         max_workers = 0;
262
static int              running_threads = 0;
263
static unsigned         max_thread_id = 0;
264
static bool             should_stop = false;
265
static globalq_t        globalq __attribute__((aligned(64)));
266
static int              async_pending __attribute__((aligned(64))) = 0;
267
static nvc_lock_t       stop_lock = 0;
268
static stop_world_fn_t  stop_callback = NULL;
269
static void            *stop_arg = NULL;
270

271
#ifdef __MINGW32__
272
static CONDITION_VARIABLE wake_workers = CONDITION_VARIABLE_INIT;
273
static CRITICAL_SECTION   wakelock;
274
#else
275
static pthread_cond_t     wake_workers = PTHREAD_COND_INITIALIZER;
276
static pthread_mutex_t    wakelock = PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;
277
#endif
278

279
#ifdef POSIX_SUSPEND
280
static sem_t stop_sem;
281
#endif
282

283
#ifdef DEBUG
284
static lock_stats_t  lock_stats[MAX_THREADS];
285
static workq_stats_t workq_stats[MAX_THREADS];
286
#endif
287

288
static __thread nvc_thread_t *my_thread = NULL;
289

290
static parking_bay_t *parking_bay_for(void *cookie);
291

292
#ifdef POSIX_SUSPEND
293
static void suspend_handler(int sig, siginfo_t *info, void *context);
294
#endif
295

296
#ifdef DEBUG
297
static void print_lock_stats(void)
×
298
{
299
   lock_stats_t s = {};
×
300
   workq_stats_t t = {};
×
301
   for (int i = 0; i < MAX_THREADS; i++) {
×
302
      s.locks     += relaxed_load(&(lock_stats[i].locks));
×
303
      s.contended += relaxed_load(&(lock_stats[i].contended));
×
304
      s.parks     += relaxed_load(&(lock_stats[i].parks));
×
305
      s.spins     += relaxed_load(&(lock_stats[i].spins));
×
306
      s.spurious  += relaxed_load(&(lock_stats[i].spurious));
×
307
      s.retries   += relaxed_load(&(lock_stats[i].retries));
×
308

309
      t.comp   += relaxed_load(&(workq_stats[i].comp));
×
310
      t.steals += relaxed_load(&(workq_stats[i].steals));
×
311
      t.wakes  += relaxed_load(&(workq_stats[i].wakes));
×
312
   }
313

314
   printf("\nLock statistics:\n");
×
315
   printf("\tTotal locks      : %"PRIi64"\n", s.locks);
×
316
   printf("\tContended        : %"PRIi64" (%.1f%%)\n",
×
317
          s.contended, 100.0 * ((double)s.contended / (double)s.locks));
×
318
   printf("\tParked           : %"PRIi64" (%.1f%%)\n",
×
319
          s.parks, 100.0 * ((double)s.parks / (double)s.locks));
×
320
   printf("\tAverage spins    : %.1f\n", (double)s.spins / (double)s.retries);
×
321
   printf("\tSpurious wakeups : %"PRIi64"\n", s.spurious);
×
322

323
   printf("\nWork queue statistics:\n");
×
324
   printf("\tCompleted tasks  : %"PRIi64"\n", t.comp);
×
325
   printf("\tSteals           : %"PRIi64"\n", t.steals);
×
326
   printf("\tWakeups          : %"PRIi64"\n", t.wakes);
×
327
}
×
328
#endif
329

330
#ifdef __MINGW32__
331
static inline void platform_mutex_lock(LPCRITICAL_SECTION lpcs)
332
{
333
   EnterCriticalSection(lpcs);
334
}
335

336
static inline void platform_mutex_unlock(LPCRITICAL_SECTION lpcs)
337
{
338
   LeaveCriticalSection(lpcs);
339
}
340

341
static inline void platform_cond_broadcast(PCONDITION_VARIABLE pcv)
342
{
343
   WakeAllConditionVariable(pcv);
344
}
345

346
static inline void platform_cond_wait(PCONDITION_VARIABLE pcv,
347
                                      LPCRITICAL_SECTION lpcs)
348
{
349
   SleepConditionVariableCS(pcv, lpcs, INFINITE);
350
}
351
#else
352
static inline void platform_mutex_lock(pthread_mutex_t *mtx)
7,064✔
353
{
354
   PTHREAD_CHECK(pthread_mutex_lock, mtx);
7,064✔
355
}
7,064✔
356

357
static inline void platform_mutex_unlock(pthread_mutex_t *mtx)
7,064✔
358
{
359
   PTHREAD_CHECK(pthread_mutex_unlock, mtx);
7,064✔
360
}
7,064✔
361

362
static inline void platform_cond_broadcast(pthread_cond_t *cond)
5,593✔
363
{
364
   PTHREAD_CHECK(pthread_cond_broadcast, cond);
5,593✔
365
}
5,593✔
366

367
static inline void platform_cond_wait(pthread_cond_t *cond,
1,609✔
368
                                      pthread_mutex_t *mtx)
369
{
370
   PTHREAD_CHECK(pthread_cond_wait, cond, mtx);
1,609✔
371
}
1,609✔
372
#endif
373

374
static void join_worker_threads(void)
4,584✔
375
{
376
   SCOPED_A(nvc_thread_t *) join_list = AINIT;
9,168✔
377

378
   for (int i = 1; i < MAX_THREADS; i++) {
293,376✔
379
      nvc_thread_t *t = atomic_load(&threads[i]);
288,792✔
380
      if (t != NULL)
288,792✔
381
         APUSH(join_list, t);
288,792✔
382
   }
383

384
   // Lock the wake mutex here to avoid races with workers sleeping
385
   platform_mutex_lock(&wakelock);
4,584✔
386
   {
387
      atomic_store(&should_stop, true);
4,584✔
388
      platform_cond_broadcast(&wake_workers);
4,584✔
389
   }
390
   platform_mutex_unlock(&wakelock);
4,584✔
391

392
   for (int i = 0; i < join_list.count; i++) {
4,640✔
393
      nvc_thread_t *t = join_list.items[i];
56✔
394

395
      switch (relaxed_load(&t->kind)) {
56✔
396
      case WORKER_THREAD:
56✔
397
         thread_join(t);
56✔
398
         continue;  // Freed thread struct
56✔
399
      case USER_THREAD:
×
400
      case MAIN_THREAD:
401
         fatal_trace("leaked a user thread: %s", t->name);
×
402
      }
403
   }
404

405
   assert(atomic_load(&running_threads) == 1);
4,584✔
406

407
#ifdef DEBUG
408
   for (int i = 0; i < PARKING_BAYS; i++)
297,960✔
409
      assert(parking_bays[i].parked == 0);
293,376✔
410
#endif
411
}
4,584✔
412

413
static nvc_thread_t *thread_new(thread_fn_t fn, void *arg,
4,649✔
414
                                thread_kind_t kind, char *name)
415
{
416
   nvc_thread_t *thread = xcalloc(sizeof(nvc_thread_t));
4,649✔
417
   thread->name     = name;
4,649✔
418
   thread->fn       = fn;
4,649✔
419
   thread->arg      = arg;
4,649✔
420
   thread->kind     = kind;
4,649✔
421
   thread->rngstate = rand();
4,649✔
422

423
   atomic_store(&thread->queue.age.bits, 0);
4,649✔
424
   atomic_store(&thread->queue.bot, 0);
4,649✔
425

426
   int id = 0;
4,649✔
427
   for (; id < MAX_THREADS; id++) {
4,775✔
428
      if (relaxed_load(&(threads[id])) != NULL)
4,775✔
429
         continue;
126✔
430
      else if (atomic_cas(&(threads[id]), NULL, thread))
4,649✔
431
         break;
432
   }
433

434
   if (id == MAX_THREADS)
4,649✔
435
      fatal_trace("cannot create more than %d threads", MAX_THREADS);
×
436

437
   thread->id = id;
4,649✔
438

439
   unsigned max = relaxed_load(&max_thread_id);
4,649✔
440
   while (max < id) {
4,649✔
441
      if (__atomic_cas(&max_thread_id, &max, id))
70✔
442
         break;
443
   }
444

445
   atomic_add(&running_threads, 1);
4,649✔
446
   return thread;
4,649✔
447
}
448

449
#ifdef POSIX_SUSPEND
450
static void unmask_fatal_signals(sigset_t *mask)
6,472✔
451
{
452
   sigdelset(mask, SIGQUIT);
6,472✔
453
   sigdelset(mask, SIGABRT);
6,472✔
454
   sigdelset(mask, SIGTERM);
6,472✔
455
}
6,472✔
456
#endif
457

458
#ifdef __APPLE__
459
static void reset_mach_ports(void)
460
{
461
   for (int i = 0; i < MAX_THREADS; i++) {
462
      nvc_thread_t *t = atomic_load(&(threads[i]));
463
      if (t == NULL)
464
         continue;
465

466
      // Mach ports are not valid after fork
467
      t->port = pthread_mach_thread_np(t->handle);
468
   }
469
}
470
#endif
471

472
void thread_init(void)
4,579✔
473
{
474
   assert(my_thread == NULL);
4,579✔
475

476
   my_thread = thread_new(NULL, NULL, MAIN_THREAD, xstrdup("main thread"));
4,579✔
477

478
#ifdef __MINGW32__
479
   my_thread->handle = GetCurrentThread();
480
#else
481
   my_thread->handle = pthread_self();
4,579✔
482
#endif
483

484
#ifdef __APPLE__
485
   my_thread->port = pthread_mach_thread_np(my_thread->handle);
486
   pthread_atfork(NULL, NULL, reset_mach_ports);
487
#endif
488

489
#ifdef __MINGW32__
490
   InitializeCriticalSectionAndSpinCount(&wakelock, LOCK_SPINS);
491
   InitializeConditionVariable(&wake_workers);
492

493
   for (int i = 0; i < PARKING_BAYS; i++) {
494
      parking_bay_t *bay = &(parking_bays[i]);
495
      InitializeCriticalSectionAndSpinCount(&(bay->mutex), LOCK_SPINS);
496
      InitializeConditionVariable(&(bay->cond));
497
   }
498
#endif
499

500
   assert(my_thread->id == 0);
4,579✔
501

502
   const char *max_env = getenv("NVC_MAX_THREADS");
4,579✔
503
   if (max_env != NULL)
4,579✔
NEW
504
      max_workers = MAX(1, MIN(atoi(max_env), MAX_THREADS));
×
505
   else
506
      max_workers = DEFAULT_THREADS;
4,579✔
507

508
   const int num_cpus = nvc_nprocs();
4,579✔
509
   max_workers = MIN(num_cpus, max_workers);
4,579✔
510

511
   const char *jobs_env = getenv("NVC_CONCURRENT_JOBS");
4,579✔
512
   if (jobs_env != NULL) {
4,579✔
NEW
513
      const int num_jobs = MAX(1, atoi(jobs_env));
×
NEW
514
      const int limit = (int)round((double)num_cpus / (double)num_jobs);
×
NEW
515
      max_workers = MAX(1, MIN(max_workers, limit));
×
516
   }
517

518
   assert(max_workers > 0);
4,579✔
519

520
#ifdef DEBUG
521
   if (getenv("NVC_THREAD_VERBOSE") != NULL)
4,579✔
522
      atexit(print_lock_stats);
×
523
#endif
524

525
   atexit(join_worker_threads);
4,579✔
526

527
#ifdef POSIX_SUSPEND
528
   sem_init(&stop_sem, 0, 0);
4,579✔
529

530
   struct sigaction sa = {
4,579✔
531
      .sa_sigaction = suspend_handler,
532
      .sa_flags = SA_RESTART | SA_SIGINFO
533
   };
534
   sigfillset(&sa.sa_mask);
4,579✔
535
   unmask_fatal_signals(&sa.sa_mask);
4,579✔
536

537
   sigaction(SIGSUSPEND, &sa, NULL);
4,579✔
538
   sigaction(SIGRESUME, &sa, NULL);
4,579✔
539

540
   sigset_t mask;
4,579✔
541
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, NULL, &mask);
4,579✔
542

543
   sigdelset(&mask, SIGSUSPEND);
4,579✔
544
   sigaddset(&mask, SIGRESUME);
4,579✔
545

546
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, &mask, NULL);
4,579✔
547
#endif
548
}
4,579✔
549

550
int thread_id(void)
75,596,989✔
551
{
552
   assert(my_thread != NULL);
75,596,989✔
553
   return my_thread->id;
75,596,989✔
554
}
555

556
bool thread_attached(void)
45✔
557
{
558
   return my_thread != NULL;
45✔
559
}
560

561
void thread_sleep(int usec)
×
562
{
563
   usleep(usec);
×
564
}
×
565

566
static void *thread_wrapper(void *arg)
70✔
567
{
568
   assert(my_thread == NULL);
70✔
569
   my_thread = arg;
70✔
570

571
   void *result = (*my_thread->fn)(my_thread->arg);
70✔
572

573
   // Avoid races with stop_world
574
   SCOPED_LOCK(stop_lock);
70✔
575

576
   assert(threads[my_thread->id] == my_thread);
70✔
577
   atomic_store(&(threads[my_thread->id]),  NULL);
70✔
578

579
   atomic_add(&running_threads, -1);
70✔
580
   return result;
70✔
581
}
582

583
#ifdef __MINGW32__
584
static DWORD win32_thread_wrapper(LPVOID param)
585
{
586
   void *ret = thread_wrapper(param);
587
   atomic_store(&(my_thread->retval), ret);
588
   return 0;
589
}
590
#endif
591

592
nvc_thread_t *thread_create(thread_fn_t fn, void *arg, const char *fmt, ...)
14✔
593
{
594
   va_list ap;
14✔
595
   va_start(ap, fmt);
14✔
596
   char *name = xvasprintf(fmt, ap);
14✔
597
   va_end(ap);
14✔
598

599
   // Avoid races with stop_world
600
   SCOPED_LOCK(stop_lock);
14✔
601

602
   nvc_thread_t *thread = thread_new(fn, arg, USER_THREAD, name);
14✔
603

604
#ifdef __MINGW32__
605
   if ((thread->handle = CreateThread(NULL, 0, win32_thread_wrapper,
606
                                      thread, 0, NULL)) == NULL)
607
      fatal_errno("CreateThread");
608
#else
609
   PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
14✔
610
                 thread_wrapper, thread);
611
#endif
612

613
#ifdef __APPLE__
614
   thread->port = pthread_mach_thread_np(thread->handle);
615
#endif
616

617
   return thread;
14✔
618
}
619

620
void *thread_join(nvc_thread_t *thread)
70✔
621
{
622
   if (thread == my_thread || thread->kind == MAIN_THREAD)
70✔
623
      fatal_trace("cannot join self or main thread");
×
624

625
   void *retval = NULL;
70✔
626
#ifdef __MINGW32__
627
   if (WaitForSingleObject(thread->handle, INFINITE) == WAIT_FAILED)
628
      fatal_errno("WaitForSingleObject failed for thread %s", thread->name);
629

630
   retval = atomic_load(&(thread->retval));
631
#else
632
   PTHREAD_CHECK(pthread_join, thread->handle, &retval);
70✔
633
#endif
634

635
   async_free(thread->name);
70✔
636
   async_free(thread);
70✔
637

638
   return retval;
70✔
639
}
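/* Editor's sketch (not part of thread.c): typical use of the thread API
   above.  thread_create() formats the thread name printf-style and
   thread_join() returns the thread function's result and releases the
   handle.  Assumes thread_fn_t is void *(*)(void *), as suggested by
   thread_wrapper() above. */

static void *square_it(void *arg)
{
   int *n = arg;
   *n = *n * *n;
   return n;
}

static void example_thread_usage(void)
{
   int value = 7;   // thread_init() must have been called already
   nvc_thread_t *t = thread_create(square_it, &value, "squarer %d", 0);
   int *result = thread_join(t);   // Blocks; *result is now 49
   (void)result;
}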
640

641
nvc_thread_t *get_thread(int id)
27,072✔
642
{
643
   assert(id >= 0 && id < MAX_THREADS);
27,072✔
644
   return atomic_load(&threads[id]);
27,072✔
645
}
646

647
static inline parking_bay_t *parking_bay_for(void *cookie)
2,383✔
648
{
649
   return &(parking_bays[mix_bits_64(cookie) % PARKING_BAYS]);
2,383✔
650
}
651

652
static void thread_park(void *cookie, park_fn_t fn)
1,517✔
653
{
654
   parking_bay_t *bay = parking_bay_for(cookie);
1,517✔
655

656
   platform_mutex_lock(&(bay->mutex));
1,517✔
657
   {
658
      if ((*fn)(bay, cookie)) {
1,517✔
659
         bay->parked++;
1,512✔
660
         platform_cond_wait(&(bay->cond), &(bay->mutex));
1,512✔
661
         assert(bay->parked > 0);
1,512✔
662
         bay->parked--;
1,512✔
663
      }
664
   }
665
   platform_mutex_unlock(&(bay->mutex));
1,517✔
666
}
1,517✔
667

668
static void thread_unpark(void *cookie, unpark_fn_t fn)
866✔
669
{
670
   parking_bay_t *bay = parking_bay_for(cookie);
866✔
671

672
   if (fn != NULL) {
866✔
673
      platform_mutex_lock(&(bay->mutex));
866✔
674
      {
675
         (*fn)(bay, cookie);
866✔
676
      }
677
      platform_mutex_unlock(&(bay->mutex));
866✔
678
   }
679

680
   // Do not use pthread_cond_signal here as multiple threads parked in
681
   // this bay may be waiting on different cookies
682
   platform_cond_broadcast(&(bay->cond));
866✔
683
}
866✔
684

685
void spin_wait(void)
56,983,865✔
686
{
687
#ifdef __x86_64__
688
   __asm__ volatile ("pause");
56,983,865✔
689
#elif defined __aarch64__
690
   // YIELD is a no-op on most AArch64 cores so also do an ISB to stall
691
   // the pipeline for a bit
692
   __asm__ volatile ("yield; isb");
693
#endif
694
}
56,983,865✔
695

696
static bool lock_park_cb(parking_bay_t *bay, void *cookie)
1,517✔
697
{
698
   nvc_lock_t *lock = cookie;
1,517✔
699

700
   // This is called with the park mutex held: check the lock is still
701
   // owned by someone and the park bit is still set
702
   return relaxed_load(lock) == (IS_LOCKED | HAS_PARKED);
1,517✔
703
}
704

705
static void lock_unpark_cb(parking_bay_t *bay, void *cookie)
866✔
706
{
707
   nvc_lock_t *lock = cookie;
866✔
708

709
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
866✔
710

711
   // Unlock must have release semantics
712
   atomic_store(lock, (bay->parked > 0 ? HAS_PARKED : 0));
866✔
713
}
866✔
714

715
void nvc_lock(nvc_lock_t *lock)
70,254,170✔
716
{
717
   LOCK_EVENT(locks, 1);
70,254,170✔
718
   TSAN_PRE_LOCK(lock);
70,254,170✔
719

720
   int8_t state = 0;
70,254,170✔
721
   if (likely(__atomic_cas(lock, &state, state | IS_LOCKED)))
70,254,170✔
722
      goto locked;  // Fast path: acquired the lock without contention
70,237,578✔
723

724
   LOCK_EVENT(contended, 1);
16,592✔
725

726
   for (;;) {
36,409✔
727
      LOCK_EVENT(retries, 1);
36,409✔
728

729
      // Spin a few times waiting for the owner to release the lock
730
      // before parking
731
      int spins = 0;
732
      for (; (state & IS_LOCKED) && spins < LOCK_SPINS;
214,435✔
733
           spins++, state = relaxed_load(lock))
178,026✔
734
         spin_wait();
178,026✔
735

736
      if (state & IS_LOCKED) {
36,409✔
737
         // Ignore failures here as we will check the lock state again
738
         // in the callback with the park mutex held
739
         atomic_cas(lock, IS_LOCKED, IS_LOCKED | HAS_PARKED);
1,517✔
740

741
         LOCK_EVENT(parks, 1);
1,517✔
742
         thread_park(lock, lock_park_cb);
1,517✔
743

744
         if ((state = relaxed_load(lock)) & IS_LOCKED) {
1,517✔
745
            // Someone else grabbed the lock before our thread was unparked
746
            LOCK_EVENT(spurious, 1);
1,332✔
747
            continue;
1,332✔
748
         }
749
      }
750
      else
751
         LOCK_EVENT(spins, spins);
34,892✔
752

753
      assert(!(state & IS_LOCKED));
35,077✔
754

755
      // If we get here then we've seen the lock in an unowned state:
756
      // attempt to grab it with a CAS
757
      if (__atomic_cas(lock, &state, state | IS_LOCKED))
35,077✔
758
         goto locked;
16,592✔
759
   }
760

761
 locked:
70,254,170✔
762
   TSAN_POST_LOCK(lock);
70,254,170✔
763
}
70,254,170✔
764

765
void nvc_unlock(nvc_lock_t *lock)
70,254,167✔
766
{
767
   TSAN_PRE_UNLOCK(lock);
70,254,167✔
768

769
   // Fast path: unlock assuming no parked waiters
770
   if (likely(atomic_cas(lock, IS_LOCKED, 0)))
70,254,167✔
771
      goto unlocked;
70,253,301✔
772

773
   // If we get here then we must own the lock with at least one parked
774
   // waiting thread
775
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
866✔
776

777
   // Lock released in callback
778
   thread_unpark(lock, lock_unpark_cb);
866✔
779

780
 unlocked:
70,254,167✔
781
   TSAN_POST_UNLOCK(lock);
70,254,167✔
782
}
70,254,167✔
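/* Editor's sketch (not part of thread.c): nvc_lock_t is a small lock word
   initialised to zero (see stop_lock above).  nvc_lock() takes the fast
   path with a single CAS and only parks the calling thread under
   contention; nvc_unlock() wakes any parked waiters. */

static nvc_lock_t counter_lock = 0;
static int        counter = 0;

static void bump_counter(void)
{
   nvc_lock(&counter_lock);
   counter++;                  // Critical section
   nvc_unlock(&counter_lock);
}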
783

784
#ifdef DEBUG
785
void assert_lock_held(nvc_lock_t *lock)
160,408✔
786
{
787
   int8_t state = relaxed_load(lock);
160,408✔
788
   if (unlikely(!(state & IS_LOCKED)))
160,408✔
789
      fatal_trace("expected lock at %p to be held", lock);
×
790
}
160,408✔
791
#endif
792

793
void __scoped_unlock(nvc_lock_t **plock)
70,203,212✔
794
{
795
   nvc_unlock(*plock);
70,203,212✔
796
}
70,203,212✔
797

798
static void push_bot(threadq_t *tq, const task_t *tasks, size_t count)
72✔
799
{
800
   const abp_idx_t bot = relaxed_load(&tq->bot);
72✔
801
   assert(bot + count <= THREADQ_SIZE);
72✔
802

803
   memcpy(tq->deque + bot, tasks, count * sizeof(task_t));
72✔
804
   store_release(&tq->bot, bot + count);
72✔
805
}
72✔
806

807
static bool pop_bot(threadq_t *tq, task_t *task)
237✔
808
{
809
   const abp_idx_t old_bot = relaxed_load(&tq->bot);
237✔
810
   if (old_bot == 0)
237✔
811
      return false;
812

813
   const abp_idx_t new_bot = old_bot - 1;
199✔
814
   atomic_store(&tq->bot, new_bot);
199✔
815

816
   *task = tq->deque[new_bot];
199✔
817

818
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
199✔
819
   if (new_bot > old_age.top)
199✔
820
      return true;
821

822
   atomic_store(&tq->bot, 0);
72✔
823

824
   const abp_age_t new_age = { .top = 0, .tag = old_age.tag + 1 };
72✔
825
   if (new_bot == old_age.top) {
72✔
826
      if (atomic_cas(&tq->age.bits, old_age.bits, new_age.bits))
38✔
827
         return true;
828
   }
829

830
   atomic_store(&tq->age.bits, new_age.bits);
34✔
831
   return false;
34✔
832
}
833

834
__attribute__((no_sanitize("thread")))
835
static bool pop_top(threadq_t *tq, task_t *task)
67✔
836
{
837
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
67✔
838
   const abp_idx_t bot = atomic_load(&tq->bot);
67✔
839

840
   if (bot <= old_age.top)
67✔
841
      return false;
842

843
   // This triggers a TSAN false-positive: we will never read from *task
844
   // if the CAS fails below, so it's safe to ignore
845
   *task = tq->deque[old_age.top];
65✔
846

847
   const abp_age_t new_age = {
65✔
848
      .tag = old_age.tag,
849
      .top = old_age.top + 1
65✔
850
   };
851

852
   return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
65✔
853
}
854

855
static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
143✔
856
{
857
   assert_lock_held(&gq->lock);
143✔
858

859
   if (gq->wptr == gq->rptr)
143✔
860
      gq->wptr = gq->rptr = 0;
63✔
861

862
   if (gq->wptr + count > gq->max) {
143✔
863
      gq->max = next_power_of_2(gq->wptr + count);
41✔
864
      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
41✔
865
   }
866

867
   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
143✔
868
   gq->wptr += count;
143✔
869
}
143✔
870

871
__attribute__((no_sanitize("thread")))
872
static bool globalq_unlocked_empty(globalq_t *gq)
410✔
873
{
874
   return relaxed_load(&gq->wptr) == relaxed_load(&gq->rptr);
410✔
875
}
876

877
static size_t globalq_take(globalq_t *gq, threadq_t *tq)
410✔
878
{
879
   if (globalq_unlocked_empty(gq))
410✔
880
      return 0;
881

882
   const int nthreads = relaxed_load(&running_threads);
84✔
883

884
   SCOPED_LOCK(gq->lock);
168✔
885

886
   if (gq->wptr == gq->rptr)
84✔
887
      return 0;
888

889
   const int remain = gq->wptr - gq->rptr;
72✔
890
   const int share = gq->wptr / nthreads;
72✔
891
   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
72✔
892
   const int from = gq->rptr;
72✔
893

894
   gq->rptr += take;
72✔
895

896
   push_bot(tq, gq->tasks + from, take);
72✔
897
   return take;
72✔
898
}
899

900
static void execute_task(task_t *task)
1,294✔
901
{
902
   (*task->fn)(task->context, task->arg);
1,294✔
903

904
   if (task->workq != NULL) {
1,294✔
905
      entryq_t *eq = &(task->workq->entryqs[my_thread->id]);
1,183✔
906

907
      const entryq_ptr_t cur = { .bits = relaxed_load(&eq->comp.bits) };
1,183✔
908
      const int epoch = atomic_load(&task->workq->epoch);
1,183✔
909
      const int count = cur.epoch == epoch ? cur.count : 0;
1,183✔
910
      const entryq_ptr_t next = { .count = count + 1, .epoch = epoch };
1,183✔
911
      store_release(&eq->comp.bits, next.bits);
1,183✔
912
   }
913
   else
914
      atomic_add(&async_pending, -1);
111✔
915
}
1,294✔
916

917
static bool globalq_poll(globalq_t *gq, threadq_t *tq)
410✔
918
{
919
   int ntasks;
410✔
920
   if ((ntasks = globalq_take(gq, tq))) {
410✔
921
      task_t task;
72✔
922
      int comp = 0;
72✔
923
      for (; pop_bot(tq, &task); comp++)
309✔
924
         execute_task(&task);
165✔
925

926
      WORKQ_EVENT(comp, comp);
72✔
927
      return true;
72✔
928
   }
929
   else
930
      return false;
931
}
932

933
workq_t *workq_new(void *context)
1,096✔
934
{
935
   if (my_thread->kind != MAIN_THREAD)
1,096✔
936
      fatal_trace("work queues can only be created by the main thread");
×
937

938
   workq_t *wq = xcalloc(sizeof(workq_t));
1,096✔
939
   wq->state    = IDLE;
1,096✔
940
   wq->context  = context;
1,096✔
941
   wq->parallel = max_workers > 1;
1,096✔
942
   wq->epoch    = 1;
1,096✔
943

944
   return wq;
1,096✔
945
}
946

947
void workq_not_thread_safe(workq_t *wq)
×
948
{
949
   wq->parallel = false;
×
950
}
×
951

952
void workq_free(workq_t *wq)
1,096✔
953
{
954
   if (my_thread->kind != MAIN_THREAD)
1,096✔
955
      fatal_trace("work queues can only be freed by the main thread");
×
956

957
   assert(wq->state == IDLE);
1,096✔
958

959
   for (int i = 0; i < MAX_THREADS; i++)
71,240✔
960
      free(wq->entryqs[i].tasks);
70,144✔
961

962
   free(wq);
1,096✔
963
}
1,096✔
964

965
void workq_do(workq_t *wq, task_fn_t fn, void *arg)
1,183✔
966
{
967
   assert(wq->state == IDLE);
1,183✔
968

969
   entryq_t *eq = &(wq->entryqs[my_thread->id]);
1,183✔
970

971
   const entryq_ptr_t cur = { .bits = relaxed_load(&eq->wptr.bits) };
1,183✔
972
   const int epoch = atomic_load(&wq->epoch);
1,183✔
973
   const int wptr = cur.epoch == epoch ? cur.count : 0;
1,183✔
974

975
   if (wptr == eq->queuesz) {
1,183✔
976
      eq->queuesz = MAX(eq->queuesz * 2, 64);
1,096✔
977
      eq->tasks = xrealloc_array(eq->tasks, eq->queuesz, sizeof(task_t));
1,096✔
978
   }
979

980
   eq->tasks[wptr] = (task_t){ fn, wq->context, arg, wq };
1,183✔
981

982
   const entryq_ptr_t next = { .count = wptr + 1, .epoch = epoch };
1,183✔
983
   store_release(&eq->wptr.bits, next.bits);
1,183✔
984

985
   unsigned maxthread = relaxed_load(&wq->maxthread);
1,183✔
986
   while (maxthread < my_thread->id) {
1,183✔
987
      if (__atomic_cas(&wq->maxthread, &maxthread, my_thread->id))
×
988
         break;
989
   }
990
}
1,183✔
991

992
void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
×
993
{
994
   const int maxthread = relaxed_load(&wq->maxthread);
×
995
   for (int i = 0; i <= maxthread; i++) {
×
996
      entryq_t *eq = &(wq->entryqs[my_thread->id]);
×
997
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
×
998
      for (int j = 0; j < wptr.count; j++)
×
999
         (*fn)(wq->context, eq->tasks[j].arg, arg);
×
1000
   }
1001
}
×
1002

1003
static int estimate_depth(threadq_t *tq)
1,015✔
1004
{
1005
   const abp_age_t age = { .bits = relaxed_load(&tq->age.bits) };
1,015✔
1006
   const abp_idx_t bot = relaxed_load(&tq->bot);
1,015✔
1007

1008
   return bot <= age.top ? 0 : bot - age.top;
1,015✔
1009
}
1010

1011
static threadq_t *get_thread_queue(int id)
1,015✔
1012
{
1013
   assert(id < MAX_THREADS);
1,015✔
1014

1015
   nvc_thread_t *t = atomic_load(&(threads[id]));
1,015✔
1016
   if (t == NULL)
1,015✔
1017
      return NULL;
1018

1019
   return &(t->queue);
1,015✔
1020
}
1021

1022
static uint32_t fast_rand(void)
322✔
1023
{
1024
   uint32_t state = my_thread->rngstate;
322✔
1025
   state ^= (state << 13);
322✔
1026
   state ^= (state >> 17);
322✔
1027
   state ^= (state << 5);
322✔
1028
   return (my_thread->rngstate = state);
322✔
1029
}
1030

1031
static threadq_t *find_victim(void)
354✔
1032
{
1033
   threadq_t *last = get_thread_queue(my_thread->victim);
354✔
1034
   if (last != NULL && estimate_depth(last) > 0)
354✔
1035
      return last;
1036

1037
   const int maxthread = relaxed_load(&max_thread_id);
322✔
1038
   const int start = fast_rand() % (maxthread + 1);
322✔
1039
   int idx = start;
322✔
1040
   do {
967✔
1041
      if (idx != my_thread->id) {
967✔
1042
         threadq_t *q = get_thread_queue(idx);
661✔
1043
         if (q != NULL && estimate_depth(q) > 0) {
661✔
1044
            my_thread->victim = idx;
35✔
1045
            return q;
35✔
1046
         }
1047
      }
1048
   } while ((idx = (idx + 1) % (maxthread + 1)) != start);
932✔
1049

1050
   return NULL;
1051
}
1052

1053
static bool steal_task(void)
354✔
1054
{
1055
   threadq_t *tq = find_victim();
354✔
1056
   if (tq == NULL)
354✔
1057
      return false;
1058

1059
   task_t task;
67✔
1060
   if (pop_top(tq, &task)) {
67✔
1061
      WORKQ_EVENT(steals, 1);
65✔
1062
      execute_task(&task);
65✔
1063
      WORKQ_EVENT(comp, 1);
65✔
1064
      return true;
65✔
1065
   }
1066

1067
   return false;
1068
}
1069

1070
static void progressive_backoff(void)
58,576,127✔
1071
{
1072
   if (my_thread->spins++ < YIELD_SPINS)
58,576,127✔
1073
      spin_wait();
56,801,099✔
1074
   else {
1075
#ifdef __MINGW32__
1076
      SwitchToThread();
1077
#else
1078
      sched_yield();
1,775,028✔
1079
#endif
1080
      my_thread->spins = 0;
1,775,028✔
1081
   }
1082
}
58,576,127✔
1083

1084
static void *worker_thread(void *arg)
56✔
1085
{
1086
   mspace_stack_limit(MSPACE_CURRENT_FRAME);
56✔
1087

1088
   do {
357✔
1089
      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
357✔
1090
         my_thread->spins = 0;  // Did work
100✔
1091
      else if (my_thread->spins++ < 2)
257✔
1092
         spin_wait();
160✔
1093
      else {
1094
         platform_mutex_lock(&wakelock);
97✔
1095
         {
1096
            if (!relaxed_load(&should_stop))
97✔
1097
               platform_cond_wait(&wake_workers, &wakelock);
97✔
1098
         }
1099
         platform_mutex_unlock(&wakelock);
97✔
1100
      }
1101
   } while (likely(!relaxed_load(&should_stop)));
357✔
1102

1103
   return NULL;
56✔
1104
}
1105

1106
static void create_workers(int needed)
143✔
1107
{
1108
   assert(my_thread->kind == MAIN_THREAD);
143✔
1109

1110
   if (relaxed_load(&should_stop))
143✔
1111
      return;
1112

1113
   while (relaxed_load(&running_threads) < MIN(max_workers, needed)) {
199✔
1114
      static int counter = 0;
56✔
1115
      char *name = xasprintf("worker thread %d", atomic_add(&counter, 1));
56✔
1116
      nvc_thread_t *thread =
56✔
1117
         thread_new(worker_thread, NULL, WORKER_THREAD, name);
56✔
1118

1119
#ifdef __MINGW32__
1120
      if ((thread->handle = CreateThread(NULL, 0, win32_thread_wrapper,
1121
                                         thread, 0, NULL)) == NULL)
1122
         fatal_errno("CreateThread");
1123
#else
1124
      PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
199✔
1125
                    thread_wrapper, thread);
1126
#endif
1127
   }
1128

1129
   platform_cond_broadcast(&wake_workers);
143✔
1130
}
1131

1132
void workq_start(workq_t *wq)
1,096✔
1133
{
1134
   assert(my_thread->kind == MAIN_THREAD);
1,096✔
1135

1136
   const int epoch = relaxed_load(&wq->epoch);
1,096✔
1137
   const int maxthread = relaxed_load(&wq->maxthread);
1,096✔
1138

1139
   assert(wq->state == IDLE);
1,096✔
1140
   wq->state = START;
1,096✔
1141

1142
   int nserial = 0, nparallel = 0;
1,096✔
1143
   for (int i = 0; i <= maxthread; i++) {
2,192✔
1144
      entryq_t *eq = &wq->entryqs[i];
1,096✔
1145
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
1,096✔
1146
      if (wptr.epoch == epoch) {
1,096✔
1147
         // Only bump epoch if there are tasks to run
1148
         if (nserial + nparallel == 0)
1,096✔
1149
            atomic_add(&wq->epoch, 1);
1,096✔
1150

1151
         assert(wptr.count > 0);
1,096✔
1152

1153
         if (i == maxthread && nserial + nparallel == 0 && wptr.count == 1) {
1,096✔
1154
            execute_task(&(eq->tasks[0]));   // Only one task in total
1,064✔
1155
            nserial++;
1,064✔
1156
         }
1157
         else if (wq->parallel) {
32✔
1158
            if (nparallel == 0)
32✔
1159
               nvc_lock(&globalq.lock);   // Lazily acquire lock
32✔
1160
            globalq_put(&globalq, eq->tasks, wptr.count);
32✔
1161
            nparallel += wptr.count;
32✔
1162
         }
1163
         else {
1164
            for (int j = 0; j < wptr.count; j++)
×
1165
               execute_task(&(eq->tasks[j]));
×
1166
            nserial += wptr.count;
×
1167
         }
1168
      }
1169
   }
1170

1171
   if (wq->parallel && nparallel > 0) {
1,096✔
1172
      nvc_unlock(&globalq.lock);
32✔
1173
      create_workers(nparallel);
32✔
1174
   }
1175
}
1,096✔
1176

1177
static int workq_outstanding(workq_t *wq)
58,457,158✔
1178
{
1179
   assert(wq->state == START);
58,457,158✔
1180

1181
   const int epoch = atomic_load(&wq->epoch);
58,457,158✔
1182
   const int maxthread = relaxed_load(&max_thread_id);
58,457,158✔
1183

1184
   int pending = 0;
58,457,158✔
1185
   for (int i = 0; i <= maxthread; i++) {
216,330,433✔
1186
      entryq_t *eq = &(wq->entryqs[i]);
157,873,275✔
1187

1188
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
157,873,275✔
1189
      if (wptr.epoch == epoch - 1)
157,873,275✔
1190
         pending += wptr.count;
58,457,158✔
1191

1192
      const entryq_ptr_t comp = { .bits = load_acquire(&eq->comp.bits) };
157,873,275✔
1193
      if (comp.epoch == epoch)
157,873,275✔
1194
         pending -= comp.count;
94,941,483✔
1195
   }
1196

1197
   assert(pending >= 0);
58,457,158✔
1198
   return pending;
58,457,158✔
1199
}
1200

1201
static void workq_parallel_drain(workq_t *wq)
1,096✔
1202
{
1203
   if (workq_outstanding(wq) > 0) {
1,096✔
1204
      while (globalq_poll(&globalq, &(my_thread->queue)));
52✔
1205
      while (steal_task());
48✔
1206

1207
      while (workq_outstanding(wq) > 0)
58,456,062✔
1208
         progressive_backoff();
58,456,030✔
1209
   }
1210

1211
   wq->state = IDLE;
1,096✔
1212
}
1,096✔
1213

1214
void workq_drain(workq_t *wq)
1,096✔
1215
{
1216
   if (my_thread->kind != MAIN_THREAD)
1,096✔
1217
      fatal_trace("workq_drain can only be called from the main thread");
×
1218

1219
   if (wq->parallel)
1,096✔
1220
      workq_parallel_drain(wq);
1,096✔
1221
   else {
1222
      assert(wq->state == START);
×
1223
      wq->state = IDLE;
×
1224
   }
1225
}
1,096✔
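/* Editor's sketch (not part of thread.c): the work queue lifecycle as used
   from the main thread.  Assumes task_fn_t is void (*)(void *context,
   void *arg), matching how execute_task() above invokes tasks. */

static void process_item(void *context, void *arg)
{
   // context comes from workq_new(), arg from each workq_do() call
}

static void example_workq_usage(void *shared, void *items[], int nitems)
{
   workq_t *wq = workq_new(shared);
   for (int i = 0; i < nitems; i++)
      workq_do(wq, process_item, items[i]);   // Queue tasks while IDLE
   workq_start(wq);   // Hands tasks to the global queue and worker threads
   workq_drain(wq);   // Blocks until every queued task has completed
   workq_free(wq);    // Only valid once the queue is IDLE again
}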
1226

1227
void async_do(task_fn_t fn, void *context, void *arg)
111✔
1228
{
1229
   if (max_workers == 1)
111✔
1230
      (*fn)(context, arg);   // Single CPU
×
1231
   else {
1232
      const int npending = atomic_add(&async_pending, 1);
111✔
1233
      create_workers(npending + 1 /* Do not count main thread */);
111✔
1234

1235
      task_t tasks[1] = {
111✔
1236
         { fn, context, arg, NULL }
1237
      };
1238
      SCOPED_LOCK(globalq.lock);
222✔
1239
      globalq_put(&globalq, tasks, 1);
111✔
1240
   }
1241
}
111✔
1242

1243
void async_barrier(void)
8,029✔
1244
{
1245
   while (atomic_load(&async_pending) > 0) {
16,059✔
1246
      if (!globalq_poll(&globalq, &(my_thread->queue)))
1✔
1247
         progressive_backoff();
×
1248
   }
1249
}
8,029✔
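/* Editor's sketch (not part of thread.c): fire-and-forget tasks.  With a
   single CPU async_do() runs the task inline; otherwise it is queued for
   the workers and async_barrier() spins until all pending async tasks
   have completed. */

static void background_work(void *context, void *arg)
{
   // Independent work with no completion callback
}

static void example_async_usage(void *ctx, void *arg)
{
   async_do(background_work, ctx, arg);
   // ... other work on the calling thread ...
   async_barrier();   // Ensure all outstanding async tasks have finished
}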
1250

1251
void async_free(void *ptr)
6,025✔
1252
{
1253
   if (relaxed_load(&running_threads) == 1)
6,025✔
1254
      free(ptr);
6,006✔
1255
   else if (ptr != NULL) {
1256
      // TODO: free when all threads in quiescent state
1257
   }
6,025✔
1258
}
6,025✔
1259

1260
#ifdef POSIX_SUSPEND
1261
static void suspend_handler(int sig, siginfo_t *info, void *context)
3,786✔
1262
{
1263
   if (info->si_pid != getpid())
3,786✔
1264
      return;   // Not sent by us, ignore it
1,893✔
1265
   else if (sig == SIGRESUME)
3,786✔
1266
      return;
1267

1268
   const int olderrno = errno;
1,893✔
1269

1270
   if (my_thread != NULL) {
1,893✔
1271
      struct cpu_state cpu;
1,893✔
1272
      fill_cpu_state(&cpu, (ucontext_t *)context);
1,893✔
1273

1274
      stop_world_fn_t callback = atomic_load(&stop_callback);
1,893✔
1275
      void *arg = atomic_load(&stop_arg);
1,893✔
1276

1277
      (*callback)(my_thread->id, &cpu, arg);
1,893✔
1278
   }
1279

1280
   sem_post(&stop_sem);
1,893✔
1281

1282
   sigset_t mask;
1,893✔
1283
   sigfillset(&mask);
1,893✔
1284
   unmask_fatal_signals(&mask);
1,893✔
1285
   sigdelset(&mask, SIGRESUME);
1,893✔
1286

1287
   sigsuspend(&mask);
1,893✔
1288

1289
   sem_post(&stop_sem);
1,893✔
1290

1291
   errno = olderrno;
1,893✔
1292
}
1293
#endif
1294

1295
void stop_world(stop_world_fn_t callback, void *arg)
923✔
1296
{
1297
   nvc_lock(&stop_lock);
923✔
1298

1299
   atomic_store(&stop_callback, callback);
923✔
1300
   atomic_store(&stop_arg, arg);
923✔
1301

1302
#ifdef __MINGW32__
1303
   const int maxthread = relaxed_load(&max_thread_id);
1304
   for (int i = 0; i <= maxthread; i++) {
1305
      nvc_thread_t *thread = atomic_load(&threads[i]);
1306
      if (thread == NULL || thread == my_thread)
1307
         continue;
1308

1309
      if (SuspendThread(thread->handle) != 0)
1310
         fatal_errno("SuspendThread");
1311

1312
      CONTEXT context;
1313
      context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;
1314
      if (!GetThreadContext(thread->handle, &context))
1315
         fatal_errno("GetThreadContext");
1316

1317
      struct cpu_state cpu;
1318
      fill_cpu_state(&cpu, &context);
1319

1320
      (*callback)(thread->id, &cpu, arg);
1321
   }
1322
#elif defined __APPLE__
1323
   const int maxthread = relaxed_load(&max_thread_id);
1324
   for (int i = 0; i <= maxthread; i++) {
1325
      nvc_thread_t *thread = atomic_load(&threads[i]);
1326
      if (thread == NULL || thread == my_thread)
1327
         continue;
1328

1329
      assert(thread->port != MACH_PORT_NULL);
1330

1331
      kern_return_t kern_result;
1332
      do {
1333
         kern_result = thread_suspend(thread->port);
1334
      } while (kern_result == KERN_ABORTED);
1335

1336
      if (kern_result != KERN_SUCCESS)
1337
         fatal_trace("failed to suspend thread %d (%d)", i, kern_result);
1338

1339
#ifdef __aarch64__
1340
      arm_thread_state64_t state;
1341
      mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
1342
      thread_state_flavor_t flavor = ARM_THREAD_STATE64;
1343
#else
1344
      x86_thread_state64_t state;
1345
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
1346
      thread_state_flavor_t flavor = x86_THREAD_STATE64;
1347
#endif
1348
      kern_result = thread_get_state(thread->port, flavor,
1349
                                     (natural_t *)&state, &count);
1350
      if (kern_result != KERN_SUCCESS)
1351
         fatal_trace("failed to get thread %d state (%d)", i, kern_result);
1352

1353
      // Fake a ucontext_t that we can pass to fill_cpu_state
1354
      ucontext_t uc;
1355
      typeof(*uc.uc_mcontext) mc;
1356
      uc.uc_mcontext = &mc;
1357
      mc.__ss = state;
1358

1359
      struct cpu_state cpu;
1360
      fill_cpu_state(&cpu, &uc);
1361

1362
      (*callback)(thread->id, &cpu, arg);
1363
   }
1364
#elif defined __SANITIZE_THREAD__
1365
   // https://github.com/google/sanitizers/issues/1179
1366
   fatal_trace("stop_world is not supported with tsan");
1367
#else
1368
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
923✔
1369

1370
   int signalled = 0;
923✔
1371
   const int maxthread = relaxed_load(&max_thread_id);
923✔
1372
   for (int i = 0; i <= maxthread; i++) {
4,175✔
1373
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,252✔
1374
      if (thread == NULL || thread == my_thread)
3,252✔
1375
         continue;
1,359✔
1376

1377
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGSUSPEND);
1,893✔
1378
      signalled++;
1,893✔
1379
   }
1380

1381
   struct timespec ts;
923✔
1382
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
923✔
1383
      fatal_errno("clock_gettime");
×
1384

1385
   ts.tv_sec += SUSPEND_TIMEOUT;
923✔
1386

1387
   for (; signalled > 0; signalled--) {
2,816✔
1388
      if (sem_timedwait(&stop_sem, &ts) != 0)
1,893✔
1389
         fatal_trace("timeout waiting for %d threads to suspend", signalled);
×
1390
   }
1391

1392
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
923✔
1393
#endif
1394

1395
   struct cpu_state cpu;
923✔
1396
   capture_registers(&cpu);
923✔
1397

1398
   (*callback)(my_thread->id, &cpu, arg);
923✔
1399
}
923✔
1400

1401
void start_world(void)
923✔
1402
{
1403
   assert_lock_held(&stop_lock);
923✔
1404

1405
   const int maxthread = relaxed_load(&max_thread_id);
923✔
1406

1407
#ifdef __MINGW32__
1408
   for (int i = 0; i <= maxthread; i++) {
1409
      nvc_thread_t *thread = atomic_load(&threads[i]);
1410
      if (thread == NULL || thread == my_thread)
1411
         continue;
1412

1413
      if (ResumeThread(thread->handle) != 1)
1414
         fatal_errno("ResumeThread");
1415
   }
1416
#elif defined __APPLE__
1417
   for (int i = 0; i <= maxthread; i++) {
1418
      nvc_thread_t *thread = atomic_load(&threads[i]);
1419
      if (thread == NULL || thread == my_thread)
1420
         continue;
1421

1422
      kern_return_t kern_result;
1423
      do {
1424
         kern_result = thread_resume(thread->port);
1425
      } while (kern_result == KERN_ABORTED);
1426

1427
      if (kern_result != KERN_SUCCESS)
1428
         fatal_trace("failed to resume thread %d (%d)", i, kern_result);
1429
   }
1430
#else
1431
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
923✔
1432

1433
   int signalled = 0;
1434
   for (int i = 0; i <= maxthread; i++) {
4,175✔
1435
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,252✔
1436
      if (thread == NULL || thread == my_thread)
3,252✔
1437
         continue;
1,359✔
1438

1439
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGRESUME);
1,893✔
1440
      signalled++;
1,893✔
1441
   }
1442

1443
   struct timespec ts;
923✔
1444
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
923✔
1445
      fatal_errno("clock_gettime");
×
1446

1447
   ts.tv_sec += SUSPEND_TIMEOUT;
923✔
1448

1449
   for (; signalled > 0; signalled--) {
2,816✔
1450
      if (sem_timedwait(&stop_sem, &ts) != 0)
1,893✔
1451
         fatal_trace("timeout waiting for %d threads to resume", signalled);
×
1452
   }
1453

1454
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
923✔
1455
#endif
1456

1457
   nvc_unlock(&stop_lock);
923✔
1458
}
923✔
1459

1460
void thread_wx_mode(wx_mode_t mode)
14,807✔
1461
{
1462
#ifdef __APPLE__
1463
   pthread_jit_write_protect_np(mode == WX_EXECUTE);
1464
#else
1465
   // Could use Intel memory protection keys here
1466
#endif
1467
}
14,807✔
1468

1469
barrier_t *barrier_new(int count)
1✔
1470
{
1471
   barrier_t *b = xcalloc(sizeof(barrier_t));
1✔
1472
   b->count = count;
1✔
1473
   return b;
1✔
1474
}
1475

1476
void barrier_free(barrier_t *b)
1✔
1477
{
1478
   free(b);
1✔
1479
}
1✔
1480

1481
void barrier_wait(barrier_t *b)
88✔
1482
{
1483
   const int count = relaxed_load(&b->count);
88✔
1484
   const int passed = relaxed_load(&b->passed);
88✔
1485

1486
   if (atomic_fetch_add(&b->reached, 1) == count - 1) {
88✔
1487
      // Last thread to pass barrier
1488
      relaxed_store(&b->reached, 0);
22✔
1489
      store_release(&b->passed, passed + 1);
22✔
1490
   }
1491
   else {
1492
      while (load_acquire(&b->passed) == passed)
120,163✔
1493
         progressive_backoff();
120,097✔
1494
   }
1495
}
88✔
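/* Editor's sketch (not part of thread.c): a reusable spinning barrier.
   barrier_new(count) sets the number of participants; every thread calls
   barrier_wait() and none continues until all count threads arrive. */

static void example_barrier_round(barrier_t *b)
{
   // ... phase 1 work on this thread ...
   barrier_wait(b);   // Rendezvous with the other participants
   // ... phase 2 work, started only after every thread finished phase 1 ...
}

// Setup on one thread:    barrier_t *b = barrier_new(nthreads);
// Teardown when finished: barrier_free(b);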