nickg / nvc, build 13741295555 (push, via github)

08 Mar 2025 07:54PM UTC. Coverage: 92.319% (+0.08%) from 92.236%

Commit by nickg: "Pass mir_context_t around explicitly"

33 of 38 new or added lines in 3 files covered (86.84%)
542 existing lines in 5 files now uncovered
68074 of 73738 relevant lines covered (92.32%)
433184.62 hits per line

Source file: /src/thread.c (90.11% coverage)

//
//  Copyright (C) 2021-2023  Nick Gasson
//
//  This program is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
//

#include "util.h"
#include "array.h"
#include "cpustate.h"
#include "rt/mspace.h"
#include "thread.h"

#include <assert.h>
#include <errno.h>
#include <math.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <unistd.h>
#include <semaphore.h>

#ifdef HAVE_PTHREAD
#include <pthread.h>
#elif !defined __MINGW32__
#error missing pthread support
#endif

#ifdef __SANITIZE_THREAD__
#include <sanitizer/tsan_interface.h>
#endif

#if defined __MINGW32__
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#elif defined __APPLE__
#define task_t __task_t
#include <mach/thread_act.h>
#include <mach/machine.h>
#undef task_t
#endif

#define LOCK_SPINS      64
#define YIELD_SPINS     32
#define MIN_TAKE        8
#define PARKING_BAYS    64
#define SUSPEND_TIMEOUT 1

#if !defined __MINGW32__ && !defined __APPLE__
#define POSIX_SUSPEND 1
#define SIGSUSPEND    SIGRTMIN
#define SIGRESUME     (SIGRTMIN + 1)
#endif

typedef struct {
   int64_t locks;
   int64_t spins;
   int64_t contended;
   int64_t parks;
   int64_t spurious;
   int64_t retries;
} __attribute__((aligned(64))) lock_stats_t;

STATIC_ASSERT(sizeof(lock_stats_t) == 64)

#ifdef DEBUG
#define LOCK_EVENT(what, n) do {                  \
      if (likely(my_thread != NULL))              \
         lock_stats[my_thread->id].what += (n);   \
   } while (0)

#define WORKQ_EVENT(what, n) do {                 \
      if (likely(my_thread != NULL))              \
         workq_stats[my_thread->id].what += (n);  \
   } while (0)
#else
#define LOCK_EVENT(what, n) (void)(n)
#define WORKQ_EVENT(what, n) (void)(n)
#endif

#ifdef __SANITIZE_THREAD__
#define TSAN_PRE_LOCK(addr) \
   __tsan_mutex_pre_lock(addr, __tsan_mutex_linker_init);
#define TSAN_POST_LOCK(addr) \
   __tsan_mutex_post_lock(addr, __tsan_mutex_linker_init, 0);
#define TSAN_PRE_UNLOCK(addr) \
   __tsan_mutex_pre_unlock(addr, __tsan_mutex_linker_init);
#define TSAN_POST_UNLOCK(addr) \
   __tsan_mutex_post_unlock(addr, __tsan_mutex_linker_init);
#else
#define TSAN_PRE_LOCK(addr)
#define TSAN_POST_LOCK(addr)
#define TSAN_PRE_UNLOCK(addr)
#define TSAN_POST_UNLOCK(addr)
#endif

#ifndef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
#define PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP PTHREAD_MUTEX_INITIALIZER
#endif

#define PTHREAD_CHECK(op, ...) do {             \
      if (unlikely(op(__VA_ARGS__) != 0))       \
         fatal_errno(#op);                      \
   } while (0)

// Lock implementation based on WTF::Lock in WebKit
//   https://webkit.org/blog/6161/locking-in-webkit/

typedef enum {
   IS_LOCKED  = (1 << 0),
   HAS_PARKED = (1 << 1)
} lock_bits_t;

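// A short summary of how nvc_lock()/nvc_unlock() below use these two
// bits.  The lock byte has four states:
//
//    0                        unlocked, no waiters
//    IS_LOCKED                locked, no parked waiters
//    IS_LOCKED | HAS_PARKED   locked, at least one thread parked
//    HAS_PARKED               unlocked, but other threads are still
//                             parked in the bay for this lock
//
// The fast path is a single CAS from 0 to IS_LOCKED; the slow path spins
// briefly, sets HAS_PARKED, and then sleeps in a parking bay keyed by the
// lock address.
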
typedef struct {
#ifdef __MINGW32__
   CRITICAL_SECTION   mutex;
   CONDITION_VARIABLE cond;
#else
   pthread_mutex_t    mutex;
   pthread_cond_t     cond;
#endif
   int                parked;
} __attribute__((aligned(64))) parking_bay_t;

typedef bool (*park_fn_t)(parking_bay_t *, void *);
typedef void (*unpark_fn_t)(parking_bay_t *, void *);

typedef struct {
   task_fn_t  fn;
   void      *context;
   void      *arg;
   workq_t   *workq;
} task_t;

// Work stealing task queue is based on:
//   Arora, N. S., Blumofe, R. D., and Plaxton, C. G.
//   Thread scheduling for multiprogrammed multiprocessors.
//   Theory of Computing Systems 34, 2 (2001), 115-144.

typedef uint32_t abp_idx_t;
typedef uint32_t abp_tag_t;

typedef union {
   struct {
      abp_idx_t top;
      abp_tag_t tag;
   };
   uint64_t bits;
} abp_age_t;

STATIC_ASSERT(sizeof(abp_age_t) <= 8);

#define THREADQ_SIZE 256

typedef struct {
   task_t    deque[THREADQ_SIZE];
   abp_age_t age;
   abp_idx_t bot;
} __attribute__((aligned(64))) threadq_t;

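// The owner thread pushes and pops tasks at the "bot" end of its deque
// without taking a lock; idle threads steal single tasks from the "top"
// end with a CAS on the packed (top, tag) word in abp_age_t.  The tag is
// bumped whenever the owner resets the deque so that a thief holding a
// stale snapshot cannot succeed (see push_bot, pop_bot and pop_top
// below).
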
typedef enum { IDLE, START } workq_state_t;

typedef union {
   struct {
      uint32_t count;
      uint32_t epoch;
   };
   uint64_t bits;
} entryq_ptr_t;

STATIC_ASSERT(sizeof(entryq_ptr_t) <= 8);

typedef struct {
   task_t       *tasks;
   unsigned      queuesz;
   entryq_ptr_t  wptr;
   entryq_ptr_t  comp;
} __attribute__((aligned(64))) entryq_t;

STATIC_ASSERT(sizeof(entryq_t) == 64);

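// Each thread appends tasks to its own entryq_t so workq_do() never
// contends with other producers.  The wptr/comp words pack a count
// together with the epoch it belongs to: bumping wq->epoch in
// workq_start() implicitly resets every per-thread count to zero, so the
// queues can be reused across runs without clearing them.
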
struct _workq {
   void          *context;
   workq_state_t  state;
   unsigned       epoch;
   unsigned       maxthread;
   bool           parallel;
   entryq_t       entryqs[MAX_THREADS];
};

typedef struct {
   int64_t comp;
   int64_t steals;
   int64_t wakes;
} __attribute__((aligned(64))) workq_stats_t;

STATIC_ASSERT(sizeof(workq_stats_t) == 64);

typedef enum {
   MAIN_THREAD,
   USER_THREAD,
   WORKER_THREAD,
} thread_kind_t;

struct _nvc_thread {
   unsigned        id;
   thread_kind_t   kind;
   unsigned        spins;
   threadq_t       queue;
   char           *name;
   thread_fn_t     fn;
   void           *arg;
   int             victim;
   uint32_t        rngstate;
#ifdef __MINGW32__
   HANDLE          handle;
   void           *retval;
#else
   pthread_t       handle;
#endif
#ifdef __APPLE__
   thread_port_t   port;
#endif
};

typedef struct {
   nvc_lock_t   lock;
   task_t      *tasks;
   unsigned     wptr;
   unsigned     rptr;
   unsigned     max;
} globalq_t;

typedef struct _barrier {
   unsigned count;
   unsigned reached;
   unsigned passed;
} __attribute__((aligned(64))) barrier_t;

static parking_bay_t parking_bays[PARKING_BAYS] = {
#ifndef __MINGW32__
   [0 ... PARKING_BAYS - 1] = {
      PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP,
      PTHREAD_COND_INITIALIZER
   }
#endif
};

static nvc_thread_t    *threads[MAX_THREADS];
static unsigned         max_workers = 0;
static int              running_threads = 0;
static unsigned         max_thread_id = 0;
static bool             should_stop = false;
static globalq_t        globalq __attribute__((aligned(64)));
static int              async_pending __attribute__((aligned(64))) = 0;
static nvc_lock_t       stop_lock = 0;
static stop_world_fn_t  stop_callback = NULL;
static void            *stop_arg = NULL;

#ifdef __MINGW32__
static CONDITION_VARIABLE wake_workers = CONDITION_VARIABLE_INIT;
static CRITICAL_SECTION   wakelock;
#else
static pthread_cond_t     wake_workers = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t    wakelock = PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;
#endif

#ifdef POSIX_SUSPEND
static sem_t stop_sem;
#endif

#ifdef DEBUG
static lock_stats_t  lock_stats[MAX_THREADS];
static workq_stats_t workq_stats[MAX_THREADS];
#endif

static __thread nvc_thread_t *my_thread = NULL;

static parking_bay_t *parking_bay_for(void *cookie);

#ifdef POSIX_SUSPEND
static void suspend_handler(int sig, siginfo_t *info, void *context);
#endif

#ifdef DEBUG
static void print_lock_stats(void)
{
   lock_stats_t s = {};
   workq_stats_t t = {};
   for (int i = 0; i < MAX_THREADS; i++) {
      s.locks     += relaxed_load(&(lock_stats[i].locks));
      s.contended += relaxed_load(&(lock_stats[i].contended));
      s.parks     += relaxed_load(&(lock_stats[i].parks));
      s.spins     += relaxed_load(&(lock_stats[i].spins));
      s.spurious  += relaxed_load(&(lock_stats[i].spurious));
      s.retries   += relaxed_load(&(lock_stats[i].retries));

      t.comp   += relaxed_load(&(workq_stats[i].comp));
      t.steals += relaxed_load(&(workq_stats[i].steals));
      t.wakes  += relaxed_load(&(workq_stats[i].wakes));
   }

   printf("\nLock statistics:\n");
   printf("\tTotal locks      : %"PRIi64"\n", s.locks);
   printf("\tContended        : %"PRIi64" (%.1f%%)\n",
          s.contended, 100.0 * ((double)s.contended / (double)s.locks));
   printf("\tParked           : %"PRIi64" (%.1f%%)\n",
          s.parks, 100.0 * ((double)s.parks / (double)s.locks));
   printf("\tAverage spins    : %.1f\n", (double)s.spins / (double)s.retries);
   printf("\tSpurious wakeups : %"PRIi64"\n", s.spurious);

   printf("\nWork queue statistics:\n");
   printf("\tCompleted tasks  : %"PRIi64"\n", t.comp);
   printf("\tSteals           : %"PRIi64"\n", t.steals);
   printf("\tWakeups          : %"PRIi64"\n", t.wakes);
}
#endif

#ifdef __MINGW32__
static inline void platform_mutex_lock(LPCRITICAL_SECTION lpcs)
{
   EnterCriticalSection(lpcs);
}

static inline void platform_mutex_unlock(LPCRITICAL_SECTION lpcs)
{
   LeaveCriticalSection(lpcs);
}

static inline void platform_cond_broadcast(PCONDITION_VARIABLE pcv)
{
   WakeAllConditionVariable(pcv);
}

static inline void platform_cond_wait(PCONDITION_VARIABLE pcv,
                                      LPCRITICAL_SECTION lpcs)
{
   SleepConditionVariableCS(pcv, lpcs, INFINITE);
}
#else
static inline void platform_mutex_lock(pthread_mutex_t *mtx)
{
   PTHREAD_CHECK(pthread_mutex_lock, mtx);
}

static inline void platform_mutex_unlock(pthread_mutex_t *mtx)
{
   PTHREAD_CHECK(pthread_mutex_unlock, mtx);
}

static inline void platform_cond_broadcast(pthread_cond_t *cond)
{
   PTHREAD_CHECK(pthread_cond_broadcast, cond);
}

static inline void platform_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mtx)
{
   PTHREAD_CHECK(pthread_cond_wait, cond, mtx);
}
#endif

static void join_worker_threads(void)
{
   SCOPED_A(nvc_thread_t *) join_list = AINIT;

   for (int i = 1; i < MAX_THREADS; i++) {
      nvc_thread_t *t = atomic_load(&threads[i]);
      if (t != NULL)
         APUSH(join_list, t);
   }

   // Lock the wake mutex here to avoid races with workers sleeping
   platform_mutex_lock(&wakelock);
   {
      atomic_store(&should_stop, true);
      platform_cond_broadcast(&wake_workers);
   }
   platform_mutex_unlock(&wakelock);

   for (int i = 0; i < join_list.count; i++) {
      nvc_thread_t *t = join_list.items[i];

      switch (relaxed_load(&t->kind)) {
      case WORKER_THREAD:
         thread_join(t);
         continue;  // Freed thread struct
      case USER_THREAD:
      case MAIN_THREAD:
         fatal_trace("leaked a user thread: %s", t->name);
      }
   }

   assert(atomic_load(&running_threads) == 1);

#ifdef DEBUG
   for (int i = 0; i < PARKING_BAYS; i++)
      assert(parking_bays[i].parked == 0);
#endif
}

static nvc_thread_t *thread_new(thread_fn_t fn, void *arg,
                                thread_kind_t kind, char *name)
{
   nvc_thread_t *thread = xcalloc(sizeof(nvc_thread_t));
   thread->name     = name;
   thread->fn       = fn;
   thread->arg      = arg;
   thread->kind     = kind;
   thread->rngstate = rand();

   atomic_store(&thread->queue.age.bits, 0);
   atomic_store(&thread->queue.bot, 0);

   int id = 0;
   for (; id < MAX_THREADS; id++) {
      if (relaxed_load(&(threads[id])) != NULL)
         continue;
      else if (atomic_cas(&(threads[id]), NULL, thread))
         break;
   }

   if (id == MAX_THREADS)
      fatal_trace("cannot create more than %d threads", MAX_THREADS);

   thread->id = id;

   unsigned max = relaxed_load(&max_thread_id);
   while (max < id) {
      if (__atomic_cas(&max_thread_id, &max, id))
         break;
   }

   atomic_add(&running_threads, 1);
   return thread;
}

#ifdef POSIX_SUSPEND
static void unmask_fatal_signals(sigset_t *mask)
{
   sigdelset(mask, SIGQUIT);
   sigdelset(mask, SIGABRT);
   sigdelset(mask, SIGTERM);
}
#endif

#ifdef __APPLE__
static void reset_mach_ports(void)
{
   for (int i = 0; i < MAX_THREADS; i++) {
      nvc_thread_t *t = atomic_load(&(threads[i]));
      if (t == NULL)
         continue;

      // Mach ports are not valid after fork
      t->port = pthread_mach_thread_np(t->handle);
   }
}
#endif

void thread_init(void)
{
   assert(my_thread == NULL);

   my_thread = thread_new(NULL, NULL, MAIN_THREAD, xstrdup("main thread"));

#ifdef __MINGW32__
   my_thread->handle = GetCurrentThread();
#else
   my_thread->handle = pthread_self();
#endif

#ifdef __APPLE__
   my_thread->port = pthread_mach_thread_np(my_thread->handle);
   pthread_atfork(NULL, NULL, reset_mach_ports);
#endif

#ifdef __MINGW32__
   InitializeCriticalSectionAndSpinCount(&wakelock, LOCK_SPINS);
   InitializeConditionVariable(&wake_workers);

   for (int i = 0; i < PARKING_BAYS; i++) {
      parking_bay_t *bay = &(parking_bays[i]);
      InitializeCriticalSectionAndSpinCount(&(bay->mutex), LOCK_SPINS);
      InitializeConditionVariable(&(bay->cond));
   }
#endif

   assert(my_thread->id == 0);

   const char *max_env = getenv("NVC_MAX_THREADS");
   if (max_env != NULL)
      max_workers = MAX(1, MIN(atoi(max_env), MAX_THREADS));
   else
      max_workers = DEFAULT_THREADS;

   const int num_cpus = nvc_nprocs();
   max_workers = MIN(num_cpus, max_workers);

   const char *jobs_env = getenv("NVC_CONCURRENT_JOBS");
   if (jobs_env != NULL) {
      const int num_jobs = MAX(1, atoi(jobs_env));
      const int limit = (int)round((double)num_cpus / (double)num_jobs);
      max_workers = MAX(1, MIN(max_workers, limit));
   }

   assert(max_workers > 0);

#ifdef DEBUG
   if (getenv("NVC_THREAD_VERBOSE") != NULL)
      atexit(print_lock_stats);
#endif

   atexit(join_worker_threads);

#ifdef POSIX_SUSPEND
   sem_init(&stop_sem, 0, 0);

   struct sigaction sa = {
      .sa_sigaction = suspend_handler,
      .sa_flags = SA_RESTART | SA_SIGINFO
   };
   sigfillset(&sa.sa_mask);
   unmask_fatal_signals(&sa.sa_mask);

   sigaction(SIGSUSPEND, &sa, NULL);
   sigaction(SIGRESUME, &sa, NULL);

   sigset_t mask;
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, NULL, &mask);

   sigdelset(&mask, SIGSUSPEND);
   sigaddset(&mask, SIGRESUME);

   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, &mask, NULL);
#endif
}

int thread_id(void)
{
   assert(my_thread != NULL);
   return my_thread->id;
}

bool thread_attached(void)
{
   return my_thread != NULL;
}

void thread_sleep(int usec)
{
   usleep(usec);
}

static void *thread_wrapper(void *arg)
{
   assert(my_thread == NULL);
   my_thread = arg;

   void *result = (*my_thread->fn)(my_thread->arg);

   // Avoid races with stop_world
   SCOPED_LOCK(stop_lock);

   assert(threads[my_thread->id] == my_thread);
   atomic_store(&(threads[my_thread->id]), NULL);

   atomic_add(&running_threads, -1);
   return result;
}

#ifdef __MINGW32__
static DWORD win32_thread_wrapper(LPVOID param)
{
   void *ret = thread_wrapper(param);
   atomic_store(&(my_thread->retval), ret);
   return 0;
}
#endif

static void thread_start(nvc_thread_t *thread)
{
   assert_lock_held(&stop_lock);

#ifdef __MINGW32__
   if ((thread->handle = CreateThread(NULL, 0, win32_thread_wrapper,
                                      thread, 0, NULL)) == NULL)
      fatal_errno("CreateThread");
#else
   PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
                 thread_wrapper, thread);
#endif

#ifdef __APPLE__
   thread->port = pthread_mach_thread_np(thread->handle);
#endif
}

nvc_thread_t *thread_create(thread_fn_t fn, void *arg, const char *fmt, ...)
{
   va_list ap;
   va_start(ap, fmt);
   char *name = xvasprintf(fmt, ap);
   va_end(ap);

   // Avoid races with stop_world
   SCOPED_LOCK(stop_lock);

   nvc_thread_t *thread = thread_new(fn, arg, USER_THREAD, name);
   thread_start(thread);

   return thread;
}

void *thread_join(nvc_thread_t *thread)
{
   if (thread == my_thread || thread->kind == MAIN_THREAD)
      fatal_trace("cannot join self or main thread");

   void *retval = NULL;
#ifdef __MINGW32__
   if (WaitForSingleObject(thread->handle, INFINITE) == WAIT_FAILED)
      fatal_errno("WaitForSingleObject failed for thread %s", thread->name);

   retval = atomic_load(&(thread->retval));
#else
   PTHREAD_CHECK(pthread_join, thread->handle, &retval);
#endif

   async_free(thread->name);
   async_free(thread);

   return retval;
}

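// A minimal usage sketch for the thread API above (illustrative only:
// "count_up" and "counter" are hypothetical names, not part of this
// file).  thread_join() returns the function's result and releases the
// thread struct:
//
//    static void *count_up(void *arg)
//    {
//       int *counter = arg;
//       for (int i = 0; i < 100; i++)
//          atomic_add(counter, 1);
//       return NULL;
//    }
//
//    int counter = 0;
//    nvc_thread_t *t = thread_create(count_up, &counter, "counter thread");
//    ...
//    thread_join(t);
//
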
nvc_thread_t *get_thread(int id)
{
   assert(id >= 0 && id < MAX_THREADS);
   return atomic_load(&threads[id]);
}

static inline parking_bay_t *parking_bay_for(void *cookie)
{
   return &(parking_bays[mix_bits_64(cookie) % PARKING_BAYS]);
}

static void thread_park(void *cookie, park_fn_t fn)
{
   parking_bay_t *bay = parking_bay_for(cookie);

   platform_mutex_lock(&(bay->mutex));
   {
      if ((*fn)(bay, cookie)) {
         bay->parked++;
         platform_cond_wait(&(bay->cond), &(bay->mutex));
         assert(bay->parked > 0);
         bay->parked--;
      }
   }
   platform_mutex_unlock(&(bay->mutex));
}

static void thread_unpark(void *cookie, unpark_fn_t fn)
{
   parking_bay_t *bay = parking_bay_for(cookie);

   if (fn != NULL) {
      platform_mutex_lock(&(bay->mutex));
      {
         (*fn)(bay, cookie);
      }
      platform_mutex_unlock(&(bay->mutex));
   }

   // Do not use pthread_cond_signal here as multiple threads parked in
   // this bay may be waiting on different cookies
   platform_cond_broadcast(&(bay->cond));
}

void spin_wait(void)
{
#if defined ARCH_X86_64
   __asm__ volatile ("pause");
#elif defined ARCH_ARM64
   // YIELD is a no-op on most AArch64 cores so also do an ISB to stall
   // the pipeline for a bit
   __asm__ volatile ("yield; isb");
#endif
}

static bool lock_park_cb(parking_bay_t *bay, void *cookie)
{
   nvc_lock_t *lock = cookie;

   // This is called with the park mutex held: check the lock is still
   // owned by someone and the park bit is still set
   return relaxed_load(lock) == (IS_LOCKED | HAS_PARKED);
}

static void lock_unpark_cb(parking_bay_t *bay, void *cookie)
{
   nvc_lock_t *lock = cookie;

   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));

   // Unlock must have release semantics
   atomic_store(lock, (bay->parked > 0 ? HAS_PARKED : 0));
}

void nvc_lock(nvc_lock_t *lock)
{
   LOCK_EVENT(locks, 1);
   TSAN_PRE_LOCK(lock);

   int8_t state = 0;
   if (likely(__atomic_cas(lock, &state, state | IS_LOCKED)))
      goto locked;  // Fast path: acquired the lock without contention

   LOCK_EVENT(contended, 1);

   for (;;) {
      LOCK_EVENT(retries, 1);

      // Spin a few times waiting for the owner to release the lock
      // before parking
      int spins = 0;
      for (; (state & IS_LOCKED) && spins < LOCK_SPINS;
           spins++, state = relaxed_load(lock))
         spin_wait();

      if (state & IS_LOCKED) {
         // Ignore failures here as we will check the lock state again
         // in the callback with the park mutex held
         atomic_cas(lock, IS_LOCKED, IS_LOCKED | HAS_PARKED);

         LOCK_EVENT(parks, 1);
         thread_park(lock, lock_park_cb);

         if ((state = relaxed_load(lock)) & IS_LOCKED) {
            // Someone else grabbed the lock before our thread was unparked
            LOCK_EVENT(spurious, 1);
            continue;
         }
      }
      else
         LOCK_EVENT(spins, spins);

      assert(!(state & IS_LOCKED));

      // If we get here then we've seen the lock in an unowned state:
      // attempt to grab it with a CAS
      if (__atomic_cas(lock, &state, state | IS_LOCKED))
         goto locked;
   }

 locked:
   TSAN_POST_LOCK(lock);
}

void nvc_unlock(nvc_lock_t *lock)
{
   TSAN_PRE_UNLOCK(lock);

   // Fast path: unlock assuming no parked waiters
   if (likely(atomic_cas(lock, IS_LOCKED, 0)))
      goto unlocked;

   // If we get here then we must own the lock with at least one parked
   // waiting thread
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));

   // Lock released in callback
   thread_unpark(lock, lock_unpark_cb);

 unlocked:
   TSAN_POST_UNLOCK(lock);
}

#ifdef DEBUG
void assert_lock_held(nvc_lock_t *lock)
{
   int8_t state = relaxed_load(lock);
   if (unlikely(!(state & IS_LOCKED)))
      fatal_trace("expected lock at %p to be held", lock);
}
#endif

void __scoped_unlock(nvc_lock_t **plock)
{
   nvc_unlock(*plock);
}

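// A minimal usage sketch for nvc_lock_t (illustrative only: "foo_lock"
// and "foo_count" are hypothetical names).  SCOPED_LOCK is assumed to
// expand to nvc_lock() plus a cleanup variable that calls
// __scoped_unlock(), matching its uses elsewhere in this file;
// nvc_lock()/nvc_unlock() can also be called directly:
//
//    static nvc_lock_t foo_lock = 0;
//    static int foo_count = 0;
//
//    void foo_increment(void)
//    {
//       SCOPED_LOCK(foo_lock);   // Released automatically at end of scope
//       foo_count++;
//    }
//
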
static void push_bot(threadq_t *tq, const task_t *tasks, size_t count)
{
   const abp_idx_t bot = relaxed_load(&tq->bot);
   assert(bot + count <= THREADQ_SIZE);

   memcpy(tq->deque + bot, tasks, count * sizeof(task_t));
   store_release(&tq->bot, bot + count);
}

static bool pop_bot(threadq_t *tq, task_t *task)
{
   const abp_idx_t old_bot = relaxed_load(&tq->bot);
   if (old_bot == 0)
      return false;

   const abp_idx_t new_bot = old_bot - 1;
   atomic_store(&tq->bot, new_bot);

   *task = tq->deque[new_bot];

   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
   if (new_bot > old_age.top)
      return true;

   atomic_store(&tq->bot, 0);

   const abp_age_t new_age = { .top = 0, .tag = old_age.tag + 1 };
   if (new_bot == old_age.top) {
      if (atomic_cas(&tq->age.bits, old_age.bits, new_age.bits))
         return true;
   }

   atomic_store(&tq->age.bits, new_age.bits);
   return false;
}

__attribute__((no_sanitize("thread")))
static bool pop_top(threadq_t *tq, task_t *task)
{
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
   const abp_idx_t bot = atomic_load(&tq->bot);

   if (bot <= old_age.top)
      return false;

   // This triggers a TSAN false-positive: we will never read from *task
   // if the CAS fails below, so it's safe to ignore
   *task = tq->deque[old_age.top];

   const abp_age_t new_age = {
      .tag = old_age.tag,
      .top = old_age.top + 1
   };

   return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
}

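// Note on the protocol above: pop_bot() only needs its CAS when it takes
// the last remaining task, since that is the only slot a concurrent
// pop_top() could be racing for; otherwise the owner's element lies
// strictly below "top" and no thief can reach it.  Incrementing the tag
// whenever the deque is reset ensures a thief whose (top, tag) snapshot
// is stale fails its CAS instead of stealing a task that was already
// executed.
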
static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
{
   assert_lock_held(&gq->lock);

   if (gq->wptr == gq->rptr)
      gq->wptr = gq->rptr = 0;

   if (gq->wptr + count > gq->max) {
      gq->max = next_power_of_2(gq->wptr + count);
      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
   }

   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
   gq->wptr += count;
}

__attribute__((no_sanitize("thread")))
static bool globalq_unlocked_empty(globalq_t *gq)
{
   return relaxed_load(&gq->wptr) == relaxed_load(&gq->rptr);
}

static size_t globalq_take(globalq_t *gq, threadq_t *tq)
{
   if (globalq_unlocked_empty(gq))
      return 0;

   const int nthreads = relaxed_load(&running_threads);

   SCOPED_LOCK(gq->lock);

   if (gq->wptr == gq->rptr)
      return 0;

   const int remain = gq->wptr - gq->rptr;
   const int share = gq->wptr / nthreads;
   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
   const int from = gq->rptr;

   gq->rptr += take;

   push_bot(tq, gq->tasks + from, take);
   return take;
}

static void execute_task(task_t *task)
{
   (*task->fn)(task->context, task->arg);

   if (task->workq != NULL) {
      entryq_t *eq = &(task->workq->entryqs[my_thread->id]);

      const entryq_ptr_t cur = { .bits = relaxed_load(&eq->comp.bits) };
      const int epoch = atomic_load(&task->workq->epoch);
      const int count = cur.epoch == epoch ? cur.count : 0;
      const entryq_ptr_t next = { .count = count + 1, .epoch = epoch };
      store_release(&eq->comp.bits, next.bits);
   }
   else
      atomic_add(&async_pending, -1);
}

static bool globalq_poll(globalq_t *gq, threadq_t *tq)
{
   int ntasks;
   if ((ntasks = globalq_take(gq, tq))) {
      task_t task;
      int comp = 0;
      for (; pop_bot(tq, &task); comp++)
         execute_task(&task);

      WORKQ_EVENT(comp, comp);
      return true;
   }
   else
      return false;
}

workq_t *workq_new(void *context)
{
   if (my_thread->kind != MAIN_THREAD)
      fatal_trace("work queues can only be created by the main thread");

   workq_t *wq = xcalloc(sizeof(workq_t));
   wq->state    = IDLE;
   wq->context  = context;
   wq->parallel = max_workers > 1;
   wq->epoch    = 1;

   return wq;
}

void workq_not_thread_safe(workq_t *wq)
{
   wq->parallel = false;
}

void workq_free(workq_t *wq)
{
   if (my_thread->kind != MAIN_THREAD)
      fatal_trace("work queues can only be freed by the main thread");

   assert(wq->state == IDLE);

   for (int i = 0; i < MAX_THREADS; i++)
      free(wq->entryqs[i].tasks);

   free(wq);
}

void workq_do(workq_t *wq, task_fn_t fn, void *arg)
{
   assert(wq->state == IDLE);

   entryq_t *eq = &(wq->entryqs[my_thread->id]);

   const entryq_ptr_t cur = { .bits = relaxed_load(&eq->wptr.bits) };
   const int epoch = atomic_load(&wq->epoch);
   const int wptr = cur.epoch == epoch ? cur.count : 0;

   if (wptr == eq->queuesz) {
      eq->queuesz = MAX(eq->queuesz * 2, 64);
      eq->tasks = xrealloc_array(eq->tasks, eq->queuesz, sizeof(task_t));
   }

   eq->tasks[wptr] = (task_t){ fn, wq->context, arg, wq };

   const entryq_ptr_t next = { .count = wptr + 1, .epoch = epoch };
   store_release(&eq->wptr.bits, next.bits);

   unsigned maxthread = relaxed_load(&wq->maxthread);
   while (maxthread < my_thread->id) {
      if (__atomic_cas(&wq->maxthread, &maxthread, my_thread->id))
         break;
   }
}

void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
×
999
{
1000
   const int maxthread = relaxed_load(&wq->maxthread);
×
1001
   for (int i = 0; i <= maxthread; i++) {
×
1002
      entryq_t *eq = &(wq->entryqs[my_thread->id]);
×
1003
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
×
1004
      for (int j = 0; j < wptr.count; j++)
×
1005
         (*fn)(wq->context, eq->tasks[j].arg, arg);
×
1006
   }
1007
}
×
1008

1009
static int estimate_depth(threadq_t *tq)
1,047✔
1010
{
1011
   const abp_age_t age = { .bits = relaxed_load(&tq->age.bits) };
1,047✔
1012
   const abp_idx_t bot = relaxed_load(&tq->bot);
1,047✔
1013

1014
   return bot <= age.top ? 0 : bot - age.top;
1,047✔
1015
}
1016

1017
static threadq_t *get_thread_queue(int id)
1,047✔
1018
{
1019
   assert(id < MAX_THREADS);
1,047✔
1020

1021
   nvc_thread_t *t = atomic_load(&(threads[id]));
1,047✔
1022
   if (t == NULL)
1,047✔
1023
      return NULL;
1024

1025
   return &(t->queue);
1,047✔
1026
}
1027

1028
static uint32_t fast_rand(void)
325✔
1029
{
1030
   uint32_t state = my_thread->rngstate;
325✔
1031
   state ^= (state << 13);
325✔
1032
   state ^= (state >> 17);
325✔
1033
   state ^= (state << 5);
325✔
1034
   return (my_thread->rngstate = state);
325✔
1035
}
1036

1037
static threadq_t *find_victim(void)
372✔
1038
{
1039
   threadq_t *last = get_thread_queue(my_thread->victim);
372✔
1040
   if (last != NULL && estimate_depth(last) > 0)
372✔
1041
      return last;
1042

1043
   const int maxthread = relaxed_load(&max_thread_id);
325✔
1044
   const int start = fast_rand() % (maxthread + 1);
325✔
1045
   int idx = start;
325✔
1046
   do {
994✔
1047
      if (idx != my_thread->id) {
994✔
1048
         threadq_t *q = get_thread_queue(idx);
675✔
1049
         if (q != NULL && estimate_depth(q) > 0) {
675✔
1050
            my_thread->victim = idx;
33✔
1051
            return q;
33✔
1052
         }
1053
      }
1054
   } while ((idx = (idx + 1) % (maxthread + 1)) != start);
961✔
1055

1056
   return NULL;
1057
}
1058

1059
static bool steal_task(void)
372✔
1060
{
1061
   threadq_t *tq = find_victim();
372✔
1062
   if (tq == NULL)
372✔
1063
      return false;
1064

1065
   task_t task;
80✔
1066
   if (pop_top(tq, &task)) {
80✔
1067
      WORKQ_EVENT(steals, 1);
77✔
1068
      execute_task(&task);
77✔
1069
      WORKQ_EVENT(comp, 1);
77✔
1070
      return true;
77✔
1071
   }
1072

1073
   return false;
1074
}
1075

1076
static void progressive_backoff(void)
57,796,445✔
1077
{
1078
   if (my_thread->spins++ < YIELD_SPINS)
57,796,445✔
1079
      spin_wait();
56,045,050✔
1080
   else {
1081
#ifdef __MINGW32__
1082
      SwitchToThread();
1083
#else
1084
      sched_yield();
1,751,395✔
1085
#endif
1086
      my_thread->spins = 0;
1,751,395✔
1087
   }
1088
}
57,796,445✔
1089

1090
static void *worker_thread(void *arg)
60✔
1091
{
1092
   mspace_stack_limit(MSPACE_CURRENT_FRAME);
60✔
1093

1094
   do {
384✔
1095
      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
384✔
1096
         my_thread->spins = 0;  // Did work
124✔
1097
      else if (my_thread->spins++ < 2)
260✔
1098
         spin_wait();
176✔
1099
      else {
1100
         platform_mutex_lock(&wakelock);
84✔
1101
         {
1102
            if (!relaxed_load(&should_stop))
84✔
1103
               platform_cond_wait(&wake_workers, &wakelock);
84✔
1104
         }
1105
         platform_mutex_unlock(&wakelock);
84✔
1106
      }
1107
   } while (likely(!relaxed_load(&should_stop)));
384✔
1108

1109
   return NULL;
60✔
1110
}
1111

1112
static void create_workers(int needed)
138✔
1113
{
1114
   assert(my_thread->kind == MAIN_THREAD);
138✔
1115

1116
   if (relaxed_load(&should_stop))
138✔
1117
      return;
1118

1119
   while (relaxed_load(&running_threads) < MIN(max_workers, needed)) {
198✔
1120
      static int counter = 0;
60✔
1121
      char *name = xasprintf("worker thread %d", atomic_add(&counter, 1));
60✔
1122
      SCOPED_LOCK(stop_lock);   // Avoid races with stop_world
120✔
1123
      nvc_thread_t *thread =
60✔
1124
         thread_new(worker_thread, NULL, WORKER_THREAD, name);
60✔
1125
      thread_start(thread);
60✔
1126
   }
1127

1128
   platform_cond_broadcast(&wake_workers);
138✔
1129
}
1130

1131
void workq_start(workq_t *wq)
1,215✔
1132
{
1133
   assert(my_thread->kind == MAIN_THREAD);
1,215✔
1134

1135
   const int epoch = relaxed_load(&wq->epoch);
1,215✔
1136
   const int maxthread = relaxed_load(&wq->maxthread);
1,215✔
1137

1138
   assert(wq->state == IDLE);
1,215✔
1139
   wq->state = START;
1,215✔
1140

1141
   int nserial = 0, nparallel = 0;
1,215✔
1142
   for (int i = 0; i <= maxthread; i++) {
2,430✔
1143
      entryq_t *eq = &wq->entryqs[i];
1,215✔
1144
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
1,215✔
1145
      if (wptr.epoch == epoch) {
1,215✔
1146
         // Only bump epoch if there are tasks to run
1147
         if (nserial + nparallel == 0)
1,215✔
1148
            atomic_add(&wq->epoch, 1);
1,215✔
1149

1150
         assert(wptr.count > 0);
1,215✔
1151

1152
         if (i == maxthread && nserial + nparallel == 0 && wptr.count == 1) {
1,215✔
1153
            execute_task(&(eq->tasks[0]));   // Only one task in total
1,180✔
1154
            nserial++;
1,180✔
1155
         }
1156
         else if (wq->parallel) {
35✔
1157
            if (nparallel == 0)
35✔
1158
               nvc_lock(&globalq.lock);   // Lazily acquire lock
35✔
1159
            globalq_put(&globalq, eq->tasks, wptr.count);
35✔
1160
            nparallel += wptr.count;
35✔
1161
         }
1162
         else {
1163
            for (int j = 0; j < wptr.count; j++)
×
1164
               execute_task(&(eq->tasks[j]));
×
1165
            nserial += wptr.count;
×
1166
         }
1167
      }
1168
   }
1169

1170
   if (wq->parallel && nparallel > 0) {
1,215✔
1171
      nvc_unlock(&globalq.lock);
35✔
1172
      create_workers(nparallel);
35✔
1173
   }
1174
}
1,215✔
1175

1176
static int workq_outstanding(workq_t *wq)
57,782,182✔
1177
{
1178
   assert(wq->state == START);
57,782,182✔
1179

1180
   const int epoch = atomic_load(&wq->epoch);
57,782,182✔
1181
   const int maxthread = relaxed_load(&max_thread_id);
57,782,182✔
1182

1183
   int pending = 0;
57,782,182✔
1184
   for (int i = 0; i <= maxthread; i++) {
211,916,601✔
1185
      entryq_t *eq = &(wq->entryqs[i]);
154,134,419✔
1186

1187
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
154,134,419✔
1188
      if (wptr.epoch == epoch - 1)
154,134,419✔
1189
         pending += wptr.count;
57,782,182✔
1190

1191
      const entryq_ptr_t comp = { .bits = load_acquire(&eq->comp.bits) };
154,134,419✔
1192
      if (comp.epoch == epoch)
154,134,419✔
1193
         pending -= comp.count;
98,709,701✔
1194
   }
1195

1196
   assert(pending >= 0);
57,782,182✔
1197
   return pending;
57,782,182✔
1198
}
1199

1200
static void workq_parallel_drain(workq_t *wq)
1,215✔
1201
{
1202
   if (workq_outstanding(wq) > 0) {
1,215✔
1203
      while (globalq_poll(&globalq, &(my_thread->queue)));
59✔
1204
      while (steal_task());
49✔
1205

1206
      while (workq_outstanding(wq) > 0)
57,780,967✔
1207
         progressive_backoff();
57,780,932✔
1208
   }
1209

1210
   wq->state = IDLE;
1,215✔
1211
}
1,215✔
1212

1213
void workq_drain(workq_t *wq)
1,215✔
1214
{
1215
   if (my_thread->kind != MAIN_THREAD)
1,215✔
1216
      fatal_trace("workq_drain can only be called from the main thread");
1217

1218
   if (wq->parallel)
1,215✔
1219
      workq_parallel_drain(wq);
1,215✔
1220
   else {
1221
      assert(wq->state == START);
×
1222
      wq->state = IDLE;
×
1223
   }
1224
}
1,215✔
1225

1226
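// A minimal usage sketch for the work queue API (illustrative only:
// "process_item", "items" and "nitems" are hypothetical names).  Tasks
// receive the queue's context pointer plus the per-task argument, as in
// execute_task() above:
//
//    static void process_item(void *context, void *arg)
//    {
//       ...
//    }
//
//    workq_t *wq = workq_new(context);
//    for (int i = 0; i < nitems; i++)
//       workq_do(wq, process_item, &(items[i]));
//
//    workq_start(wq);   // Publishes the tasks and wakes worker threads
//    workq_drain(wq);   // Main thread helps out until all tasks complete
//    workq_free(wq);
//
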
void async_do(task_fn_t fn, void *context, void *arg)
103✔
1227
{
1228
   if (max_workers == 1)
103✔
1229
      (*fn)(context, arg);   // Single CPU
×
1230
   else {
1231
      const int npending = atomic_add(&async_pending, 1);
103✔
1232
      create_workers(npending + 1 /* Do not count main thread */);
103✔
1233

1234
      task_t tasks[1] = {
103✔
1235
         { fn, context, arg, NULL }
1236
      };
1237
      SCOPED_LOCK(globalq.lock);
206✔
1238
      globalq_put(&globalq, tasks, 1);
103✔
1239
   }
1240
}
103✔
1241

1242
void async_barrier(void)
8,872✔
1243
{
1244
   while (atomic_load(&async_pending) > 0) {
17,746✔
1245
      if (!globalq_poll(&globalq, &(my_thread->queue)))
2✔
UNCOV
1246
         progressive_backoff();
×
1247
   }
1248
}
8,872✔
1249

1250
void async_free(void *ptr)
7,464✔
1251
{
1252
   if (ptr == NULL)
7,464✔
1253
      return;
1254

1255
   // TODO: free when all threads in quiescent state
1256
}
1257

1258
#ifdef POSIX_SUSPEND
1259
static void suspend_handler(int sig, siginfo_t *info, void *context)
4,086✔
1260
{
1261
   if (info->si_pid != getpid())
4,086✔
1262
      return;   // Not sent by us, ignore it
2,043✔
1263
   else if (sig == SIGRESUME)
4,086✔
1264
      return;
1265

1266
   const int olderrno = errno;
2,043✔
1267

1268
   if (my_thread != NULL) {
2,043✔
1269
      struct cpu_state cpu;
2,043✔
1270
      fill_cpu_state(&cpu, (ucontext_t *)context);
2,043✔
1271

1272
      stop_world_fn_t callback = atomic_load(&stop_callback);
2,043✔
1273
      void *arg = atomic_load(&stop_arg);
2,043✔
1274

1275
      (*callback)(my_thread->id, &cpu, arg);
2,043✔
1276
   }
1277

1278
   sem_post(&stop_sem);
2,043✔
1279

1280
   sigset_t mask;
2,043✔
1281
   sigfillset(&mask);
2,043✔
1282
   unmask_fatal_signals(&mask);
2,043✔
1283
   sigdelset(&mask, SIGRESUME);
2,043✔
1284

1285
   sigsuspend(&mask);
2,043✔
1286

1287
   sem_post(&stop_sem);
2,043✔
1288

1289
   errno = olderrno;
2,043✔
1290
}
1291
#endif
1292

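// On POSIX systems the stop-the-world protocol works as follows:
// stop_world() sends SIGSUSPEND to every other registered thread and
// then waits on stop_sem with a timeout.  Each target runs
// suspend_handler(), which reports its CPU state through the callback,
// posts stop_sem and blocks in sigsuspend() until SIGRESUME arrives.
// start_world() sends SIGRESUME and waits on stop_sem again so that both
// halves of the handshake are accounted for before stop_lock is
// released.
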
void stop_world(stop_world_fn_t callback, void *arg)
{
   nvc_lock(&stop_lock);

   atomic_store(&stop_callback, callback);
   atomic_store(&stop_arg, arg);

#ifdef __MINGW32__
   const int maxthread = relaxed_load(&max_thread_id);
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      if (SuspendThread(thread->handle) != 0)
         fatal_errno("SuspendThread");

      CONTEXT context;
      context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;
      if (!GetThreadContext(thread->handle, &context))
         fatal_errno("GetThreadContext");

      struct cpu_state cpu;
      fill_cpu_state(&cpu, &context);

      (*callback)(thread->id, &cpu, arg);
   }
#elif defined __APPLE__
   const int maxthread = relaxed_load(&max_thread_id);
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      assert(thread->port != MACH_PORT_NULL);

      kern_return_t kern_result;
      do {
         kern_result = thread_suspend(thread->port);
      } while (kern_result == KERN_ABORTED);

      if (kern_result != KERN_SUCCESS)
         fatal_trace("failed to suspend thread %d (%d)", i, kern_result);

#ifdef ARCH_ARM64
      arm_thread_state64_t state;
      mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
      thread_state_flavor_t flavor = ARM_THREAD_STATE64;
#else
      x86_thread_state64_t state;
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
      thread_state_flavor_t flavor = x86_THREAD_STATE64;
#endif
      kern_result = thread_get_state(thread->port, flavor,
                                     (natural_t *)&state, &count);
      if (kern_result != KERN_SUCCESS)
         fatal_trace("failed to get thread %d state (%d)", i, kern_result);

      // Fake a ucontext_t that we can pass to fill_cpu_state
      ucontext_t uc;
      typeof(*uc.uc_mcontext) mc;
      uc.uc_mcontext = &mc;
      mc.__ss = state;

      struct cpu_state cpu;
      fill_cpu_state(&cpu, &uc);

      (*callback)(thread->id, &cpu, arg);
   }
#elif defined __SANITIZE_THREAD__
   // https://github.com/google/sanitizers/issues/1179
   fatal_trace("stop_world is not supported with tsan");
#else
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);

   int signalled = 0;
   const int maxthread = relaxed_load(&max_thread_id);
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      PTHREAD_CHECK(pthread_kill, thread->handle, SIGSUSPEND);
      signalled++;
   }

   struct timespec ts;
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
      fatal_errno("clock_gettime");

   ts.tv_sec += SUSPEND_TIMEOUT;

   for (; signalled > 0; signalled--) {
      if (sem_timedwait(&stop_sem, &ts) != 0)
         fatal_trace("timeout waiting for %d threads to suspend", signalled);
   }

   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
#endif

   struct cpu_state cpu;
   capture_registers(&cpu);

   (*callback)(my_thread->id, &cpu, arg);
}

void start_world(void)
{
   assert_lock_held(&stop_lock);

   const int maxthread = relaxed_load(&max_thread_id);

#ifdef __MINGW32__
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      if (ResumeThread(thread->handle) != 1)
         fatal_errno("ResumeThread");
   }
#elif defined __APPLE__
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      kern_return_t kern_result;
      do {
         kern_result = thread_resume(thread->port);
      } while (kern_result == KERN_ABORTED);

      if (kern_result != KERN_SUCCESS)
         fatal_trace("failed to resume thread %d (%d)", i, kern_result);
   }
#else
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);

   int signalled = 0;
   for (int i = 0; i <= maxthread; i++) {
      nvc_thread_t *thread = atomic_load(&threads[i]);
      if (thread == NULL || thread == my_thread)
         continue;

      PTHREAD_CHECK(pthread_kill, thread->handle, SIGRESUME);
      signalled++;
   }

   struct timespec ts;
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
      fatal_errno("clock_gettime");

   ts.tv_sec += SUSPEND_TIMEOUT;

   for (; signalled > 0; signalled--) {
      if (sem_timedwait(&stop_sem, &ts) != 0)
         fatal_trace("timeout waiting for %d threads to resume", signalled);
   }

   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
#endif

   nvc_unlock(&stop_lock);
}

void thread_wx_mode(wx_mode_t mode)
{
#ifdef __APPLE__
   pthread_jit_write_protect_np(mode == WX_EXECUTE);
#else
   // Could use Intel memory protection keys here
#endif
}

barrier_t *barrier_new(int count)
{
   barrier_t *b = xcalloc(sizeof(barrier_t));
   b->count = count;
   return b;
}

void barrier_free(barrier_t *b)
{
   free(b);
}

void barrier_wait(barrier_t *b)
{
   const int count = relaxed_load(&b->count);
   const int passed = relaxed_load(&b->passed);

   if (atomic_fetch_add(&b->reached, 1) == count - 1) {
      // Last thread to pass barrier
      relaxed_store(&b->reached, 0);
      store_release(&b->passed, passed + 1);
   }
   else {
      while (load_acquire(&b->passed) == passed)
         progressive_backoff();
   }
}