
nickg / nvc — build 10075205318 (push, github)

24 Jul 2024 10:44AM UTC — coverage: 91.608% (-0.007% from 91.615%)

Commit by nickg: Fix crash with shared variable in generic package. Fixes #923

9 of 9 new or added lines in 2 files covered (100.0%)
37 existing lines in 3 files now uncovered
57036 of 62261 relevant lines covered (91.61%)
671507.89 hits per line

Source file: /src/thread.c — 88.75% covered
1
//
2
//  Copyright (C) 2021-2023  Nick Gasson
3
//
4
//  This program is free software: you can redistribute it and/or modify
5
//  it under the terms of the GNU General Public License as published by
6
//  the Free Software Foundation, either version 3 of the License, or
7
//  (at your option) any later version.
8
//
9
//  This program is distributed in the hope that it will be useful,
10
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
//  GNU General Public License for more details.
13
//
14
//  You should have received a copy of the GNU General Public License
15
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
//
17

18
#include "util.h"
19
#include "array.h"
20
#include "cpustate.h"
21
#include "rt/mspace.h"
22
#include "thread.h"
23

24
#include <assert.h>
25
#include <errno.h>
26
#include <math.h>
27
#include <signal.h>
28
#include <stdlib.h>
29
#include <string.h>
30
#include <inttypes.h>
31
#include <unistd.h>
32
#include <semaphore.h>
33

34
#ifdef HAVE_PTHREAD
35
#include <pthread.h>
36
#elif !defined __MINGW32__
37
#error missing pthread support
38
#endif
39

40
#ifdef __SANITIZE_THREAD__
41
#include <sanitizer/tsan_interface.h>
42
#endif
43

44
#if defined __MINGW32__
45
#define WIN32_LEAN_AND_MEAN
46
#include <windows.h>
47
#elif defined __APPLE__
48
#define task_t __task_t
49
#include <mach/thread_act.h>
50
#include <mach/machine.h>
51
#undef task_t
52
#endif
53

54
#define LOCK_SPINS      64
55
#define YIELD_SPINS     32
56
#define MIN_TAKE        8
57
#define PARKING_BAYS    64
58
#define SUSPEND_TIMEOUT 1
59

60
#if !defined __MINGW32__ && !defined __APPLE__
61
#define POSIX_SUSPEND 1
62
#define SIGSUSPEND    SIGRTMIN
63
#define SIGRESUME     (SIGRTMIN + 1)
64
#endif
65

66
typedef struct {
67
   int64_t locks;
68
   int64_t spins;
69
   int64_t contended;
70
   int64_t parks;
71
   int64_t spurious;
72
   int64_t retries;
73
} __attribute__((aligned(64))) lock_stats_t;
74

75
STATIC_ASSERT(sizeof(lock_stats_t) == 64)
76

77
#ifdef DEBUG
78
#define LOCK_EVENT(what, n) do {                  \
79
      if (likely(my_thread != NULL))              \
80
         lock_stats[my_thread->id].what += (n);   \
81
   } while (0)
82

83
#define WORKQ_EVENT(what, n) do {                 \
84
      if (likely(my_thread != NULL))              \
85
         workq_stats[my_thread->id].what += (n);  \
86
   } while (0)
87
#else
88
#define LOCK_EVENT(what, n) (void)(n)
89
#define WORKQ_EVENT(what, n) (void)(n)
90
#endif
91

92
#ifdef __SANITIZE_THREAD__
93
#define TSAN_PRE_LOCK(addr) \
94
   __tsan_mutex_pre_lock(addr, __tsan_mutex_linker_init);
95
#define TSAN_POST_LOCK(addr) \
96
   __tsan_mutex_post_lock(addr, __tsan_mutex_linker_init, 0);
97
#define TSAN_PRE_UNLOCK(addr) \
98
   __tsan_mutex_pre_unlock(addr, __tsan_mutex_linker_init);
99
#define TSAN_POST_UNLOCK(addr) \
100
   __tsan_mutex_post_unlock(addr, __tsan_mutex_linker_init);
101
#else
102
#define TSAN_PRE_LOCK(addr)
103
#define TSAN_POST_LOCK(addr)
104
#define TSAN_PRE_UNLOCK(addr)
105
#define TSAN_POST_UNLOCK(addr)
106
#endif
107

108
#ifndef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
109
#define PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP PTHREAD_MUTEX_INITIALIZER
110
#endif
111

112
#define PTHREAD_CHECK(op, ...) do {             \
113
      if (unlikely(op(__VA_ARGS__) != 0))       \
114
         fatal_errno(#op);                      \
115
   } while (0)
116

117
// Lock implementation based on WTF::Lock in WebKit
118
//   https://webkit.org/blog/6161/locking-in-webkit/
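// The entire lock state fits in a single byte: the uncontended fast path
// is one compare-and-swap on IS_LOCKED, and HAS_PARKED is only set when a
// contending thread is about to sleep in a parking bay (see nvc_lock,
// nvc_unlock and the lock_*_cb callbacks below).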
119

120
typedef enum {
121
   IS_LOCKED  = (1 << 0),
122
   HAS_PARKED = (1 << 1)
123
} lock_bits_t;
124

125
typedef struct {
126
#ifdef __MINGW32__
127
   CRITICAL_SECTION   mutex;
128
   CONDITION_VARIABLE cond;
129
#else
130
   pthread_mutex_t    mutex;
131
   pthread_cond_t     cond;
132
#endif
133
   int                parked;
134
} __attribute__((aligned(64))) parking_bay_t;
135

136
typedef bool (*park_fn_t)(parking_bay_t *, void *);
137
typedef void (*unpark_fn_t)(parking_bay_t *, void *);
138

139
typedef struct {
140
   task_fn_t  fn;
141
   void      *context;
142
   void      *arg;
143
   workq_t   *workq;
144
} task_t;
145

146
// Work sealing task queue is based on:
147
//   Arora, N. S., Blumofe, R. D., and Plaxton, C. G.
148
//   Thread scheduling for multiprogrammed multiprocessors.
149
//   Theory of Computing Systems 34, 2 (2001), 115-144.
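// The owning thread pushes and pops tasks at the "bot" end of its deque;
// idle threads steal from the "top" end.  The (top, tag) pair is packed
// into a single 64-bit "age" word so a steal is one compare-and-swap, and
// the tag is bumped whenever the deque is reset to guard against ABA.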
150

151
typedef uint32_t abp_idx_t;
152
typedef uint32_t abp_tag_t;
153

154
typedef union {
155
   struct {
156
      abp_idx_t top;
157
      abp_tag_t tag;
158
   };
159
   uint64_t bits;
160
} abp_age_t;
161

162
STATIC_ASSERT(sizeof(abp_age_t) <= 8);
163

164
#define THREADQ_SIZE 256
165

166
typedef struct {
167
   task_t    deque[THREADQ_SIZE];
168
   abp_age_t age;
169
   abp_idx_t bot;
170
} __attribute__((aligned(64))) threadq_t;
171

172
typedef enum { IDLE, START } workq_state_t;
173

174
typedef union {
175
   struct {
176
      uint32_t count;
177
      uint32_t epoch;
178
   };
179
   uint64_t bits;
180
} entryq_ptr_t;
181

182
STATIC_ASSERT(sizeof(entryq_ptr_t) <= 8);
183

184
typedef struct {
185
   task_t       *tasks;
186
   unsigned      queuesz;
187
   entryq_ptr_t  wptr;
188
   entryq_ptr_t  comp;
189
} __attribute__((aligned(64))) entryq_t;
190

191
STATIC_ASSERT(sizeof(entryq_t) == 64);
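// Each per-thread entry queue is reused across workq_start cycles by
// tagging both the write pointer and the completion count with the
// current epoch: a value carrying a stale epoch belongs to a previous
// cycle and is treated as zero, so the queues never need to be cleared.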
192

193
struct _workq {
194
   void          *context;
195
   workq_state_t  state;
196
   unsigned       epoch;
197
   unsigned       maxthread;
198
   bool           parallel;
199
   entryq_t       entryqs[MAX_THREADS];
200
};
201

202
typedef struct {
203
   int64_t comp;
204
   int64_t steals;
205
   int64_t wakes;
206
} __attribute__((aligned(64))) workq_stats_t;
207

208
STATIC_ASSERT(sizeof(workq_stats_t) == 64);
209

210
typedef enum {
211
   MAIN_THREAD,
212
   USER_THREAD,
213
   WORKER_THREAD,
214
} thread_kind_t;
215

216
struct _nvc_thread {
217
   unsigned        id;
218
   thread_kind_t   kind;
219
   unsigned        spins;
220
   threadq_t       queue;
221
   char           *name;
222
   thread_fn_t     fn;
223
   void           *arg;
224
   int             victim;
225
   uint32_t        rngstate;
226
#ifdef __MINGW32__
227
   HANDLE          handle;
228
   void           *retval;
229
#else
230
   pthread_t       handle;
231
#endif
232
#ifdef __APPLE__
233
   thread_port_t   port;
234
#endif
235
};
236

237
typedef struct {
238
   nvc_lock_t   lock;
239
   task_t      *tasks;
240
   unsigned     wptr;
241
   unsigned     rptr;
242
   unsigned     max;
243
} globalq_t;
244

245
typedef struct _barrier {
246
   unsigned count;
247
   unsigned reached;
248
   unsigned passed;
249
} __attribute__((aligned(64))) barrier_t;
250

251
static parking_bay_t parking_bays[PARKING_BAYS] = {
252
#ifndef __MINGW32__
253
   [0 ... PARKING_BAYS - 1] = {
254
      PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP,
255
      PTHREAD_COND_INITIALIZER
256
   }
257
#endif
258
};
259

260
static nvc_thread_t    *threads[MAX_THREADS];
261
static unsigned         max_workers = 0;
262
static int              running_threads = 0;
263
static unsigned         max_thread_id = 0;
264
static bool             should_stop = false;
265
static globalq_t        globalq __attribute__((aligned(64)));
266
static int              async_pending __attribute__((aligned(64))) = 0;
267
static nvc_lock_t       stop_lock = 0;
268
static stop_world_fn_t  stop_callback = NULL;
269
static void            *stop_arg = NULL;
270

271
#ifdef __MINGW32__
272
static CONDITION_VARIABLE wake_workers = CONDITION_VARIABLE_INIT;
273
static CRITICAL_SECTION   wakelock;
274
#else
275
static pthread_cond_t     wake_workers = PTHREAD_COND_INITIALIZER;
276
static pthread_mutex_t    wakelock = PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;
277
#endif
278

279
#ifdef POSIX_SUSPEND
280
static sem_t stop_sem;
281
#endif
282

283
#ifdef DEBUG
284
static lock_stats_t  lock_stats[MAX_THREADS];
285
static workq_stats_t workq_stats[MAX_THREADS];
286
#endif
287

288
static __thread nvc_thread_t *my_thread = NULL;
289

290
static parking_bay_t *parking_bay_for(void *cookie);
291

292
#ifdef POSIX_SUSPEND
293
static void suspend_handler(int sig, siginfo_t *info, void *context);
294
#endif
295

296
#ifdef DEBUG
297
static void print_lock_stats(void)
×
298
{
299
   lock_stats_t s = {};
×
300
   workq_stats_t t = {};
×
301
   for (int i = 0; i < MAX_THREADS; i++) {
×
302
      s.locks     += relaxed_load(&(lock_stats[i].locks));
×
303
      s.contended += relaxed_load(&(lock_stats[i].contended));
×
304
      s.parks     += relaxed_load(&(lock_stats[i].parks));
×
305
      s.spins     += relaxed_load(&(lock_stats[i].spins));
×
306
      s.spurious  += relaxed_load(&(lock_stats[i].spurious));
×
307
      s.retries   += relaxed_load(&(lock_stats[i].retries));
×
308

309
      t.comp   += relaxed_load(&(workq_stats[i].comp));
×
310
      t.steals += relaxed_load(&(workq_stats[i].steals));
×
311
      t.wakes  += relaxed_load(&(workq_stats[i].wakes));
×
312
   }
313

314
   printf("\nLock statistics:\n");
×
315
   printf("\tTotal locks      : %"PRIi64"\n", s.locks);
×
316
   printf("\tContended        : %"PRIi64" (%.1f%%)\n",
×
317
          s.contended, 100.0 * ((double)s.contended / (double)s.locks));
×
318
   printf("\tParked           : %"PRIi64" (%.1f%%)\n",
×
319
          s.parks, 100.0 * ((double)s.parks / (double)s.locks));
×
320
   printf("\tAverage spins    : %.1f\n", (double)s.spins / (double)s.retries);
×
321
   printf("\tSpurious wakeups : %"PRIi64"\n", s.spurious);
×
322

323
   printf("\nWork queue statistics:\n");
×
324
   printf("\tCompleted tasks  : %"PRIi64"\n", t.comp);
×
325
   printf("\tSteals           : %"PRIi64"\n", t.steals);
×
326
   printf("\tWakeups          : %"PRIi64"\n", t.wakes);
×
327
}
×
328
#endif
329

330
#ifdef __MINGW32__
331
static inline void platform_mutex_lock(LPCRITICAL_SECTION lpcs)
332
{
333
   EnterCriticalSection(lpcs);
334
}
335

336
static inline void platform_mutex_unlock(LPCRITICAL_SECTION lpcs)
337
{
338
   LeaveCriticalSection(lpcs);
339
}
340

341
static inline void platform_cond_broadcast(PCONDITION_VARIABLE pcv)
342
{
343
   WakeAllConditionVariable(pcv);
344
}
345

346
static inline void platform_cond_wait(PCONDITION_VARIABLE pcv,
347
                                      LPCRITICAL_SECTION lpcs)
348
{
349
   SleepConditionVariableCS(pcv, lpcs, INFINITE);
350
}
351
#else
352
static inline void platform_mutex_lock(pthread_mutex_t *mtx)
7,219✔
353
{
354
   PTHREAD_CHECK(pthread_mutex_lock, mtx);
7,219✔
355
}
7,219✔
356

357
static inline void platform_mutex_unlock(pthread_mutex_t *mtx)
7,219✔
358
{
359
   PTHREAD_CHECK(pthread_mutex_unlock, mtx);
7,219✔
360
}
7,219✔
361

362
static inline void platform_cond_broadcast(pthread_cond_t *cond)
5,705✔
363
{
364
   PTHREAD_CHECK(pthread_cond_broadcast, cond);
5,705✔
365
}
5,705✔
366

367
static inline void platform_cond_wait(pthread_cond_t *cond,
1,650✔
368
                                      pthread_mutex_t *mtx)
369
{
370
   PTHREAD_CHECK(pthread_cond_wait, cond, mtx);
1,650✔
371
}
1,650✔
372
#endif
373

374
static void join_worker_threads(void)
4,631✔
375
{
376
   SCOPED_A(nvc_thread_t *) join_list = AINIT;
9,262✔
377

378
   for (int i = 1; i < MAX_THREADS; i++) {
296,384✔
379
      nvc_thread_t *t = atomic_load(&threads[i]);
291,753✔
380
      if (t != NULL)
291,753✔
381
         APUSH(join_list, t);
291,753✔
382
   }
383

384
   // Lock the wake mutex here to avoid races with workers sleeping
385
   platform_mutex_lock(&wakelock);
4,631✔
386
   {
387
      atomic_store(&should_stop, true);
4,631✔
388
      platform_cond_broadcast(&wake_workers);
4,631✔
389
   }
390
   platform_mutex_unlock(&wakelock);
4,631✔
391

392
   for (int i = 0; i < join_list.count; i++) {
4,687✔
393
      nvc_thread_t *t = join_list.items[i];
56✔
394

395
      switch (relaxed_load(&t->kind)) {
56✔
396
      case WORKER_THREAD:
56✔
397
         thread_join(t);
56✔
398
         continue;  // Freed thread struct
56✔
399
      case USER_THREAD:
×
400
      case MAIN_THREAD:
401
         fatal_trace("leaked a user thread: %s", t->name);
×
402
      }
403
   }
404

405
   assert(atomic_load(&running_threads) == 1);
4,631✔
406

407
#ifdef DEBUG
408
   for (int i = 0; i < PARKING_BAYS; i++)
301,015✔
409
      assert(parking_bays[i].parked == 0);
296,384✔
410
#endif
411
}
4,631✔
412

413
static nvc_thread_t *thread_new(thread_fn_t fn, void *arg,
4,696✔
414
                                thread_kind_t kind, char *name)
415
{
416
   nvc_thread_t *thread = xcalloc(sizeof(nvc_thread_t));
4,696✔
417
   thread->name     = name;
4,696✔
418
   thread->fn       = fn;
4,696✔
419
   thread->arg      = arg;
4,696✔
420
   thread->kind     = kind;
4,696✔
421
   thread->rngstate = rand();
4,696✔
422

423
   atomic_store(&thread->queue.age.bits, 0);
4,696✔
424
   atomic_store(&thread->queue.bot, 0);
4,696✔
425

426
   int id = 0;
4,696✔
427
   for (; id < MAX_THREADS; id++) {
4,822✔
428
      if (relaxed_load(&(threads[id])) != NULL)
4,822✔
429
         continue;
126✔
430
      else if (atomic_cas(&(threads[id]), NULL, thread))
4,696✔
431
         break;
432
   }
433

434
   if (id == MAX_THREADS)
4,696✔
435
      fatal_trace("cannot create more than %d threads", MAX_THREADS);
×
436

437
   thread->id = id;
4,696✔
438

439
   unsigned max = relaxed_load(&max_thread_id);
4,696✔
440
   while (max < id) {
4,696✔
441
      if (__atomic_cas(&max_thread_id, &max, id))
70✔
442
         break;
443
   }
444

445
   atomic_add(&running_threads, 1);
4,696✔
446
   return thread;
4,696✔
447
}
448

449
#ifdef POSIX_SUSPEND
450
static void unmask_fatal_signals(sigset_t *mask)
6,650✔
451
{
452
   sigdelset(mask, SIGQUIT);
6,650✔
453
   sigdelset(mask, SIGABRT);
6,650✔
454
   sigdelset(mask, SIGTERM);
6,650✔
455
}
6,650✔
456
#endif
457

458
#ifdef __APPLE__
459
static void reset_mach_ports(void)
460
{
461
   for (int i = 0; i < MAX_THREADS; i++) {
462
      nvc_thread_t *t = atomic_load(&(threads[i]));
463
      if (t == NULL)
464
         continue;
465

466
      // Mach ports are not valid after fork
467
      t->port = pthread_mach_thread_np(t->handle);
468
   }
469
}
470
#endif
471

472
void thread_init(void)
4,626✔
473
{
474
   assert(my_thread == NULL);
4,626✔
475

476
   my_thread = thread_new(NULL, NULL, MAIN_THREAD, xstrdup("main thread"));
4,626✔
477

478
#ifdef __MINGW32__
479
   my_thread->handle = GetCurrentThread();
480
#else
481
   my_thread->handle = pthread_self();
4,626✔
482
#endif
483

484
#ifdef __APPLE__
485
   my_thread->port = pthread_mach_thread_np(my_thread->handle);
486
   pthread_atfork(NULL, NULL, reset_mach_ports);
487
#endif
488

489
#ifdef __MINGW32__
490
   InitializeCriticalSectionAndSpinCount(&wakelock, LOCK_SPINS);
491
   InitializeConditionVariable(&wake_workers);
492

493
   for (int i = 0; i < PARKING_BAYS; i++) {
494
      parking_bay_t *bay = &(parking_bays[i]);
495
      InitializeCriticalSectionAndSpinCount(&(bay->mutex), LOCK_SPINS);
496
      InitializeConditionVariable(&(bay->cond));
497
   }
498
#endif
499

500
   assert(my_thread->id == 0);
4,626✔
501

502
   const char *max_env = getenv("NVC_MAX_THREADS");
4,626✔
503
   if (max_env != NULL)
4,626✔
504
      max_workers = MAX(1, MIN(atoi(max_env), MAX_THREADS));
×
505
   else
506
      max_workers = DEFAULT_THREADS;
4,626✔
507

508
   const int num_cpus = nvc_nprocs();
4,626✔
509
   max_workers = MIN(num_cpus, max_workers);
4,626✔
510

511
   const char *jobs_env = getenv("NVC_CONCURRENT_JOBS");
4,626✔
512
   if (jobs_env != NULL) {
4,626✔
513
      const int num_jobs = MAX(1, atoi(jobs_env));
×
514
      const int limit = (int)round((double)num_cpus / (double)num_jobs);
×
515
      max_workers = MAX(1, MIN(max_workers, limit));
×
516
   }
517

518
   assert(max_workers > 0);
4,626✔
519

520
#ifdef DEBUG
521
   if (getenv("NVC_THREAD_VERBOSE") != NULL)
4,626✔
522
      atexit(print_lock_stats);
×
523
#endif
524

525
   atexit(join_worker_threads);
4,626✔
526

527
#ifdef POSIX_SUSPEND
528
   sem_init(&stop_sem, 0, 0);
4,626✔
529

530
   struct sigaction sa = {
4,626✔
531
      .sa_sigaction = suspend_handler,
532
      .sa_flags = SA_RESTART | SA_SIGINFO
533
   };
534
   sigfillset(&sa.sa_mask);
4,626✔
535
   unmask_fatal_signals(&sa.sa_mask);
4,626✔
536

537
   sigaction(SIGSUSPEND, &sa, NULL);
4,626✔
538
   sigaction(SIGRESUME, &sa, NULL);
4,626✔
539

540
   sigset_t mask;
4,626✔
541
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, NULL, &mask);
4,626✔
542

543
   sigdelset(&mask, SIGSUSPEND);
4,626✔
544
   sigaddset(&mask, SIGRESUME);
4,626✔
545

546
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, &mask, NULL);
4,626✔
547
#endif
548
}
4,626✔
549

550
int thread_id(void)
75,603,051✔
551
{
552
   assert(my_thread != NULL);
75,603,051✔
553
   return my_thread->id;
75,603,051✔
554
}
555

556
bool thread_attached(void)
45✔
557
{
558
   return my_thread != NULL;
45✔
559
}
560

561
void thread_sleep(int usec)
×
562
{
563
   usleep(usec);
×
564
}
×
565

566
static void *thread_wrapper(void *arg)
70✔
567
{
568
   assert(my_thread == NULL);
70✔
569
   my_thread = arg;
70✔
570

571
   void *result = (*my_thread->fn)(my_thread->arg);
70✔
572

573
   // Avoid races with stop_world
574
   SCOPED_LOCK(stop_lock);
70✔
575

576
   assert(threads[my_thread->id] == my_thread);
70✔
577
   atomic_store(&(threads[my_thread->id]),  NULL);
70✔
578

579
   atomic_add(&running_threads, -1);
70✔
580
   return result;
70✔
581
}
582

583
#ifdef __MINGW32__
584
static DWORD win32_thread_wrapper(LPVOID param)
585
{
586
   void *ret = thread_wrapper(param);
587
   atomic_store(&(my_thread->retval), ret);
588
   return 0;
589
}
590
#endif
591

592
static void thread_start(nvc_thread_t *thread)
70✔
593
{
594
   assert_lock_held(&stop_lock);
70✔
595

596
#ifdef __MINGW32__
597
   if ((thread->handle = CreateThread(NULL, 0, win32_thread_wrapper,
598
                                      thread, 0, NULL)) == NULL)
599
      fatal_errno("CreateThread");
600
#else
601
   PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
70✔
602
                 thread_wrapper, thread);
603
#endif
604

605
#ifdef __APPLE__
606
   thread->port = pthread_mach_thread_np(thread->handle);
607
#endif
608
}
70✔
609

610
nvc_thread_t *thread_create(thread_fn_t fn, void *arg, const char *fmt, ...)
14✔
611
{
612
   va_list ap;
14✔
613
   va_start(ap, fmt);
14✔
614
   char *name = xvasprintf(fmt, ap);
14✔
615
   va_end(ap);
14✔
616

617
   // Avoid races with stop_world
618
   SCOPED_LOCK(stop_lock);
14✔
619

620
   nvc_thread_t *thread = thread_new(fn, arg, USER_THREAD, name);
14✔
621
   thread_start(thread);
14✔
622

623
   return thread;
14✔
624
}
625

626
void *thread_join(nvc_thread_t *thread)
70✔
627
{
628
   if (thread == my_thread || thread->kind == MAIN_THREAD)
70✔
UNCOV
629
      fatal_trace("cannot join self or main thread");
×
630

631
   void *retval = NULL;
70✔
632
#ifdef __MINGW32__
633
   if (WaitForSingleObject(thread->handle, INFINITE) == WAIT_FAILED)
634
      fatal_errno("WaitForSingleObject failed for thread %s", thread->name);
635

636
   retval = atomic_load(&(thread->retval));
637
#else
638
   PTHREAD_CHECK(pthread_join, thread->handle, &retval);
70✔
639
#endif
640

641
   async_free(thread->name);
70✔
642
   async_free(thread);
70✔
643

644
   return retval;
70✔
645
}
646

647
nvc_thread_t *get_thread(int id)
27,072✔
648
{
649
   assert(id >= 0 && id < MAX_THREADS);
27,072✔
650
   return atomic_load(&threads[id]);
27,072✔
651
}
652

653
static inline parking_bay_t *parking_bay_for(void *cookie)
2,501✔
654
{
655
   return &(parking_bays[mix_bits_64(cookie) % PARKING_BAYS]);
2,501✔
656
}
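// Parked threads sleep in one of a fixed number of bays selected by
// hashing the cookie (here the lock address), in the style of WebKit's
// ParkingLot.  The park/unpark callbacks run with the bay mutex held so a
// wakeup cannot be lost between the final state check and the wait on the
// condition variable.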
657

658
static void thread_park(void *cookie, park_fn_t fn)
1,570✔
659
{
660
   parking_bay_t *bay = parking_bay_for(cookie);
1,570✔
661

662
   platform_mutex_lock(&(bay->mutex));
1,570✔
663
   {
664
      if ((*fn)(bay, cookie)) {
1,570✔
665
         bay->parked++;
1,563✔
666
         platform_cond_wait(&(bay->cond), &(bay->mutex));
1,563✔
667
         assert(bay->parked > 0);
1,563✔
668
         bay->parked--;
1,563✔
669
      }
670
   }
671
   platform_mutex_unlock(&(bay->mutex));
1,570✔
672
}
1,570✔
673

674
static void thread_unpark(void *cookie, unpark_fn_t fn)
931✔
675
{
676
   parking_bay_t *bay = parking_bay_for(cookie);
931✔
677

678
   if (fn != NULL) {
931✔
679
      platform_mutex_lock(&(bay->mutex));
931✔
680
      {
681
         (*fn)(bay, cookie);
931✔
682
      }
683
      platform_mutex_unlock(&(bay->mutex));
931✔
684
   }
685

686
   // Do not use pthread_cond_signal here as multiple threads parked in
687
   // this bay may be waiting on different cookies
688
   platform_cond_broadcast(&(bay->cond));
931✔
689
}
931✔
690

691
void spin_wait(void)
58,887,582✔
692
{
693
#ifdef __x86_64__
694
   __asm__ volatile ("pause");
58,887,582✔
695
#elif defined __aarch64__
696
   // YIELD is a no-op on most AArch64 cores so also do an ISB to stall
697
   // the pipeline for a bit
698
   __asm__ volatile ("yield; isb");
699
#endif
700
}
58,887,582✔
701

702
static bool lock_park_cb(parking_bay_t *bay, void *cookie)
1,570✔
703
{
704
   nvc_lock_t *lock = cookie;
1,570✔
705

706
   // This is called with the park mutex held: check the lock is still
707
   // owned by someone and the park bit is still set
708
   return relaxed_load(lock) == (IS_LOCKED | HAS_PARKED);
1,570✔
709
}
710

711
static void lock_unpark_cb(parking_bay_t *bay, void *cookie)
931✔
712
{
713
   nvc_lock_t *lock = cookie;
931✔
714

715
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
931✔
716

717
   // Unlock must have release semantics
718
   atomic_store(lock, (bay->parked > 0 ? HAS_PARKED : 0));
931✔
719
}
931✔
720

721
void nvc_lock(nvc_lock_t *lock)
70,256,310✔
722
{
723
   LOCK_EVENT(locks, 1);
70,256,310✔
724
   TSAN_PRE_LOCK(lock);
70,256,310✔
725

726
   int8_t state = 0;
70,256,310✔
727
   if (likely(__atomic_cas(lock, &state, state | IS_LOCKED)))
70,256,310✔
728
      goto locked;  // Fast path: acquired the lock without contention
70,234,243✔
729

730
   LOCK_EVENT(contended, 1);
22,067✔
731

732
   for (;;) {
47,538✔
733
      LOCK_EVENT(retries, 1);
47,538✔
734

735
      // Spin a few times waiting for the owner to release the lock
736
      // before parking
737
      int spins = 0;
738
      for (; (state & IS_LOCKED) && spins < LOCK_SPINS;
250,523✔
739
           spins++, state = relaxed_load(lock))
202,985✔
740
         spin_wait();
202,985✔
741

742
      if (state & IS_LOCKED) {
47,538✔
743
         // Ignore failures here as we will check the lock state again
744
         // in the callback with the park mutex held
745
         atomic_cas(lock, IS_LOCKED, IS_LOCKED | HAS_PARKED);
1,570✔
746

747
         LOCK_EVENT(parks, 1);
1,570✔
748
         thread_park(lock, lock_park_cb);
1,570✔
749

750
         if ((state = relaxed_load(lock)) & IS_LOCKED) {
1,570✔
751
            // Someone else grabbed the lock before our thread was unparked
752
            LOCK_EVENT(spurious, 1);
1,349✔
753
            continue;
1,349✔
754
         }
755
      }
756
      else
757
         LOCK_EVENT(spins, spins);
45,968✔
758

759
      assert(!(state & IS_LOCKED));
46,189✔
760

761
      // If we get here then we've seen the lock in an unowned state:
762
      // attempt to grab it with a CAS
763
      if (__atomic_cas(lock, &state, state | IS_LOCKED))
46,189✔
764
         goto locked;
22,067✔
765
   }
766

767
 locked:
70,256,310✔
768
   TSAN_POST_LOCK(lock);
70,256,310✔
769
}
70,256,310✔
770

771
void nvc_unlock(nvc_lock_t *lock)
70,256,307✔
772
{
773
   TSAN_PRE_UNLOCK(lock);
70,256,307✔
774

775
   // Fast path: unlock assuming no parked waiters
776
   if (likely(atomic_cas(lock, IS_LOCKED, 0)))
70,256,307✔
777
      goto unlocked;
70,255,376✔
778

779
   // If we get here then we must own the lock with at least one parked
780
   // waiting thread
781
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
931✔
782

783
   // Lock released in callback
784
   thread_unpark(lock, lock_unpark_cb);
931✔
785

786
 unlocked:
70,256,307✔
787
   TSAN_POST_UNLOCK(lock);
70,256,307✔
788
}
70,256,307✔
789

790
#ifdef DEBUG
791
void assert_lock_held(nvc_lock_t *lock)
161,452✔
792
{
793
   int8_t state = relaxed_load(lock);
161,452✔
794
   if (unlikely(!(state & IS_LOCKED)))
161,452✔
UNCOV
795
      fatal_trace("expected lock at %p to be held", lock);
×
796
}
161,452✔
797
#endif
798

799
void __scoped_unlock(nvc_lock_t **plock)
70,205,352✔
800
{
801
   nvc_unlock(*plock);
70,205,352✔
802
}
70,205,352✔
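// Illustrative sketch (not part of thread.c): a lock is simply a
// zero-initialised nvc_lock_t, and nvc_lock/nvc_unlock bracket the
// critical section, e.g.
//
//    static nvc_lock_t counter_lock = 0;
//    static int counter = 0;
//
//    void bump_counter(void)
//    {
//       nvc_lock(&counter_lock);
//       counter++;
//       nvc_unlock(&counter_lock);
//    }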
803

804
static void push_bot(threadq_t *tq, const task_t *tasks, size_t count)
92✔
805
{
806
   const abp_idx_t bot = relaxed_load(&tq->bot);
92✔
807
   assert(bot + count <= THREADQ_SIZE);
92✔
808

809
   memcpy(tq->deque + bot, tasks, count * sizeof(task_t));
92✔
810
   store_release(&tq->bot, bot + count);
92✔
811
}
92✔
812

813
static bool pop_bot(threadq_t *tq, task_t *task)
249✔
814
{
815
   const abp_idx_t old_bot = relaxed_load(&tq->bot);
249✔
816
   if (old_bot == 0)
249✔
817
      return false;
818

819
   const abp_idx_t new_bot = old_bot - 1;
195✔
820
   atomic_store(&tq->bot, new_bot);
195✔
821

822
   *task = tq->deque[new_bot];
195✔
823

824
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
195✔
825
   if (new_bot > old_age.top)
195✔
826
      return true;
827

828
   atomic_store(&tq->bot, 0);
92✔
829

830
   const abp_age_t new_age = { .top = 0, .tag = old_age.tag + 1 };
92✔
831
   if (new_bot == old_age.top) {
92✔
832
      if (atomic_cas(&tq->age.bits, old_age.bits, new_age.bits))
54✔
833
         return true;
834
   }
835

836
   atomic_store(&tq->age.bits, new_age.bits);
38✔
837
   return false;
38✔
838
}
839

840
__attribute__((no_sanitize("thread")))
841
static bool pop_top(threadq_t *tq, task_t *task)
80✔
842
{
843
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
80✔
844
   const abp_idx_t bot = atomic_load(&tq->bot);
80✔
845

846
   if (bot <= old_age.top)
80✔
847
      return false;
848

849
   // This triggers a TSAN false-positive: we will never read from *task
850
   // if the CAS fails below, so it's safe to ignore
851
   *task = tq->deque[old_age.top];
73✔
852

853
   const abp_age_t new_age = {
73✔
854
      .tag = old_age.tag,
855
      .top = old_age.top + 1
73✔
856
   };
857

858
   return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
73✔
859
}
860

861
static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
143✔
862
{
863
   assert_lock_held(&gq->lock);
143✔
864

865
   if (gq->wptr == gq->rptr)
143✔
866
      gq->wptr = gq->rptr = 0;
85✔
867

868
   if (gq->wptr + count > gq->max) {
143✔
869
      gq->max = next_power_of_2(gq->wptr + count);
38✔
870
      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
38✔
871
   }
872

873
   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
143✔
874
   gq->wptr += count;
143✔
875
}
143✔
876

877
__attribute__((no_sanitize("thread")))
878
static bool globalq_unlocked_empty(globalq_t *gq)
446✔
879
{
880
   return relaxed_load(&gq->wptr) == relaxed_load(&gq->rptr);
446✔
881
}
882

883
static size_t globalq_take(globalq_t *gq, threadq_t *tq)
446✔
884
{
885
   if (globalq_unlocked_empty(gq))
446✔
886
      return 0;
887

888
   const int nthreads = relaxed_load(&running_threads);
106✔
889

890
   SCOPED_LOCK(gq->lock);
212✔
891

892
   if (gq->wptr == gq->rptr)
106✔
893
      return 0;
894

895
   const int remain = gq->wptr - gq->rptr;
92✔
896
   const int share = gq->wptr / nthreads;
92✔
897
   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
92✔
898
   const int from = gq->rptr;
92✔
899

900
   gq->rptr += take;
92✔
901

902
   push_bot(tq, gq->tasks + from, take);
92✔
903
   return take;
92✔
904
}
905

906
static void execute_task(task_t *task)
1,304✔
907
{
908
   (*task->fn)(task->context, task->arg);
1,304✔
909

910
   if (task->workq != NULL) {
1,304✔
911
      entryq_t *eq = &(task->workq->entryqs[my_thread->id]);
1,193✔
912

913
      const entryq_ptr_t cur = { .bits = relaxed_load(&eq->comp.bits) };
1,193✔
914
      const int epoch = atomic_load(&task->workq->epoch);
1,193✔
915
      const int count = cur.epoch == epoch ? cur.count : 0;
1,193✔
916
      const entryq_ptr_t next = { .count = count + 1, .epoch = epoch };
1,193✔
917
      store_release(&eq->comp.bits, next.bits);
1,193✔
918
   }
919
   else
920
      atomic_add(&async_pending, -1);
111✔
921
}
1,304✔
922

923
static bool globalq_poll(globalq_t *gq, threadq_t *tq)
446✔
924
{
925
   int ntasks;
446✔
926
   if ((ntasks = globalq_take(gq, tq))) {
446✔
927
      task_t task;
92✔
928
      int comp = 0;
92✔
929
      for (; pop_bot(tq, &task); comp++)
341✔
930
         execute_task(&task);
157✔
931

932
      WORKQ_EVENT(comp, comp);
92✔
933
      return true;
92✔
934
   }
935
   else
936
      return false;
937
}
938

939
workq_t *workq_new(void *context)
1,106✔
940
{
941
   if (my_thread->kind != MAIN_THREAD)
1,106✔
UNCOV
942
      fatal_trace("work queues can only be created by the main thread");
×
943

944
   workq_t *wq = xcalloc(sizeof(workq_t));
1,106✔
945
   wq->state    = IDLE;
1,106✔
946
   wq->context  = context;
1,106✔
947
   wq->parallel = max_workers > 1;
1,106✔
948
   wq->epoch    = 1;
1,106✔
949

950
   return wq;
1,106✔
951
}
952

UNCOV
953
void workq_not_thread_safe(workq_t *wq)
×
954
{
955
   wq->parallel = false;
×
UNCOV
956
}
×
957

958
void workq_free(workq_t *wq)
1,106✔
959
{
960
   if (my_thread->kind != MAIN_THREAD)
1,106✔
UNCOV
961
      fatal_trace("work queues can only be freed by the main thread");
×
962

963
   assert(wq->state == IDLE);
1,106✔
964

965
   for (int i = 0; i < MAX_THREADS; i++)
71,890✔
966
      free(wq->entryqs[i].tasks);
70,784✔
967

968
   free(wq);
1,106✔
969
}
1,106✔
970

971
void workq_do(workq_t *wq, task_fn_t fn, void *arg)
1,193✔
972
{
973
   assert(wq->state == IDLE);
1,193✔
974

975
   entryq_t *eq = &(wq->entryqs[my_thread->id]);
1,193✔
976

977
   const entryq_ptr_t cur = { .bits = relaxed_load(&eq->wptr.bits) };
1,193✔
978
   const int epoch = atomic_load(&wq->epoch);
1,193✔
979
   const int wptr = cur.epoch == epoch ? cur.count : 0;
1,193✔
980

981
   if (wptr == eq->queuesz) {
1,193✔
982
      eq->queuesz = MAX(eq->queuesz * 2, 64);
1,106✔
983
      eq->tasks = xrealloc_array(eq->tasks, eq->queuesz, sizeof(task_t));
1,106✔
984
   }
985

986
   eq->tasks[wptr] = (task_t){ fn, wq->context, arg, wq };
1,193✔
987

988
   const entryq_ptr_t next = { .count = wptr + 1, .epoch = epoch };
1,193✔
989
   store_release(&eq->wptr.bits, next.bits);
1,193✔
990

991
   unsigned maxthread = relaxed_load(&wq->maxthread);
1,193✔
992
   while (maxthread < my_thread->id) {
1,193✔
UNCOV
993
      if (__atomic_cas(&wq->maxthread, &maxthread, my_thread->id))
×
994
         break;
995
   }
996
}
1,193✔
997

998
void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
×
999
{
UNCOV
1000
   const int maxthread = relaxed_load(&wq->maxthread);
×
1001
   for (int i = 0; i <= maxthread; i++) {
×
UNCOV
1002
      entryq_t *eq = &(wq->entryqs[my_thread->id]);
×
UNCOV
1003
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
×
UNCOV
1004
      for (int j = 0; j < wptr.count; j++)
×
UNCOV
1005
         (*fn)(wq->context, eq->tasks[j].arg, arg);
×
1006
   }
UNCOV
1007
}
×
1008

1009
static int estimate_depth(threadq_t *tq)
1,056✔
1010
{
1011
   const abp_age_t age = { .bits = relaxed_load(&tq->age.bits) };
1,056✔
1012
   const abp_idx_t bot = relaxed_load(&tq->bot);
1,056✔
1013

1014
   return bot <= age.top ? 0 : bot - age.top;
1,056✔
1015
}
1016

1017
static threadq_t *get_thread_queue(int id)
1,056✔
1018
{
1019
   assert(id < MAX_THREADS);
1,056✔
1020

1021
   nvc_thread_t *t = atomic_load(&(threads[id]));
1,056✔
1022
   if (t == NULL)
1,056✔
1023
      return NULL;
1024

1025
   return &(t->queue);
1,056✔
1026
}
1027

1028
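// Cheap per-thread xorshift32 PRNG (shift constants 13/17/5), seeded from
// rand() in thread_new; it is only used to pick a random starting victim
// when stealing work, so statistical quality does not matter much.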
static uint32_t fast_rand(void)
331✔
1029
{
1030
   uint32_t state = my_thread->rngstate;
331✔
1031
   state ^= (state << 13);
331✔
1032
   state ^= (state >> 17);
331✔
1033
   state ^= (state << 5);
331✔
1034
   return (my_thread->rngstate = state);
331✔
1035
}
1036

1037
static threadq_t *find_victim(void)
370✔
1038
{
1039
   threadq_t *last = get_thread_queue(my_thread->victim);
370✔
1040
   if (last != NULL && estimate_depth(last) > 0)
370✔
1041
      return last;
1042

1043
   const int maxthread = relaxed_load(&max_thread_id);
331✔
1044
   const int start = fast_rand() % (maxthread + 1);
331✔
1045
   int idx = start;
331✔
1046
   do {
998✔
1047
      if (idx != my_thread->id) {
998✔
1048
         threadq_t *q = get_thread_queue(idx);
686✔
1049
         if (q != NULL && estimate_depth(q) > 0) {
686✔
1050
            my_thread->victim = idx;
41✔
1051
            return q;
41✔
1052
         }
1053
      }
1054
   } while ((idx = (idx + 1) % (maxthread + 1)) != start);
957✔
1055

1056
   return NULL;
1057
}
1058

1059
static bool steal_task(void)
370✔
1060
{
1061
   threadq_t *tq = find_victim();
370✔
1062
   if (tq == NULL)
370✔
1063
      return false;
1064

1065
   task_t task;
80✔
1066
   if (pop_top(tq, &task)) {
80✔
1067
      WORKQ_EVENT(steals, 1);
73✔
1068
      execute_task(&task);
73✔
1069
      WORKQ_EVENT(comp, 1);
73✔
1070
      return true;
73✔
1071
   }
1072

1073
   return false;
1074
}
1075

1076
static void progressive_backoff(void)
60,513,529✔
1077
{
1078
   if (my_thread->spins++ < YIELD_SPINS)
60,513,529✔
1079
      spin_wait();
58,679,792✔
1080
   else {
1081
#ifdef __MINGW32__
1082
      SwitchToThread();
1083
#else
1084
      sched_yield();
1,833,737✔
1085
#endif
1086
      my_thread->spins = 0;
1,833,737✔
1087
   }
1088
}
60,513,529✔
1089

1090
static void *worker_thread(void *arg)
56✔
1091
{
1092
   mspace_stack_limit(MSPACE_CURRENT_FRAME);
56✔
1093

1094
   do {
392✔
1095
      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
392✔
1096
         my_thread->spins = 0;  // Did work
127✔
1097
      else if (my_thread->spins++ < 2)
265✔
1098
         spin_wait();
178✔
1099
      else {
1100
         platform_mutex_lock(&wakelock);
87✔
1101
         {
1102
            if (!relaxed_load(&should_stop))
87✔
1103
               platform_cond_wait(&wake_workers, &wakelock);
87✔
1104
         }
1105
         platform_mutex_unlock(&wakelock);
87✔
1106
      }
1107
   } while (likely(!relaxed_load(&should_stop)));
392✔
1108

1109
   return NULL;
56✔
1110
}
1111

1112
static void create_workers(int needed)
143✔
1113
{
1114
   assert(my_thread->kind == MAIN_THREAD);
143✔
1115

1116
   if (relaxed_load(&should_stop))
143✔
1117
      return;
1118

1119
   while (relaxed_load(&running_threads) < MIN(max_workers, needed)) {
199✔
1120
      static int counter = 0;
56✔
1121
      char *name = xasprintf("worker thread %d", atomic_add(&counter, 1));
56✔
1122
      SCOPED_LOCK(stop_lock);   // Avoid races with stop_world
112✔
1123
      nvc_thread_t *thread =
56✔
1124
         thread_new(worker_thread, NULL, WORKER_THREAD, name);
56✔
1125
      thread_start(thread);
56✔
1126
   }
1127

1128
   platform_cond_broadcast(&wake_workers);
143✔
1129
}
1130

1131
void workq_start(workq_t *wq)
1,106✔
1132
{
1133
   assert(my_thread->kind == MAIN_THREAD);
1,106✔
1134

1135
   const int epoch = relaxed_load(&wq->epoch);
1,106✔
1136
   const int maxthread = relaxed_load(&wq->maxthread);
1,106✔
1137

1138
   assert(wq->state == IDLE);
1,106✔
1139
   wq->state = START;
1,106✔
1140

1141
   int nserial = 0, nparallel = 0;
1,106✔
1142
   for (int i = 0; i <= maxthread; i++) {
2,212✔
1143
      entryq_t *eq = &wq->entryqs[i];
1,106✔
1144
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
1,106✔
1145
      if (wptr.epoch == epoch) {
1,106✔
1146
         // Only bump epoch if there are tasks to run
1147
         if (nserial + nparallel == 0)
1,106✔
1148
            atomic_add(&wq->epoch, 1);
1,106✔
1149

1150
         assert(wptr.count > 0);
1,106✔
1151

1152
         if (i == maxthread && nserial + nparallel == 0 && wptr.count == 1) {
1,106✔
1153
            execute_task(&(eq->tasks[0]));   // Only one task in total
1,074✔
1154
            nserial++;
1,074✔
1155
         }
1156
         else if (wq->parallel) {
32✔
1157
            if (nparallel == 0)
32✔
1158
               nvc_lock(&globalq.lock);   // Lazily acquire lock
32✔
1159
            globalq_put(&globalq, eq->tasks, wptr.count);
32✔
1160
            nparallel += wptr.count;
32✔
1161
         }
1162
         else {
UNCOV
1163
            for (int j = 0; j < wptr.count; j++)
×
1164
               execute_task(&(eq->tasks[j]));
×
1165
            nserial += wptr.count;
×
1166
         }
1167
      }
1168
   }
1169

1170
   if (wq->parallel && nparallel > 0) {
1,106✔
1171
      nvc_unlock(&globalq.lock);
32✔
1172
      create_workers(nparallel);
32✔
1173
   }
1174
}
1,106✔
1175

1176
static int workq_outstanding(workq_t *wq)
60,377,416✔
1177
{
1178
   assert(wq->state == START);
60,377,416✔
1179

1180
   const int epoch = atomic_load(&wq->epoch);
60,377,416✔
1181
   const int maxthread = relaxed_load(&max_thread_id);
60,377,416✔
1182

1183
   int pending = 0;
60,377,416✔
1184
   for (int i = 0; i <= maxthread; i++) {
223,190,398✔
1185
      entryq_t *eq = &(wq->entryqs[i]);
162,812,982✔
1186

1187
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
162,812,982✔
1188
      if (wptr.epoch == epoch - 1)
162,812,982✔
1189
         pending += wptr.count;
60,377,416✔
1190

1191
      const entryq_ptr_t comp = { .bits = load_acquire(&eq->comp.bits) };
162,812,982✔
1192
      if (comp.epoch == epoch)
162,812,982✔
1193
         pending -= comp.count;
98,200,705✔
1194
   }
1195

1196
   assert(pending >= 0);
60,377,416✔
1197
   return pending;
60,377,416✔
1198
}
1199

1200
static void workq_parallel_drain(workq_t *wq)
1,106✔
1201
{
1202
   if (workq_outstanding(wq) > 0) {
1,106✔
1203
      while (globalq_poll(&globalq, &(my_thread->queue)));
53✔
1204
      while (steal_task());
48✔
1205

1206
      while (workq_outstanding(wq) > 0)
60,376,310✔
1207
         progressive_backoff();
60,376,278✔
1208
   }
1209

1210
   wq->state = IDLE;
1,106✔
1211
}
1,106✔
1212

1213
void workq_drain(workq_t *wq)
1,106✔
1214
{
1215
   if (my_thread->kind != MAIN_THREAD)
1,106✔
UNCOV
1216
      fatal_trace("workq_drain can only be called from the main thread");
×
1217

1218
   if (wq->parallel)
1,106✔
1219
      workq_parallel_drain(wq);
1,106✔
1220
   else {
UNCOV
1221
      assert(wq->state == START);
×
1222
      wq->state = IDLE;
×
1223
   }
1224
}
1,106✔
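// Illustrative sketch (not part of thread.c): the expected work queue
// life cycle, driven from the main thread, assuming do_item matches the
// (context, arg) signature of task_fn_t.
//
//    static void do_item(void *context, void *arg) { /* ... */ }
//
//    workq_t *wq = workq_new(context);
//    for (int i = 0; i < nitems; i++)
//       workq_do(wq, do_item, &items[i]);   // Enqueue while IDLE
//    workq_start(wq);                       // Publish tasks to workers
//    workq_drain(wq);                       // Help out, then wait
//    workq_free(wq);                        // Queue is IDLE again here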
1225

1226
void async_do(task_fn_t fn, void *context, void *arg)
111✔
1227
{
1228
   if (max_workers == 1)
111✔
UNCOV
1229
      (*fn)(context, arg);   // Single CPU
×
1230
   else {
1231
      const int npending = atomic_add(&async_pending, 1);
111✔
1232
      create_workers(npending + 1 /* Do not count main thread */);
111✔
1233

1234
      task_t tasks[1] = {
111✔
1235
         { fn, context, arg, NULL }
1236
      };
1237
      SCOPED_LOCK(globalq.lock);
222✔
1238
      globalq_put(&globalq, tasks, 1);
111✔
1239
   }
1240
}
111✔
1241

1242
void async_barrier(void)
8,108✔
1243
{
1244
   while (atomic_load(&async_pending) > 0) {
16,217✔
1245
      if (!globalq_poll(&globalq, &(my_thread->queue)))
1✔
UNCOV
1246
         progressive_backoff();
×
1247
   }
1248
}
8,108✔
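// Illustrative sketch (not part of thread.c): fire-and-forget work goes
// through async_do and is fenced with async_barrier; on a single CPU the
// task simply runs inline.
//
//    async_do(do_item, context, arg);
//    // ... other work ...
//    async_barrier();   // All pending async tasks have completed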
1249

1250
void async_free(void *ptr)
6,090✔
1251
{
1252
   if (relaxed_load(&running_threads) == 1)
6,090✔
1253
      free(ptr);
6,073✔
1254
   else if (ptr != NULL) {
1255
      // TODO: free when all threads in quiescent state
1256
   }
6,090✔
1257
}
6,090✔
1258

1259
#ifdef POSIX_SUSPEND
1260
static void suspend_handler(int sig, siginfo_t *info, void *context)
4,048✔
1261
{
1262
   if (info->si_pid != getpid())
4,048✔
1263
      return;   // Not sent by us, ignore it
2,024✔
1264
   else if (sig == SIGRESUME)
4,048✔
1265
      return;
1266

1267
   const int olderrno = errno;
2,024✔
1268

1269
   if (my_thread != NULL) {
2,024✔
1270
      struct cpu_state cpu;
2,024✔
1271
      fill_cpu_state(&cpu, (ucontext_t *)context);
2,024✔
1272

1273
      stop_world_fn_t callback = atomic_load(&stop_callback);
2,024✔
1274
      void *arg = atomic_load(&stop_arg);
2,024✔
1275

1276
      (*callback)(my_thread->id, &cpu, arg);
2,024✔
1277
   }
1278

1279
   sem_post(&stop_sem);
2,024✔
1280

1281
   sigset_t mask;
2,024✔
1282
   sigfillset(&mask);
2,024✔
1283
   unmask_fatal_signals(&mask);
2,024✔
1284
   sigdelset(&mask, SIGRESUME);
2,024✔
1285

1286
   sigsuspend(&mask);
2,024✔
1287

1288
   sem_post(&stop_sem);
2,024✔
1289

1290
   errno = olderrno;
2,024✔
1291
}
1292
#endif
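// On POSIX systems stop_world() suspends every other thread by sending it
// SIGSUSPEND: the handler above captures the CPU state, runs the stop
// callback, posts stop_sem and then blocks in sigsuspend() until
// start_world() delivers SIGRESUME.  Both sides wait on stop_sem with a
// timeout so a wedged thread is reported rather than deadlocking.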
1293

1294
void stop_world(stop_world_fn_t callback, void *arg)
923✔
1295
{
1296
   nvc_lock(&stop_lock);
923✔
1297

1298
   atomic_store(&stop_callback, callback);
923✔
1299
   atomic_store(&stop_arg, arg);
923✔
1300

1301
#ifdef __MINGW32__
1302
   const int maxthread = relaxed_load(&max_thread_id);
1303
   for (int i = 0; i <= maxthread; i++) {
1304
      nvc_thread_t *thread = atomic_load(&threads[i]);
1305
      if (thread == NULL || thread == my_thread)
1306
         continue;
1307

1308
      if (SuspendThread(thread->handle) != 0)
1309
         fatal_errno("SuspendThread");
1310

1311
      CONTEXT context;
1312
      context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;
1313
      if (!GetThreadContext(thread->handle, &context))
1314
         fatal_errno("GetThreadContext");
1315

1316
      struct cpu_state cpu;
1317
      fill_cpu_state(&cpu, &context);
1318

1319
      (*callback)(thread->id, &cpu, arg);
1320
   }
1321
#elif defined __APPLE__
1322
   const int maxthread = relaxed_load(&max_thread_id);
1323
   for (int i = 0; i <= maxthread; i++) {
1324
      nvc_thread_t *thread = atomic_load(&threads[i]);
1325
      if (thread == NULL || thread == my_thread)
1326
         continue;
1327

1328
      assert(thread->port != MACH_PORT_NULL);
1329

1330
      kern_return_t kern_result;
1331
      do {
1332
         kern_result = thread_suspend(thread->port);
1333
      } while (kern_result == KERN_ABORTED);
1334

1335
      if (kern_result != KERN_SUCCESS)
1336
         fatal_trace("failed to suspend thread %d (%d)", i, kern_result);
1337

1338
#ifdef __aarch64__
1339
      arm_thread_state64_t state;
1340
      mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
1341
      thread_state_flavor_t flavor = ARM_THREAD_STATE64;
1342
#else
1343
      x86_thread_state64_t state;
1344
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
1345
      thread_state_flavor_t flavor = x86_THREAD_STATE64;
1346
#endif
1347
      kern_result = thread_get_state(thread->port, flavor,
1348
                                     (natural_t *)&state, &count);
1349
      if (kern_result != KERN_SUCCESS)
1350
         fatal_trace("failed to get thread %d state (%d)", i, kern_result);
1351

1352
      // Fake a ucontext_t that we can pass to fill_cpu_state
1353
      ucontext_t uc;
1354
      typeof(*uc.uc_mcontext) mc;
1355
      uc.uc_mcontext = &mc;
1356
      mc.__ss = state;
1357

1358
      struct cpu_state cpu;
1359
      fill_cpu_state(&cpu, &uc);
1360

1361
      (*callback)(thread->id, &cpu, arg);
1362
   }
1363
#elif defined __SANITIZE_THREAD__
1364
   // https://github.com/google/sanitizers/issues/1179
1365
   fatal_trace("stop_world is not supported with tsan");
1366
#else
1367
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
923✔
1368

1369
   int signalled = 0;
923✔
1370
   const int maxthread = relaxed_load(&max_thread_id);
923✔
1371
   for (int i = 0; i <= maxthread; i++) {
4,309✔
1372
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,386✔
1373
      if (thread == NULL || thread == my_thread)
3,386✔
1374
         continue;
1,362✔
1375

1376
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGSUSPEND);
2,024✔
1377
      signalled++;
2,024✔
1378
   }
1379

1380
   struct timespec ts;
923✔
1381
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
923✔
UNCOV
1382
      fatal_errno("clock_gettime");
×
1383

1384
   ts.tv_sec += SUSPEND_TIMEOUT;
923✔
1385

1386
   for (; signalled > 0; signalled--) {
2,947✔
1387
      if (sem_timedwait(&stop_sem, &ts) != 0)
2,024✔
UNCOV
1388
         fatal_trace("timeout waiting for %d threads to suspend", signalled);
×
1389
   }
1390

1391
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
923✔
1392
#endif
1393

1394
   struct cpu_state cpu;
923✔
1395
   capture_registers(&cpu);
923✔
1396

1397
   (*callback)(my_thread->id, &cpu, arg);
923✔
1398
}
923✔
1399

1400
void start_world(void)
923✔
1401
{
1402
   assert_lock_held(&stop_lock);
923✔
1403

1404
   const int maxthread = relaxed_load(&max_thread_id);
923✔
1405

1406
#ifdef __MINGW32__
1407
   for (int i = 0; i <= maxthread; i++) {
1408
      nvc_thread_t *thread = atomic_load(&threads[i]);
1409
      if (thread == NULL || thread == my_thread)
1410
         continue;
1411

1412
      if (ResumeThread(thread->handle) != 1)
1413
         fatal_errno("ResumeThread");
1414
   }
1415
#elif defined __APPLE__
1416
   for (int i = 0; i <= maxthread; i++) {
1417
      nvc_thread_t *thread = atomic_load(&threads[i]);
1418
      if (thread == NULL || thread == my_thread)
1419
         continue;
1420

1421
      kern_return_t kern_result;
1422
      do {
1423
         kern_result = thread_resume(thread->port);
1424
      } while (kern_result == KERN_ABORTED);
1425

1426
      if (kern_result != KERN_SUCCESS)
1427
         fatal_trace("failed to resume thread %d (%d)", i, kern_result);
1428
   }
1429
#else
1430
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
923✔
1431

1432
   int signalled = 0;
1433
   for (int i = 0; i <= maxthread; i++) {
4,309✔
1434
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,386✔
1435
      if (thread == NULL || thread == my_thread)
3,386✔
1436
         continue;
1,362✔
1437

1438
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGRESUME);
2,024✔
1439
      signalled++;
2,024✔
1440
   }
1441

1442
   struct timespec ts;
923✔
1443
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
923✔
UNCOV
1444
      fatal_errno("clock_gettime");
×
1445

1446
   ts.tv_sec += SUSPEND_TIMEOUT;
923✔
1447

1448
   for (; signalled > 0; signalled--) {
2,947✔
1449
      if (sem_timedwait(&stop_sem, &ts) != 0)
2,024✔
UNCOV
1450
         fatal_trace("timeout waiting for %d threads to resume", signalled);
×
1451
   }
1452

1453
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
923✔
1454
#endif
1455

1456
   nvc_unlock(&stop_lock);
923✔
1457
}
923✔
1458

1459
void thread_wx_mode(wx_mode_t mode)
14,909✔
1460
{
1461
#ifdef __APPLE__
1462
   pthread_jit_write_protect_np(mode == WX_EXECUTE);
1463
#else
1464
   // Could use Intel memory protection keys here
1465
#endif
1466
}
14,909✔
1467

1468
barrier_t *barrier_new(int count)
1✔
1469
{
1470
   barrier_t *b = xcalloc(sizeof(barrier_t));
1✔
1471
   b->count = count;
1✔
1472
   return b;
1✔
1473
}
1474

1475
void barrier_free(barrier_t *b)
1✔
1476
{
1477
   free(b);
1✔
1478
}
1✔
1479

1480
void barrier_wait(barrier_t *b)
88✔
1481
{
1482
   const int count = relaxed_load(&b->count);
88✔
1483
   const int passed = relaxed_load(&b->passed);
88✔
1484

1485
   if (atomic_fetch_add(&b->reached, 1) == count - 1) {
88✔
1486
      // Last thread to pass barrier
1487
      relaxed_store(&b->reached, 0);
22✔
1488
      store_release(&b->passed, passed + 1);
22✔
1489
   }
1490
   else {
1491
      while (load_acquire(&b->passed) == passed)
137,317✔
1492
         progressive_backoff();
137,251✔
1493
   }
1494
}
88✔