
nickg / nvc / 6710403832 (push, via github)

31 Oct 2023 05:20PM UTC coverage: 91.274% (+0.01%) from 91.262%

nickg: Add NEVER_WAITS attribute to optimise code generation for procedures

89 of 89 new or added lines in 4 files covered (100.0%)

49799 of 54560 relevant lines covered (91.27%)

597917.85 hits per line

Source file: /src/thread.c (88.61% covered)
1
//
2
//  Copyright (C) 2021-2023  Nick Gasson
3
//
4
//  This program is free software: you can redistribute it and/or modify
5
//  it under the terms of the GNU General Public License as published by
6
//  the Free Software Foundation, either version 3 of the License, or
7
//  (at your option) any later version.
8
//
9
//  This program is distributed in the hope that it will be useful,
10
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
//  GNU General Public License for more details.
13
//
14
//  You should have received a copy of the GNU General Public License
15
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
//
17

18
#include "util.h"
19
#include "array.h"
20
#include "cpustate.h"
21
#include "rt/mspace.h"
22
#include "thread.h"
23

24
#include <assert.h>
25
#include <errno.h>
26
#include <signal.h>
27
#include <stdlib.h>
28
#include <string.h>
29
#include <inttypes.h>
30
#include <unistd.h>
31
#include <semaphore.h>
32

33
#ifdef HAVE_PTHREAD
34
#include <pthread.h>
35
#else
36
#error missing pthread support
37
#endif
38

39
#ifdef __SANITIZE_THREAD__
40
#include <sanitizer/tsan_interface.h>
41
#endif
42

43
#if defined __MINGW32__
44
#define WIN32_LEAN_AND_MEAN
45
#include <windows.h>
46
#elif defined __APPLE__
47
#define task_t __task_t
48
#include <mach/thread_act.h>
49
#include <mach/machine.h>
50
#undef task_t
51
#endif
52

53
#define LOCK_SPINS      15
54
#define YIELD_SPINS     32
55
#define MIN_TAKE        8
56
#define PARKING_BAYS    64
57
#define SUSPEND_TIMEOUT 1
58

59
#if !defined __MINGW32__ && !defined __APPLE__
60
#define POSIX_SUSPEND 1
61
#define SIGSUSPEND    SIGRTMIN
62
#define SIGRESUME     (SIGRTMIN + 1)
63
#endif
64

65
typedef struct {
66
   int64_t locks;
67
   int64_t spins;
68
   int64_t contended;
69
   int64_t parks;
70
   int64_t spurious;
71
   int64_t retries;
72
} __attribute__((aligned(64))) lock_stats_t;
73

74
STATIC_ASSERT(sizeof(lock_stats_t) == 64)
75

76
#ifdef DEBUG
77
#define LOCK_EVENT(what, n) do {                  \
78
      if (likely(my_thread != NULL))              \
79
         lock_stats[my_thread->id].what += (n);   \
80
   } while (0)
81

82
#define WORKQ_EVENT(what, n) do {                 \
83
      if (likely(my_thread != NULL))              \
84
         workq_stats[my_thread->id].what += (n);  \
85
   } while (0)
86
#else
87
#define LOCK_EVENT(what, n)
88
#define WORKQ_EVENT(what, n)
89
#endif
90

91
#ifdef __SANITIZE_THREAD__
92
#define TSAN_PRE_LOCK(addr) \
93
   __tsan_mutex_pre_lock(addr, __tsan_mutex_linker_init);
94
#define TSAN_POST_LOCK(addr) \
95
   __tsan_mutex_post_lock(addr, __tsan_mutex_linker_init, 0);
96
#define TSAN_PRE_UNLOCK(addr) \
97
   __tsan_mutex_pre_unlock(addr, __tsan_mutex_linker_init);
98
#define TSAN_POST_UNLOCK(addr) \
99
   __tsan_mutex_post_unlock(addr, __tsan_mutex_linker_init);
100
#else
101
#define TSAN_PRE_LOCK(addr)
102
#define TSAN_POST_LOCK(addr)
103
#define TSAN_PRE_UNLOCK(addr)
104
#define TSAN_POST_UNLOCK(addr)
105
#endif
106

107
#define PTHREAD_CHECK(op, ...) do {             \
108
      if (unlikely(op(__VA_ARGS__) != 0))       \
109
         fatal_errno(#op);                      \
110
   } while (0)
111

112
// Lock implementation based on WTF::Lock in WebKit
113
//   https://webkit.org/blog/6161/locking-in-webkit/
114

115
typedef enum {
116
   IS_LOCKED  = (1 << 0),
117
   HAS_PARKED = (1 << 1)
118
} lock_bits_t;
119

120
typedef struct {
121
   pthread_mutex_t mutex;
122
   pthread_cond_t  cond;
123
   int             parked;
124
} __attribute__((aligned(64))) parking_bay_t;
125

126
typedef bool (*park_fn_t)(parking_bay_t *, void *);
127
typedef void (*unpark_fn_t)(parking_bay_t *, void *);
128

129
typedef struct {
130
   task_fn_t  fn;
131
   void      *context;
132
   void      *arg;
133
   workq_t   *workq;
134
} task_t;
135

136
// Work sealing task queue is based on:
137
//   Arora, N. S., Blumofe, R. D., and Plaxton, C. G.
138
//   Thread scheduling for multiprogrammed multiprocessors.
139
//   Theory of Computing Systems 34, 2 (2001), 115-144.
140

141
typedef uint32_t abp_idx_t;
142
typedef uint32_t abp_tag_t;
143

144
typedef union {
145
   struct {
146
      abp_idx_t top;
147
      abp_tag_t tag;
148
   };
149
   uint64_t bits;
150
} abp_age_t;
151

152
STATIC_ASSERT(sizeof(abp_age_t) <= 8);
153

154
#define THREADQ_SIZE 256
155

156
typedef struct {
157
   task_t    deque[THREADQ_SIZE];
158
   abp_age_t age;
159
   abp_idx_t bot;
160
} __attribute__((aligned(64))) threadq_t;
161

162
typedef enum { IDLE, START } workq_state_t;
163

164
typedef union {
165
   struct {
166
      uint32_t count;
167
      uint32_t epoch;
168
   };
169
   uint64_t bits;
170
} entryq_ptr_t;
171

172
STATIC_ASSERT(sizeof(entryq_ptr_t) <= 8);
173

174
typedef struct {
175
   task_t       *tasks;
176
   unsigned      queuesz;
177
   entryq_ptr_t  wptr;
178
   entryq_ptr_t  comp;
179
} __attribute__((aligned(64))) entryq_t;
180

181
STATIC_ASSERT(sizeof(entryq_t) == 64);
182

183
struct _workq {
184
   void          *context;
185
   workq_state_t  state;
186
   unsigned       epoch;
187
   unsigned       maxthread;
188
   bool           parallel;
189
   entryq_t       entryqs[MAX_THREADS];
190
};
191

192
typedef struct {
193
   int64_t comp;
194
   int64_t steals;
195
   int64_t wakes;
196
} __attribute__((aligned(64))) workq_stats_t;
197

198
STATIC_ASSERT(sizeof(workq_stats_t) == 64);
199

200
typedef enum {
201
   MAIN_THREAD,
202
   USER_THREAD,
203
   WORKER_THREAD,
204
} thread_kind_t;
205

206
struct _nvc_thread {
207
   unsigned        id;
208
   thread_kind_t   kind;
209
   unsigned        spins;
210
   threadq_t       queue;
211
   char           *name;
212
   pthread_t       handle;
213
   thread_fn_t     fn;
214
   void           *arg;
215
   int             victim;
216
   uint32_t        rngstate;
217
#ifdef __APPLE__
218
   thread_port_t   port;
219
#endif
220
};
221

222
typedef struct {
223
   nvc_lock_t   lock;
224
   task_t      *tasks;
225
   unsigned     wptr;
226
   unsigned     rptr;
227
   unsigned     max;
228
} globalq_t;
229

230
typedef struct _barrier {
231
   unsigned count;
232
   unsigned reached;
233
   unsigned passed;
234
} __attribute__((aligned(64))) barrier_t;
235

236
static parking_bay_t parking_bays[PARKING_BAYS] = {
237
   [0 ... PARKING_BAYS - 1] = {
238
      PTHREAD_MUTEX_INITIALIZER,
239
      PTHREAD_COND_INITIALIZER
240
   }
241
};
242

243
static nvc_thread_t    *threads[MAX_THREADS];
244
static unsigned         max_workers = 0;
245
static int              running_threads = 0;
246
static unsigned         max_thread_id = 0;
247
static bool             should_stop = false;
248
static globalq_t        globalq __attribute__((aligned(64)));
249
static int              async_pending __attribute__((aligned(64))) = 0;
250
static nvc_lock_t       stop_lock = 0;
251
static stop_world_fn_t  stop_callback = NULL;
252
static void            *stop_arg = NULL;
253
static pthread_cond_t   wake_workers = PTHREAD_COND_INITIALIZER;
254
static pthread_mutex_t  wakelock = PTHREAD_MUTEX_INITIALIZER;
255
#ifdef POSIX_SUSPEND
256
static sem_t            stop_sem;
257
#endif
258

259
#ifdef DEBUG
260
static lock_stats_t  lock_stats[MAX_THREADS];
261
static workq_stats_t workq_stats[MAX_THREADS];
262
#endif
263

264
static __thread nvc_thread_t *my_thread = NULL;
265

266
static parking_bay_t *parking_bay_for(void *cookie);
267

268
#ifdef POSIX_SUSPEND
269
static void suspend_handler(int sig, siginfo_t *info, void *context);
270
#endif
271

272
#ifdef DEBUG
273
static void print_lock_stats(void)
×
274
{
275
   lock_stats_t s = {};
×
276
   workq_stats_t t = {};
×
277
   for (int i = 0; i < MAX_THREADS; i++) {
×
278
      s.locks     += relaxed_load(&(lock_stats[i].locks));
×
279
      s.contended += relaxed_load(&(lock_stats[i].contended));
×
280
      s.parks     += relaxed_load(&(lock_stats[i].parks));
×
281
      s.spins     += relaxed_load(&(lock_stats[i].spins));
×
282
      s.spurious  += relaxed_load(&(lock_stats[i].spurious));
×
283
      s.retries   += relaxed_load(&(lock_stats[i].retries));
×
284

285
      t.comp   += relaxed_load(&(workq_stats[i].comp));
×
286
      t.steals += relaxed_load(&(workq_stats[i].steals));
×
287
      t.wakes  += relaxed_load(&(workq_stats[i].wakes));
×
288
   }
289

290
   printf("\nLock statistics:\n");
×
291
   printf("\tTotal locks      : %"PRIi64"\n", s.locks);
×
292
   printf("\tContended        : %"PRIi64" (%.1f%%)\n",
×
293
          s.contended, 100.0 * ((double)s.contended / (double)s.locks));
×
294
   printf("\tParked           : %"PRIi64" (%.1f%%)\n",
×
295
          s.parks, 100.0 * ((double)s.parks / (double)s.locks));
×
296
   printf("\tAverage spins    : %.1f\n", (double)s.spins / (double)s.retries);
×
297
   printf("\tSpurious wakeups : %"PRIi64"\n", s.spurious);
×
298

299
   printf("\nWork queue statistics:\n");
×
300
   printf("\tCompleted tasks  : %"PRIi64"\n", t.comp);
×
301
   printf("\tSteals           : %"PRIi64"\n", t.steals);
×
302
   printf("\tWakeups          : %"PRIi64"\n", t.wakes);
×
303
}
×
304
#endif
305

306
static void join_worker_threads(void)
4,129✔
307
{
308
   SCOPED_A(nvc_thread_t *) join_list = AINIT;
8,258✔
309

310
   for (int i = 1; i < MAX_THREADS; i++) {
264,256✔
311
      nvc_thread_t *t = atomic_load(&threads[i]);
260,127✔
312
      if (t != NULL)
260,127✔
313
         APUSH(join_list, t);
260,127✔
314
   }
315

316
   // Lock the wake mutex here to avoid races with workers sleeping
317
   PTHREAD_CHECK(pthread_mutex_lock, &wakelock);
4,129✔
318
   {
319
      atomic_store(&should_stop, true);
4,129✔
320
      PTHREAD_CHECK(pthread_cond_broadcast, &wake_workers);
4,129✔
321
   }
322
   PTHREAD_CHECK(pthread_mutex_unlock, &wakelock);
4,129✔
323

324
   for (int i = 0; i < join_list.count; i++) {
4,158✔
325
      nvc_thread_t *t = join_list.items[i];
29✔
326

327
      switch (relaxed_load(&t->kind)) {
29✔
328
      case WORKER_THREAD:
29✔
329
         thread_join(t);
29✔
330
         continue;  // Freed thread struct
29✔
331
      case USER_THREAD:
×
332
      case MAIN_THREAD:
333
         fatal_trace("leaked a user thread: %s", t->name);
×
334
      }
335
   }
336

337
   assert(atomic_load(&running_threads) == 1);
4,129✔
338
}
4,129✔
339

340
static nvc_thread_t *thread_new(thread_fn_t fn, void *arg,
4,172✔
341
                                thread_kind_t kind, char *name)
342
{
343
   nvc_thread_t *thread = xcalloc(sizeof(nvc_thread_t));
4,172✔
344
   thread->name     = name;
4,172✔
345
   thread->fn       = fn;
4,172✔
346
   thread->arg      = arg;
4,172✔
347
   thread->kind     = kind;
4,172✔
348
   thread->rngstate = rand();
4,172✔
349

350
   atomic_store(&thread->queue.age.bits, 0);
4,172✔
351
   atomic_store(&thread->queue.bot, 0);
4,172✔
352

353
   int id = 0;
4,172✔
354
   for (; id < MAX_THREADS; id++) {
4,235✔
355
      if (relaxed_load(&(threads[id])) != NULL)
4,235✔
356
         continue;
63✔
357
      else if (atomic_cas(&(threads[id]), NULL, thread))
4,172✔
358
         break;
359
   }
360

361
   if (id == MAX_THREADS)
4,172✔
362
      fatal_trace("cannot create more than %d threads", MAX_THREADS);
×
363

364
   thread->id = id;
4,172✔
365

366
   unsigned max = relaxed_load(&max_thread_id);
4,172✔
367
   while (max < id) {
4,172✔
368
      if (__atomic_cas(&max_thread_id, &max, id))
41✔
369
         break;
370
   }
371

372
   atomic_add(&running_threads, 1);
4,172✔
373
   return thread;
4,172✔
374
}
375

376
#ifdef POSIX_SUSPEND
377
static void unmask_fatal_signals(sigset_t *mask)
5,788✔
378
{
379
   sigdelset(mask, SIGQUIT);
5,788✔
380
   sigdelset(mask, SIGABRT);
5,788✔
381
   sigdelset(mask, SIGTERM);
5,788✔
382
}
5,788✔
383
#endif
384

385
#ifdef __APPLE__
386
static void reset_mach_ports(void)
387
{
388
   for (int i = 0; i < MAX_THREADS; i++) {
389
      nvc_thread_t *t = atomic_load(&(threads[i]));
390
      if (t == NULL)
391
         continue;
392

393
      // Mach ports are not valid after fork
394
      t->port = pthread_mach_thread_np(t->handle);
395
   }
396
}
397
#endif
398

399
void thread_init(void)
4,129✔
400
{
401
   assert(my_thread == NULL);
4,129✔
402

403
   my_thread = thread_new(NULL, NULL, MAIN_THREAD, xstrdup("main thread"));
4,129✔
404
   my_thread->handle = pthread_self();
4,129✔
405

406
#ifdef __APPLE__
407
   my_thread->port = pthread_mach_thread_np(my_thread->handle);
408
   pthread_atfork(NULL, NULL, reset_mach_ports);
409
#endif
410

411
   assert(my_thread->id == 0);
4,129✔
412

413
   const char *env = getenv("NVC_MAX_THREADS");
4,129✔
414
   if (env != NULL)
4,129✔
415
      max_workers = MAX(1, MIN(atoi(env), MAX_THREADS));
×
416
   else
417
      max_workers = MIN(nvc_nprocs(), DEFAULT_THREADS);
4,129✔
418
   assert(max_workers > 0);
4,129✔
419

420
#ifdef DEBUG
421
   if (getenv("NVC_THREAD_VERBOSE") != NULL)
4,129✔
422
      atexit(print_lock_stats);
×
423
#endif
424

425
   atexit(join_worker_threads);
4,129✔
426

427
#ifdef POSIX_SUSPEND
428
   sem_init(&stop_sem, 0, 0);
4,129✔
429

430
   struct sigaction sa = {
4,129✔
431
      .sa_sigaction = suspend_handler,
432
      .sa_flags = SA_RESTART | SA_SIGINFO
433
   };
434
   sigfillset(&sa.sa_mask);
4,129✔
435
   unmask_fatal_signals(&sa.sa_mask);
4,129✔
436

437
   sigaction(SIGSUSPEND, &sa, NULL);
4,129✔
438
   sigaction(SIGRESUME, &sa, NULL);
4,129✔
439

440
   sigset_t mask;
4,129✔
441
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, NULL, &mask);
4,129✔
442

443
   sigdelset(&mask, SIGSUSPEND);
4,129✔
444
   sigaddset(&mask, SIGRESUME);
4,129✔
445

446
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, &mask, NULL);
4,129✔
447
#endif
448
}
4,129✔
449

450
int thread_id(void)
74,576,800✔
451
{
452
   assert(my_thread != NULL);
74,576,800✔
453
   return my_thread->id;
74,576,800✔
454
}
455

456
bool thread_attached(void)
45✔
457
{
458
   return my_thread != NULL;
45✔
459
}
460

461
void thread_sleep(int usec)
×
462
{
463
   usleep(usec);
×
464
}
×
465

466
static void *thread_wrapper(void *arg)
43✔
467
{
468
   assert(my_thread == NULL);
43✔
469
   my_thread = arg;
43✔
470

471
   void *result = (*my_thread->fn)(my_thread->arg);
43✔
472

473
   // Avoid races with stop_world
474
   SCOPED_LOCK(stop_lock);
43✔
475

476
   assert(threads[my_thread->id] == my_thread);
43✔
477
   atomic_store(&(threads[my_thread->id]),  NULL);
43✔
478

479
   atomic_add(&running_threads, -1);
43✔
480
   return result;
43✔
481
}
482

483
nvc_thread_t *thread_create(thread_fn_t fn, void *arg, const char *fmt, ...)
14✔
484
{
485
   va_list ap;
14✔
486
   va_start(ap, fmt);
14✔
487
   char *name = xvasprintf(fmt, ap);
14✔
488
   va_end(ap);
14✔
489

490
   // Avoid races with stop_world
491
   SCOPED_LOCK(stop_lock);
14✔
492

493
   nvc_thread_t *thread = thread_new(fn, arg, USER_THREAD, name);
14✔
494

495
   PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
14✔
496
                 thread_wrapper, thread);
497

498
#ifdef __APPLE__
499
   thread->port = pthread_mach_thread_np(thread->handle);
500
#endif
501

502
   return thread;
14✔
503
}
504

505
void *thread_join(nvc_thread_t *thread)
43✔
506
{
507
   if (thread == my_thread || thread->kind == MAIN_THREAD)
43✔
508
      fatal_trace("cannot join self or main thread");
×
509

510
   void *retval = NULL;
43✔
511
   PTHREAD_CHECK(pthread_join, thread->handle, &retval);
43✔
512

513
   async_free(thread->name);
43✔
514
   async_free(thread);
43✔
515

516
   return retval;
43✔
517
}
518

519
nvc_thread_t *get_thread(int id)
28,480✔
520
{
521
   assert(id >= 0 && id < MAX_THREADS);
28,480✔
522
   return atomic_load(&threads[id]);
28,480✔
523
}
524

525
static inline parking_bay_t *parking_bay_for(void *cookie)
1,745✔
526
{
527
   return &(parking_bays[mix_bits_64(cookie) % PARKING_BAYS]);
1,745✔
528
}
529

530
static void thread_park(void *cookie, park_fn_t fn)
1,176✔
531
{
532
   parking_bay_t *bay = parking_bay_for(cookie);
1,176✔
533

534
   PTHREAD_CHECK(pthread_mutex_lock, &(bay->mutex));
1,176✔
535
   {
536
      if ((*fn)(bay, cookie)) {
1,176✔
537
         bay->parked++;
1,174✔
538
         PTHREAD_CHECK(pthread_cond_wait, &(bay->cond), &(bay->mutex));
1,174✔
539
         assert(bay->parked > 0);
1,174✔
540
         bay->parked--;
1,174✔
541
      }
542
   }
543
   PTHREAD_CHECK(pthread_mutex_unlock, &(bay->mutex));
1,176✔
544
}
1,176✔
545

546
static void thread_unpark(void *cookie, unpark_fn_t fn)
569✔
547
{
548
   parking_bay_t *bay = parking_bay_for(cookie);
569✔
549

550
   if (fn != NULL) {
569✔
551
      PTHREAD_CHECK(pthread_mutex_lock, &(bay->mutex));
569✔
552
      {
553
         (*fn)(bay, cookie);
569✔
554
      }
555
      PTHREAD_CHECK(pthread_mutex_unlock, &(bay->mutex));
569✔
556
   }
557

558
   // Do not use pthread_cond_signal here as multiple threads parked in
559
   // this bay may be waiting on different cookies
560
   PTHREAD_CHECK(pthread_cond_broadcast, &(bay->cond));
569✔
561
}
569✔
562

563
void spin_wait(void)
12,971,300✔
564
{
565
#ifdef __x86_64__
566
   __asm__ volatile ("pause");
12,971,300✔
567
#elif defined __aarch64__
568
   // YIELD is a no-op on most AArch64 cores so also do an ISB to stall
569
   // the pipeline for a bit
570
   __asm__ volatile ("yield; isb");
571
#endif
572
}
12,971,300✔
573

574
static bool lock_park_cb(parking_bay_t *bay, void *cookie)
1,176✔
575
{
576
   nvc_lock_t *lock = cookie;
1,176✔
577

578
   // This is called with the park mutex held: check the lock is still
579
   // owned by someone and the park bit is still set
580
   return relaxed_load(lock) == (IS_LOCKED | HAS_PARKED);
1,176✔
581
}
582

583
static void lock_unpark_cb(parking_bay_t *bay, void *cookie)
569✔
584
{
585
   nvc_lock_t *lock = cookie;
569✔
586

587
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
569✔
588

589
   // Unlock must have release semantics
590
   atomic_store(lock, (bay->parked > 0 ? HAS_PARKED : 0));
569✔
591
}
569✔
592

593
void nvc_lock(nvc_lock_t *lock)
69,986,000✔
594
{
595
   LOCK_EVENT(locks, 1);
69,986,000✔
596
   TSAN_PRE_LOCK(lock);
69,986,000✔
597

598
   int8_t state = relaxed_load(lock);
69,986,000✔
599
   if (state & IS_LOCKED)
69,986,000✔
600
      LOCK_EVENT(contended, 1);
148✔
601
   else if (likely(__atomic_cas(lock, &state, state | IS_LOCKED)))
69,985,900✔
602
      goto locked;  // Fast path: acquired the lock without contention
69,985,900✔
603

604
   for (;;) {
1,216✔
605
      LOCK_EVENT(retries, 1);
1,216✔
606

607
      // Spin a few times waiting for the owner to release the lock
608
      // before parking
609
      int spins = 0;
610
      for (; (state & IS_LOCKED) && spins < LOCK_SPINS;
19,104✔
611
           spins++, state = relaxed_load(lock))
17,888✔
612
         spin_wait();
17,888✔
613

614
      if (spins == LOCK_SPINS) {
1,216✔
615
         // Ignore failures here as we will check the lock state again
616
         // in the callback with the park mutex held
617
         atomic_cas(lock, IS_LOCKED, IS_LOCKED | HAS_PARKED);
1,176✔
618

619
         LOCK_EVENT(parks, 1);
1,176✔
620
         thread_park(lock, lock_park_cb);
1,176✔
621

622
         if ((state = relaxed_load(lock)) & IS_LOCKED) {
1,176✔
623
            // Someone else grabbed the lock before our thread was unparked
624
            LOCK_EVENT(spurious, 1);
1,066✔
625
            continue;
1,066✔
626
         }
627
      }
628
      else
629
         LOCK_EVENT(spins, spins);
40✔
630

631
      assert(!(state & IS_LOCKED));
150✔
632

633
      // If we get here then we've seen the lock in an unowned state:
634
      // attempt to grab it with a CAS
635
      if (__atomic_cas(lock, &state, state | IS_LOCKED))
150✔
636
         goto locked;
150✔
637
   }
638

639
 locked:
69,986,000✔
640
   TSAN_POST_LOCK(lock);
69,986,000✔
641
}
69,986,000✔
642

643
void nvc_unlock(nvc_lock_t *lock)
69,986,000✔
644
{
645
   TSAN_PRE_UNLOCK(lock);
69,986,000✔
646

647
   // Fast path: unlock assuming no parked waiters
648
   if (likely(atomic_cas(lock, IS_LOCKED, 0)))
69,986,000✔
649
      goto unlocked;
69,985,500✔
650

651
   // If we get here then we must own the lock with at least one parked
652
   // waiting thread
653
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
569✔
654

655
   // Lock released in callback
656
   thread_unpark(lock, lock_unpark_cb);
569✔
657

658
 unlocked:
69,986,000✔
659
   TSAN_POST_UNLOCK(lock);
69,986,000✔
660
}
69,986,000✔
661

662
#ifdef DEBUG
663
void assert_lock_held(nvc_lock_t *lock)
136,313✔
664
{
665
   int8_t state = relaxed_load(lock);
136,313✔
666
   if (unlikely(!(state & IS_LOCKED)))
136,313✔
667
      fatal_trace("expected lock at %p to be held", lock);
×
668
}
136,313✔
669
#endif
670

671
void __scoped_unlock(nvc_lock_t **plock)
69,935,100✔
672
{
673
   nvc_unlock(*plock);
69,935,100✔
674
}
69,935,100✔
675

676
static void push_bot(threadq_t *tq, const task_t *tasks, size_t count)
68✔
677
{
678
   const abp_idx_t bot = relaxed_load(&tq->bot);
68✔
679
   assert(bot + count <= THREADQ_SIZE);
68✔
680

681
   memcpy(tq->deque + bot, tasks, count * sizeof(task_t));
68✔
682
   store_release(&tq->bot, bot + count);
68✔
683
}
68✔
684

685
static bool pop_bot(threadq_t *tq, task_t *task)
236✔
686
{
687
   const abp_idx_t old_bot = relaxed_load(&tq->bot);
236✔
688
   if (old_bot == 0)
236✔
689
      return false;
690

691
   const abp_idx_t new_bot = old_bot - 1;
192✔
692
   atomic_store(&tq->bot, new_bot);
192✔
693

694
   *task = tq->deque[new_bot];
192✔
695

696
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
192✔
697
   if (new_bot > old_age.top)
192✔
698
      return true;
699

700
   atomic_store(&tq->bot, 0);
68✔
701

702
   const abp_age_t new_age = { .top = 0, .tag = old_age.tag + 1 };
68✔
703
   if (new_bot == old_age.top) {
68✔
704
      if (atomic_cas(&tq->age.bits, old_age.bits, new_age.bits))
44✔
705
         return true;
706
   }
707

708
   atomic_store(&tq->age.bits, new_age.bits);
24✔
709
   return false;
24✔
710
}
711

712
__attribute__((no_sanitize("thread")))
713
static bool pop_top(threadq_t *tq, task_t *task)
39✔
714
{
715
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
39✔
716
   const abp_idx_t bot = atomic_load(&tq->bot);
39✔
717

718
   if (bot <= old_age.top)
39✔
719
      return false;
720

721
   // This triggers a TSAN false-positive: we will never read from *task
722
   // if the CAS fails below, so it's safe to ignore
723
   *task = tq->deque[old_age.top];
39✔
724

725
   const abp_age_t new_age = {
39✔
726
      .tag = old_age.tag,
727
      .top = old_age.top + 1
39✔
728
   };
729

730
   return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
39✔
731
}
732

733
static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
138✔
734
{
735
   assert_lock_held(&gq->lock);
138✔
736

737
   if (gq->wptr == gq->rptr)
138✔
738
      gq->wptr = gq->rptr = 0;
64✔
739

740
   if (gq->wptr + count > gq->max) {
138✔
741
      gq->max = next_power_of_2(gq->wptr + count);
36✔
742
      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
36✔
743
   }
744

745
   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
138✔
746
   gq->wptr += count;
138✔
747
}
138✔
748

749
__attribute__((no_sanitize("thread")))
750
static bool globalq_unlocked_empty(globalq_t *gq)
250✔
751
{
752
   return relaxed_load(&gq->wptr) == relaxed_load(&gq->rptr);
250✔
753
}
754

755
static size_t globalq_take(globalq_t *gq, threadq_t *tq)
250✔
756
{
757
   if (globalq_unlocked_empty(gq))
250✔
758
      return 0;
759

760
   const int nthreads = relaxed_load(&running_threads);
69✔
761

762
   SCOPED_LOCK(gq->lock);
138✔
763

764
   if (gq->wptr == gq->rptr)
69✔
765
      return 0;
766

767
   const int remain = gq->wptr - gq->rptr;
68✔
768
   const int share = gq->wptr / nthreads;
68✔
769
   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
68✔
770
   const int from = gq->rptr;
68✔
771

772
   gq->rptr += take;
68✔
773

774
   push_bot(tq, gq->tasks + from, take);
68✔
775
   return take;
68✔
776
}
777

778
static void execute_task(task_t *task)
1,341✔
779
{
780
   (*task->fn)(task->context, task->arg);
1,341✔
781

782
   if (task->workq != NULL) {
1,341✔
783
      entryq_t *eq = &(task->workq->entryqs[my_thread->id]);
1,230✔
784

785
      const entryq_ptr_t cur = { .bits = relaxed_load(&eq->comp.bits) };
1,230✔
786
      const int epoch = atomic_load(&task->workq->epoch);
1,230✔
787
      const int count = cur.epoch == epoch ? cur.count : 0;
1,230✔
788
      const entryq_ptr_t next = { .count = count + 1, .epoch = epoch };
1,230✔
789
      store_release(&eq->comp.bits, next.bits);
1,230✔
790
   }
791
   else
792
      atomic_add(&async_pending, -1);
111✔
793
}
1,341✔
794

795
static bool globalq_poll(globalq_t *gq, threadq_t *tq)
250✔
796
{
797
   int ntasks;
250✔
798
   if ((ntasks = globalq_take(gq, tq))) {
250✔
799
      task_t task;
68✔
800
      int comp = 0;
68✔
801
      for (; pop_bot(tq, &task); comp++)
304✔
802
         execute_task(&task);
168✔
803

804
      WORKQ_EVENT(comp, comp);
68✔
805
      return true;
68✔
806
   }
807
   else
808
      return false;
809
}
810

811
workq_t *workq_new(void *context)
1,161✔
812
{
813
   if (my_thread->kind != MAIN_THREAD)
1,161✔
814
      fatal_trace("work queues can only be created by the main thread");
×
815

816
   workq_t *wq = xcalloc(sizeof(workq_t));
1,161✔
817
   wq->state    = IDLE;
1,161✔
818
   wq->context  = context;
1,161✔
819
   wq->parallel = max_workers > 1;
1,161✔
820
   wq->epoch    = 1;
1,161✔
821

822
   return wq;
1,161✔
823
}
824

825
void workq_not_thread_safe(workq_t *wq)
×
826
{
827
   wq->parallel = false;
×
828
}
×
829

830
void workq_free(workq_t *wq)
1,161✔
831
{
832
   if (my_thread->kind != MAIN_THREAD)
1,161✔
833
      fatal_trace("work queues can only be freed by the main thread");
×
834

835
   assert(wq->state == IDLE);
1,161✔
836

837
   for (int i = 0; i < MAX_THREADS; i++)
75,465✔
838
      free(wq->entryqs[i].tasks);
74,304✔
839

840
   free(wq);
1,161✔
841
}
1,161✔
842

843
void workq_do(workq_t *wq, task_fn_t fn, void *arg)
1,230✔
844
{
845
   assert(wq->state == IDLE);
1,230✔
846

847
   entryq_t *eq = &(wq->entryqs[my_thread->id]);
1,230✔
848

849
   const entryq_ptr_t cur = { .bits = relaxed_load(&eq->wptr.bits) };
1,230✔
850
   const int epoch = atomic_load(&wq->epoch);
1,230✔
851
   const int wptr = cur.epoch == epoch ? cur.count : 0;
1,230✔
852

853
   if (wptr == eq->queuesz) {
1,230✔
854
      eq->queuesz = MAX(eq->queuesz * 2, 64);
1,161✔
855
      eq->tasks = xrealloc_array(eq->tasks, eq->queuesz, sizeof(task_t));
1,161✔
856
   }
857

858
   eq->tasks[wptr] = (task_t){ fn, wq->context, arg, wq };
1,230✔
859

860
   const entryq_ptr_t next = { .count = wptr + 1, .epoch = epoch };
1,230✔
861
   store_release(&eq->wptr.bits, next.bits);
1,230✔
862

863
   unsigned maxthread = relaxed_load(&wq->maxthread);
1,230✔
864
   while (maxthread < my_thread->id) {
1,230✔
865
      if (__atomic_cas(&wq->maxthread, &maxthread, my_thread->id))
×
866
         break;
867
   }
868
}
1,230✔
869

870
void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
×
871
{
872
   const int maxthread = relaxed_load(&wq->maxthread);
×
873
   for (int i = 0; i <= maxthread; i++) {
×
874
      entryq_t *eq = &(wq->entryqs[my_thread->id]);
×
875
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
×
876
      for (int j = 0; j < wptr.count; j++)
×
877
         (*fn)(wq->context, eq->tasks[j].arg, arg);
×
878
   }
879
}
×
880

881
static int estimate_depth(threadq_t *tq)
323✔
882
{
883
   const abp_age_t age = { .bits = relaxed_load(&tq->age.bits) };
323✔
884
   const abp_idx_t bot = relaxed_load(&tq->bot);
323✔
885

886
   return bot <= age.top ? 0 : bot - age.top;
323✔
887
}
888

889
static threadq_t *get_thread_queue(int id)
323✔
890
{
891
   assert(id < MAX_THREADS);
323✔
892

893
   nvc_thread_t *t = atomic_load(&(threads[id]));
323✔
894
   if (t == NULL)
323✔
895
      return NULL;
896

897
   return &(t->queue);
323✔
898
}
899

900
static uint32_t fast_rand(void)
142✔
901
{
902
   uint32_t state = my_thread->rngstate;
142✔
903
   state ^= (state << 13);
142✔
904
   state ^= (state >> 17);
142✔
905
   state ^= (state << 5);
142✔
906
   return (my_thread->rngstate = state);
142✔
907
}
908

909
static threadq_t *find_victim(void)
181✔
910
{
911
   threadq_t *last = get_thread_queue(my_thread->victim);
181✔
912
   if (last != NULL && estimate_depth(last) > 0)
181✔
913
      return last;
914

915
   const int maxthread = relaxed_load(&max_thread_id);
142✔
916
   const int start = fast_rand() % (maxthread + 1);
142✔
917
   int idx = start;
142✔
918
   do {
284✔
919
      if (idx != my_thread->id) {
284✔
920
         threadq_t *q = get_thread_queue(idx);
142✔
921
         if (q != NULL && estimate_depth(q) > 0) {
142✔
922
            my_thread->victim = idx;
×
923
            return q;
×
924
         }
925
      }
926
   } while ((idx = (idx + 1) % (maxthread + 1)) != start);
284✔
927

928
   return NULL;
929
}
930

931
static bool steal_task(void)
181✔
932
{
933
   threadq_t *tq = find_victim();
181✔
934
   if (tq == NULL)
181✔
935
      return false;
936

937
   task_t task;
39✔
938
   if (pop_top(tq, &task)) {
39✔
939
      WORKQ_EVENT(steals, 1);
39✔
940
      execute_task(&task);
39✔
941
      WORKQ_EVENT(comp, 1);
39✔
942
      return true;
39✔
943
   }
944

945
   return false;
946
}
947

948
static void progressive_backoff(void)
13,353,800✔
949
{
950
   if (my_thread->spins++ < YIELD_SPINS)
13,353,800✔
951
      spin_wait();
12,949,200✔
952
   else {
953
      sched_yield();
404,658✔
954
      my_thread->spins = 0;
404,658✔
955
   }
956
}
13,353,800✔
957

958
static void *worker_thread(void *arg)
29✔
959
{
960
   mspace_stack_limit(MSPACE_CURRENT_FRAME);
29✔
961

962
   do {
195✔
963
      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
195✔
964
         my_thread->spins = 0;  // Did work
80✔
965
      else if (my_thread->spins++ < 2)
115✔
966
         spin_wait();
76✔
967
      else {
968
         PTHREAD_CHECK(pthread_mutex_lock, &wakelock);
39✔
969
         {
970
            if (!relaxed_load(&should_stop))
39✔
971
               PTHREAD_CHECK(pthread_cond_wait, &wake_workers, &wakelock);
39✔
972
         }
973
         PTHREAD_CHECK(pthread_mutex_unlock, &wakelock);
39✔
974
      }
975
   } while (likely(!relaxed_load(&should_stop)));
195✔
976

977
   return NULL;
29✔
978
}
979

980
static void create_workers(int needed)
138✔
981
{
982
   assert(my_thread->kind == MAIN_THREAD);
138✔
983

984
   if (relaxed_load(&should_stop))
138✔
985
      return;
986

987
   while (relaxed_load(&running_threads) < MIN(max_workers, needed)) {
167✔
988
      static int counter = 0;
29✔
989
      char *name = xasprintf("worker thread %d", atomic_add(&counter, 1));
29✔
990
      nvc_thread_t *thread =
29✔
991
         thread_new(worker_thread, NULL, WORKER_THREAD, name);
29✔
992

993
      PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
167✔
994
                    thread_wrapper, thread);
995

996
#ifdef __APPLE__
997
      thread->port = pthread_mach_thread_np(thread->handle);
998
#endif
999
   }
1000

1001
   PTHREAD_CHECK(pthread_cond_broadcast, &wake_workers);
138✔
1002
}
1003

1004
void workq_start(workq_t *wq)
1,161✔
1005
{
1006
   assert(my_thread->kind == MAIN_THREAD);
1,161✔
1007

1008
   const int epoch = relaxed_load(&wq->epoch);
1,161✔
1009
   const int maxthread = relaxed_load(&wq->maxthread);
1,161✔
1010

1011
   assert(wq->state == IDLE);
1,161✔
1012
   wq->state = START;
1,161✔
1013

1014
   int nserial = 0, nparallel = 0;
1,161✔
1015
   for (int i = 0; i <= maxthread; i++) {
2,322✔
1016
      entryq_t *eq = &wq->entryqs[i];
1,161✔
1017
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
1,161✔
1018
      if (wptr.epoch == epoch) {
1,161✔
1019
         // Only bump epoch if there are tasks to run
1020
         if (nserial + nparallel == 0)
1,161✔
1021
            atomic_add(&wq->epoch, 1);
1,161✔
1022

1023
         assert(wptr.count > 0);
1,161✔
1024

1025
         if (i == maxthread && nserial + nparallel == 0 && wptr.count == 1) {
1,161✔
1026
            execute_task(&(eq->tasks[0]));   // Only one task in total
1,134✔
1027
            nserial++;
1,134✔
1028
         }
1029
         else if (wq->parallel) {
27✔
1030
            if (nparallel == 0)
27✔
1031
               nvc_lock(&globalq.lock);   // Lazily acquire lock
27✔
1032
            globalq_put(&globalq, eq->tasks, wptr.count);
27✔
1033
            nparallel += wptr.count;
27✔
1034
         }
1035
         else {
1036
            for (int j = 0; j < wptr.count; j++)
×
1037
               execute_task(&(eq->tasks[j]));
×
1038
            nserial += wptr.count;
×
1039
         }
1040
      }
1041
   }
1042

1043
   if (wq->parallel && nparallel > 0) {
1,161✔
1044
      nvc_unlock(&globalq.lock);
27✔
1045
      create_workers(nparallel);
27✔
1046
   }
1047
}
1,161✔
1048

1049
static int workq_outstanding(workq_t *wq)
13,215,100✔
1050
{
1051
   assert(wq->state == START);
13,215,100✔
1052

1053
   const int epoch = atomic_load(&wq->epoch);
13,215,100✔
1054
   const int maxthread = relaxed_load(&max_thread_id);
13,215,100✔
1055

1056
   int pending = 0;
13,215,100✔
1057
   for (int i = 0; i <= maxthread; i++) {
39,644,100✔
1058
      entryq_t *eq = &(wq->entryqs[i]);
26,429,000✔
1059

1060
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
26,429,000✔
1061
      if (wptr.epoch == epoch - 1)
26,429,000✔
1062
         pending += wptr.count;
13,215,100✔
1063

1064
      const entryq_ptr_t comp = { .bits = load_acquire(&eq->comp.bits) };
26,429,000✔
1065
      if (comp.epoch == epoch)
26,429,000✔
1066
         pending -= comp.count;
19,911,700✔
1067
   }
1068

1069
   assert(pending >= 0);
13,215,100✔
1070
   return pending;
13,215,100✔
1071
}
1072

1073
static void workq_parallel_drain(workq_t *wq)
1,161✔
1074
{
1075
   if (workq_outstanding(wq) > 0) {
1,161✔
1076
      while (globalq_poll(&globalq, &(my_thread->queue)));
54✔
1077
      while (steal_task());
27✔
1078

1079
      while (workq_outstanding(wq) > 0)
13,213,900✔
1080
         progressive_backoff();
13,213,900✔
1081
   }
1082

1083
   wq->state = IDLE;
1,161✔
1084
}
1,161✔
1085

1086
void workq_drain(workq_t *wq)
1,161✔
1087
{
1088
   if (my_thread->kind != MAIN_THREAD)
1,161✔
1089
      fatal_trace("workq_drain can only be called from the main thread");
×
1090

1091
   if (wq->parallel)
1,161✔
1092
      workq_parallel_drain(wq);
1,161✔
1093
   else {
1094
      assert(wq->state == START);
×
1095
      wq->state = IDLE;
×
1096
   }
1097
}
1,161✔
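
Taken together, workq_new, workq_do, workq_start, workq_drain and workq_free form the work queue API implemented in this file. A minimal usage sketch, hypothetical and not from the nvc sources, driven from the main thread as the checks above require:

/* Hypothetical sketch: queue four independent tasks and wait for them all. */
static void square_task(void *context, void *arg)
{
   int *results = context;               /* the context pointer given to workq_new */
   const intptr_t i = (intptr_t)arg;     /* the per-task argument from workq_do */
   results[i] = (int)(i * i);            /* distinct slots per task, so no locking needed */
}

static void run_squares(void)
{
   static int results[4];
   workq_t *wq = workq_new(results);
   for (intptr_t i = 0; i < 4; i++)
      workq_do(wq, square_task, (void *)i);
   workq_start(wq);                      /* publishes the tasks and wakes worker threads */
   workq_drain(wq);                      /* blocks until every queued task has completed */
   workq_free(wq);
}
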
1098

1099
void async_do(task_fn_t fn, void *context, void *arg)
111✔
1100
{
1101
   if (max_workers == 1)
111✔
1102
      (*fn)(context, arg);   // Single CPU
×
1103
   else {
1104
      const int npending = atomic_add(&async_pending, 1);
111✔
1105
      create_workers(npending + 1 /* Do not count main thread */);
111✔
1106

1107
      task_t tasks[1] = {
111✔
1108
         { fn, context, arg, NULL }
1109
      };
1110
      SCOPED_LOCK(globalq.lock);
222✔
1111
      globalq_put(&globalq, tasks, 1);
111✔
1112
   }
1113
}
111✔
1114

1115
void async_barrier(void)
7,224✔
1116
{
1117
   while (atomic_load(&async_pending) > 0) {
14,449✔
1118
      if (!globalq_poll(&globalq, &(my_thread->queue)))
1✔
1119
         progressive_backoff();
1✔
1120
   }
1121
}
7,224✔
1122

1123
void async_free(void *ptr)
5,247✔
1124
{
1125
   if (relaxed_load(&running_threads) == 1)
5,247✔
1126
      free(ptr);
5,234✔
1127
   else if (ptr != NULL) {
1128
      // TODO: free when all threads in quiescent state
1129
   }
5,247✔
1130
}
5,247✔
1131

1132
#ifdef POSIX_SUSPEND
1133
static void suspend_handler(int sig, siginfo_t *info, void *context)
3,318✔
1134
{
1135
   if (info->si_pid != getpid())
3,318✔
1136
      return;   // Not sent by us, ignore it
1,659✔
1137
   else if (sig == SIGRESUME)
3,318✔
1138
      return;
1139

1140
   const int olderrno = errno;
1,659✔
1141

1142
   if (my_thread != NULL) {
1,659✔
1143
      struct cpu_state cpu;
1,659✔
1144
      fill_cpu_state(&cpu, (ucontext_t *)context);
1,659✔
1145

1146
      stop_world_fn_t callback = atomic_load(&stop_callback);
1,659✔
1147
      void *arg = atomic_load(&stop_arg);
1,659✔
1148

1149
      (*callback)(my_thread->id, &cpu, arg);
1,659✔
1150
   }
1151

1152
   sem_post(&stop_sem);
1,659✔
1153

1154
   sigset_t mask;
1,659✔
1155
   sigfillset(&mask);
1,659✔
1156
   unmask_fatal_signals(&mask);
1,659✔
1157
   sigdelset(&mask, SIGRESUME);
1,659✔
1158

1159
   sigsuspend(&mask);
1,659✔
1160

1161
   sem_post(&stop_sem);
1,659✔
1162

1163
   errno = olderrno;
1,659✔
1164
}
1165
#endif
1166

1167
void stop_world(stop_world_fn_t callback, void *arg)
945✔
1168
{
1169
   nvc_lock(&stop_lock);
945✔
1170

1171
   atomic_store(&stop_callback, callback);
945✔
1172
   atomic_store(&stop_arg, arg);
945✔
1173

1174
#ifdef __MINGW32__
1175
   const int maxthread = relaxed_load(&max_thread_id);
1176
   for (int i = 0; i <= maxthread; i++) {
1177
      nvc_thread_t *thread = atomic_load(&threads[i]);
1178
      if (thread == NULL || thread == my_thread)
1179
         continue;
1180

1181
      HANDLE h = pthread_gethandle(thread->handle);
1182
      if (SuspendThread(h) != 0)
1183
         fatal_errno("SuspendThread");
1184

1185
      CONTEXT context;
1186
      context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;
1187
      if (!GetThreadContext(h, &context))
1188
         fatal_errno("GetThreadContext");
1189

1190
      struct cpu_state cpu;
1191
      fill_cpu_state(&cpu, &context);
1192

1193
      (*callback)(thread->id, &cpu, arg);
1194
   }
1195
#elif defined __APPLE__
1196
   const int maxthread = relaxed_load(&max_thread_id);
1197
   for (int i = 0; i <= maxthread; i++) {
1198
      nvc_thread_t *thread = atomic_load(&threads[i]);
1199
      if (thread == NULL || thread == my_thread)
1200
         continue;
1201

1202
      assert(thread->port != MACH_PORT_NULL);
1203

1204
      kern_return_t kern_result;
1205
      do {
1206
         kern_result = thread_suspend(thread->port);
1207
      } while (kern_result == KERN_ABORTED);
1208

1209
      if (kern_result != KERN_SUCCESS)
1210
         fatal_trace("failed to suspend thread %d (%d)", i, kern_result);
1211

1212
#ifdef __aarch64__
1213
      arm_thread_state64_t state;
1214
      mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
1215
      thread_state_flavor_t flavor = ARM_THREAD_STATE64;
1216
#else
1217
      x86_thread_state64_t state;
1218
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
1219
      thread_state_flavor_t flavor = x86_THREAD_STATE64;
1220
#endif
1221
      kern_result = thread_get_state(thread->port, flavor,
1222
                                     (natural_t *)&state, &count);
1223
      if (kern_result != KERN_SUCCESS)
1224
         fatal_trace("failed to get thread %d state (%d)", i, kern_result);
1225

1226
      // Fake a ucontext_t that we can pass to fill_cpu_state
1227
      ucontext_t uc;
1228
      typeof(*uc.uc_mcontext) mc;
1229
      uc.uc_mcontext = &mc;
1230
      mc.__ss = state;
1231

1232
      struct cpu_state cpu;
1233
      fill_cpu_state(&cpu, &uc);
1234

1235
      (*callback)(thread->id, &cpu, arg);
1236
   }
1237
#elif defined __SANITIZE_THREAD__
1238
   // https://github.com/google/sanitizers/issues/1179
1239
   fatal_trace("stop_world is not supported with tsan");
1240
#else
1241
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
945✔
1242

1243
   int signalled = 0;
945✔
1244
   const int maxthread = relaxed_load(&max_thread_id);
945✔
1245
   for (int i = 0; i <= maxthread; i++) {
4,295✔
1246
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,350✔
1247
      if (thread == NULL || thread == my_thread)
3,350✔
1248
         continue;
1,691✔
1249

1250
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGSUSPEND);
1,659✔
1251
      signalled++;
1,659✔
1252
   }
1253

1254
   struct timespec ts;
945✔
1255
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
945✔
1256
      fatal_errno("clock_gettime");
×
1257

1258
   ts.tv_sec += SUSPEND_TIMEOUT;
945✔
1259

1260
   for (; signalled > 0; signalled--) {
2,604✔
1261
      if (sem_timedwait(&stop_sem, &ts) != 0)
1,659✔
1262
         fatal_trace("timeout waiting for %d threads to suspend", signalled);
×
1263
   }
1264

1265
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
945✔
1266
#endif
1267

1268
   struct cpu_state cpu;
945✔
1269
   capture_registers(&cpu);
945✔
1270

1271
   (*callback)(my_thread->id, &cpu, arg);
945✔
1272
}
945✔
1273

1274
void start_world(void)
945✔
1275
{
1276
   assert_lock_held(&stop_lock);
945✔
1277

1278
   const int maxthread = relaxed_load(&max_thread_id);
945✔
1279

1280
#ifdef __MINGW32__
1281
   for (int i = 0; i <= maxthread; i++) {
1282
      nvc_thread_t *thread = atomic_load(&threads[i]);
1283
      if (thread == NULL || thread == my_thread)
1284
         continue;
1285

1286
      HANDLE h = pthread_gethandle(thread->handle);
1287
      if (ResumeThread(h) != 1)
1288
         fatal_errno("ResumeThread");
1289
   }
1290
#elif defined __APPLE__
1291
   for (int i = 0; i <= maxthread; i++) {
1292
      nvc_thread_t *thread = atomic_load(&threads[i]);
1293
      if (thread == NULL || thread == my_thread)
1294
         continue;
1295

1296
      kern_return_t kern_result;
1297
      do {
1298
         kern_result = thread_resume(thread->port);
1299
      } while (kern_result == KERN_ABORTED);
1300

1301
      if (kern_result != KERN_SUCCESS)
1302
         fatal_trace("failed to resume thread %d (%d)", i, kern_result);
1303
   }
1304
#else
1305
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
945✔
1306

1307
   int signalled = 0;
1308
   for (int i = 0; i <= maxthread; i++) {
4,295✔
1309
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,350✔
1310
      if (thread == NULL || thread == my_thread)
3,350✔
1311
         continue;
1,691✔
1312

1313
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGRESUME);
1,659✔
1314
      signalled++;
1,659✔
1315
   }
1316

1317
   struct timespec ts;
945✔
1318
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
945✔
1319
      fatal_errno("clock_gettime");
×
1320

1321
   ts.tv_sec += SUSPEND_TIMEOUT;
945✔
1322

1323
   for (; signalled > 0; signalled--) {
2,604✔
1324
      if (sem_timedwait(&stop_sem, &ts) != 0)
1,659✔
1325
         fatal_trace("timeout waiting for %d threads to resume", signalled);
×
1326
   }
1327

1328
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
945✔
1329
#endif
1330

1331
   nvc_unlock(&stop_lock);
945✔
1332
}
945✔
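
The stop_world/start_world pair above implements a stop-the-world handshake: signal-based suspension on POSIX, SuspendThread on Windows and Mach thread_suspend on macOS. A rough usage sketch, hypothetical and assuming the callback signature matches the way it is invoked above (thread id, CPU state, user argument):

/* Hypothetical sketch: stop every other thread, note which ones were seen, resume. */
static int seen[MAX_THREADS];

static void note_thread(int tid, struct cpu_state *cpu, void *arg)
{
   (void)cpu;
   (void)arg;
   seen[tid] = 1;      /* runs in a signal handler for suspended threads: keep it trivial */
}

static void example_stop_the_world(void)
{
   stop_world(note_thread, NULL);   /* calls back once per stopped thread and once for the caller */
   /* ...inspect global state while no other thread is running... */
   start_world();                   /* releases stop_lock acquired by stop_world */
}
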
1333

1334
void thread_wx_mode(wx_mode_t mode)
11,941✔
1335
{
1336
#ifdef __APPLE__
1337
   pthread_jit_write_protect_np(mode == WX_EXECUTE);
1338
#else
1339
   // Could use Intel memory protection keys here
1340
#endif
1341
}
11,941✔
1342

1343
barrier_t *barrier_new(int count)
1✔
1344
{
1345
   barrier_t *b = xcalloc(sizeof(barrier_t));
1✔
1346
   b->count = count;
1✔
1347
   return b;
1✔
1348
}
1349

1350
void barrier_free(barrier_t *b)
1✔
1351
{
1352
   free(b);
1✔
1353
}
1✔
1354

1355
void barrier_wait(barrier_t *b)
88✔
1356
{
1357
   const int count = relaxed_load(&b->count);
88✔
1358
   const int passed = relaxed_load(&b->passed);
88✔
1359

1360
   if (atomic_fetch_add(&b->reached, 1) == count - 1) {
88✔
1361
      // Last thread to pass barrier
1362
      relaxed_store(&b->reached, 0);
22✔
1363
      store_release(&b->passed, passed + 1);
22✔
1364
   }
1365
   else {
1366
      while (load_acquire(&b->passed) == passed)
139,994✔
1367
         progressive_backoff();
139,928✔
1368
   }
1369
}
88✔