
nickg / nvc / build 6537410547 (github push)

16 Oct 2023 04:40PM UTC coverage: 91.176% (-0.006%) from 91.182%

Commit by nickg: Sync NEWS.md from 1.10 branch

49050 of 53797 relevant lines covered (91.18%)

591937.48 hits per line

Source File: /src/thread.c (88.43% covered)
1
//
2
//  Copyright (C) 2021-2023  Nick Gasson
3
//
4
//  This program is free software: you can redistribute it and/or modify
5
//  it under the terms of the GNU General Public License as published by
6
//  the Free Software Foundation, either version 3 of the License, or
7
//  (at your option) any later version.
8
//
9
//  This program is distributed in the hope that it will be useful,
10
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
//  GNU General Public License for more details.
13
//
14
//  You should have received a copy of the GNU General Public License
15
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
//
17

18
#include "util.h"
19
#include "array.h"
20
#include "cpustate.h"
21
#include "rt/mspace.h"
22
#include "thread.h"
23

24
#include <assert.h>
25
#include <errno.h>
26
#include <signal.h>
27
#include <stdlib.h>
28
#include <string.h>
29
#include <inttypes.h>
30
#include <unistd.h>
31
#include <semaphore.h>
32

33
#ifdef HAVE_PTHREAD
34
#include <pthread.h>
35
#else
36
#error missing pthread support
37
#endif
38

39
#ifdef __SANITIZE_THREAD__
40
#include <sanitizer/tsan_interface.h>
41
#endif
42

43
#if defined __MINGW32__
44
#define WIN32_LEAN_AND_MEAN
45
#include <windows.h>
46
#elif defined __APPLE__
47
#define task_t __task_t
48
#include <mach/thread_act.h>
49
#include <mach/machine.h>
50
#undef task_t
51
#endif
52

53
#define LOCK_SPINS      15
54
#define YIELD_SPINS     32
55
#define MIN_TAKE        8
56
#define PARKING_BAYS    64
57
#define SUSPEND_TIMEOUT 1
58

59
#if !defined __MINGW32__ && !defined __APPLE__
60
#define POSIX_SUSPEND 1
61
#define SIGSUSPEND    SIGRTMIN
62
#define SIGRESUME     (SIGRTMIN + 1)
63
#endif
64

65
typedef struct {
66
   int64_t locks;
67
   int64_t spins;
68
   int64_t contended;
69
   int64_t parks;
70
   int64_t spurious;
71
   int64_t retries;
72
} __attribute__((aligned(64))) lock_stats_t;
73

74
STATIC_ASSERT(sizeof(lock_stats_t) == 64)
75

76
#ifdef DEBUG
77
#define LOCK_EVENT(what, n) do {                  \
78
      if (likely(my_thread != NULL))              \
79
         lock_stats[my_thread->id].what += (n);   \
80
   } while (0)
81

82
#define WORKQ_EVENT(what, n) do {                 \
83
      if (likely(my_thread != NULL))              \
84
         workq_stats[my_thread->id].what += (n);  \
85
   } while (0)
86
#else
87
#define LOCK_EVENT(what, n)
88
#define WORKQ_EVENT(what, n)
89
#endif
90

91
#ifdef __SANITIZE_THREAD__
92
#define TSAN_PRE_LOCK(addr) \
93
   __tsan_mutex_pre_lock(addr, __tsan_mutex_linker_init);
94
#define TSAN_POST_LOCK(addr) \
95
   __tsan_mutex_post_lock(addr, __tsan_mutex_linker_init, 0);
96
#define TSAN_PRE_UNLOCK(addr) \
97
   __tsan_mutex_pre_unlock(addr, __tsan_mutex_linker_init);
98
#define TSAN_POST_UNLOCK(addr) \
99
   __tsan_mutex_post_unlock(addr, __tsan_mutex_linker_init);
100
#else
101
#define TSAN_PRE_LOCK(addr)
102
#define TSAN_POST_LOCK(addr)
103
#define TSAN_PRE_UNLOCK(addr)
104
#define TSAN_POST_UNLOCK(addr)
105
#endif
106

107
#define PTHREAD_CHECK(op, ...) do {             \
108
      if (unlikely(op(__VA_ARGS__) != 0))       \
109
         fatal_errno(#op);                      \
110
   } while (0)
111

112
// Lock implementation based on WTF::Lock in WebKit
113
//   https://webkit.org/blog/6161/locking-in-webkit/
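//
// A minimal usage sketch (illustrative only; SCOPED_LOCK is the scoped
// locking macro used throughout this file, assumed to expand to nvc_lock
// plus a cleanup handler that calls __scoped_unlock on scope exit):
//
//    static nvc_lock_t example_lock = 0;
//
//    void example(void)
//    {
//       SCOPED_LOCK(example_lock);
//       // ... critical section, released automatically ...
//    }
//
// The lock word uses two bits: IS_LOCKED is the ownership flag and
// HAS_PARKED records that at least one waiter is parked.  The uncontended
// path is a single compare-and-swap; a contended thread spins up to
// LOCK_SPINS times and then parks in a bay keyed by the lock address.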
114

115
typedef enum {
116
   IS_LOCKED  = (1 << 0),
117
   HAS_PARKED = (1 << 1)
118
} lock_bits_t;
119

120
typedef struct {
121
   pthread_mutex_t mutex;
122
   pthread_cond_t  cond;
123
   int             parked;
124
} __attribute__((aligned(64))) parking_bay_t;
125

126
typedef bool (*park_fn_t)(parking_bay_t *, void *);
127
typedef void (*unpark_fn_t)(parking_bay_t *, void *);
128

129
typedef struct {
130
   task_fn_t  fn;
131
   void      *context;
132
   void      *arg;
133
   workq_t   *workq;
134
} task_t;
135

136
// Work stealing task queue is based on:
137
//   Arora, N. S., Blumofe, R. D., and Plaxton, C. G.
138
//   Thread scheduling for multiprogrammed multiprocessors.
139
//   Theory of Computing Systems 34, 2 (2001), 115-144.
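//
// Each worker owns one fixed-size threadq_t deque: the owner pushes and
// pops at the bottom (push_bot/pop_bot) without taking a lock, while
// thieves remove from the top (pop_top) using a compare-and-swap on the
// packed (top, tag) "age" word.  The tag is incremented whenever the
// deque is reset so a stale top index cannot be mistaken for a current
// one (the ABA guard from the paper above).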
140

141
typedef uint32_t abp_idx_t;
142
typedef uint32_t abp_tag_t;
143

144
typedef union {
145
   struct {
146
      abp_idx_t top;
147
      abp_tag_t tag;
148
   };
149
   uint64_t bits;
150
} abp_age_t;
151

152
STATIC_ASSERT(sizeof(abp_age_t) <= 8);
153

154
#define THREADQ_SIZE 256
155

156
typedef struct {
157
   task_t    deque[THREADQ_SIZE];
158
   abp_age_t age;
159
   abp_idx_t bot;
160
} __attribute__((aligned(64))) threadq_t;
161

162
typedef enum { IDLE, START } workq_state_t;
163

164
typedef union {
165
   struct {
166
      uint32_t count;
167
      uint32_t epoch;
168
   };
169
   uint64_t bits;
170
} entryq_ptr_t;
171

172
STATIC_ASSERT(sizeof(entryq_ptr_t) <= 8);
173

174
typedef struct {
175
   task_t       *tasks;
176
   unsigned      queuesz;
177
   entryq_ptr_t  wptr;
178
   entryq_ptr_t  comp;
179
} __attribute__((aligned(64))) entryq_t;
180

181
STATIC_ASSERT(sizeof(entryq_t) == 64);
182

183
struct _workq {
184
   void          *context;
185
   workq_state_t  state;
186
   unsigned       epoch;
187
   unsigned       maxthread;
188
   bool           parallel;
189
   entryq_t       entryqs[MAX_THREADS];
190
};
191

192
typedef struct {
193
   int64_t comp;
194
   int64_t steals;
195
   int64_t wakes;
196
} __attribute__((aligned(64))) workq_stats_t;
197

198
STATIC_ASSERT(sizeof(workq_stats_t) == 64);
199

200
typedef enum {
201
   MAIN_THREAD,
202
   USER_THREAD,
203
   WORKER_THREAD,
204
} thread_kind_t;
205

206
struct _nvc_thread {
207
   unsigned        id;
208
   thread_kind_t   kind;
209
   unsigned        spins;
210
   threadq_t       queue;
211
   char           *name;
212
   pthread_t       handle;
213
   thread_fn_t     fn;
214
   void           *arg;
215
   int             victim;
216
   uint32_t        rngstate;
217
#ifdef __APPLE__
218
   thread_port_t   port;
219
#endif
220
};
221

222
typedef struct {
223
   nvc_lock_t   lock;
224
   task_t      *tasks;
225
   unsigned     wptr;
226
   unsigned     rptr;
227
   unsigned     max;
228
} globalq_t;
229

230
typedef struct _barrier {
231
   unsigned count;
232
   unsigned reached;
233
   unsigned passed;
234
} __attribute__((aligned(64))) barrier_t;
235

236
static parking_bay_t parking_bays[PARKING_BAYS] = {
237
   [0 ... PARKING_BAYS - 1] = {
238
      PTHREAD_MUTEX_INITIALIZER,
239
      PTHREAD_COND_INITIALIZER
240
   }
241
};
242

243
static nvc_thread_t    *threads[MAX_THREADS];
244
static unsigned         max_workers = 0;
245
static int              running_threads = 0;
246
static unsigned         max_thread_id = 0;
247
static bool             should_stop = false;
248
static globalq_t        globalq __attribute__((aligned(64)));
249
static int              async_pending __attribute__((aligned(64))) = 0;
250
static nvc_lock_t       stop_lock = 0;
251
static stop_world_fn_t  stop_callback = NULL;
252
static void            *stop_arg = NULL;
253
static pthread_cond_t   wake_workers = PTHREAD_COND_INITIALIZER;
254
static pthread_mutex_t  wakelock = PTHREAD_MUTEX_INITIALIZER;
255
#ifdef POSIX_SUSPEND
256
static sem_t            stop_sem;
257
#endif
258

259
#ifdef DEBUG
260
static lock_stats_t  lock_stats[MAX_THREADS];
261
static workq_stats_t workq_stats[MAX_THREADS];
262
#endif
263

264
static __thread nvc_thread_t *my_thread = NULL;
265

266
static parking_bay_t *parking_bay_for(void *cookie);
267

268
#ifdef POSIX_SUSPEND
269
static void suspend_handler(int sig, siginfo_t *info, void *context);
270
#endif
271

272
#ifdef DEBUG
273
static void print_lock_stats(void)
×
274
{
275
   lock_stats_t s = {};
×
276
   workq_stats_t t = {};
×
277
   for (int i = 0; i < MAX_THREADS; i++) {
×
278
      s.locks     += relaxed_load(&(lock_stats[i].locks));
×
279
      s.contended += relaxed_load(&(lock_stats[i].contended));
×
280
      s.parks     += relaxed_load(&(lock_stats[i].parks));
×
281
      s.spins     += relaxed_load(&(lock_stats[i].spins));
×
282
      s.spurious  += relaxed_load(&(lock_stats[i].spurious));
×
283
      s.retries   += relaxed_load(&(lock_stats[i].retries));
×
284

285
      t.comp   += relaxed_load(&(workq_stats[i].comp));
×
286
      t.steals += relaxed_load(&(workq_stats[i].steals));
×
287
      t.wakes  += relaxed_load(&(workq_stats[i].wakes));
×
288
   }
289

290
   printf("\nLock statistics:\n");
×
291
   printf("\tTotal locks      : %"PRIi64"\n", s.locks);
×
292
   printf("\tContended        : %"PRIi64" (%.1f%%)\n",
×
293
          s.contended, 100.0 * ((double)s.contended / (double)s.locks));
×
294
   printf("\tParked           : %"PRIi64" (%.1f%%)\n",
×
295
          s.parks, 100.0 * ((double)s.parks / (double)s.locks));
×
296
   printf("\tAverage spins    : %.1f\n", (double)s.spins / (double)s.retries);
×
297
   printf("\tSpurious wakeups : %"PRIi64"\n", s.spurious);
×
298

299
   printf("\nWork queue statistics:\n");
×
300
   printf("\tCompleted tasks  : %"PRIi64"\n", t.comp);
×
301
   printf("\tSteals           : %"PRIi64"\n", t.steals);
×
302
   printf("\tWakeups          : %"PRIi64"\n", t.wakes);
×
303
}
×
304
#endif
305

306
static void join_worker_threads(void)
4,062✔
307
{
308
   SCOPED_A(nvc_thread_t *) join_list = AINIT;
8,124✔
309

310
   for (int i = 1; i < MAX_THREADS; i++) {
259,968✔
311
      nvc_thread_t *t = atomic_load(&threads[i]);
255,906✔
312
      if (t != NULL)
255,906✔
313
         APUSH(join_list, t);
255,906✔
314
   }
315

316
   // Lock the wake mutex here to avoid races with workers sleeping
317
   PTHREAD_CHECK(pthread_mutex_lock, &wakelock);
4,062✔
318
   {
319
      atomic_store(&should_stop, true);
4,062✔
320
      PTHREAD_CHECK(pthread_cond_broadcast, &wake_workers);
4,062✔
321
   }
322
   PTHREAD_CHECK(pthread_mutex_unlock, &wakelock);
4,062✔
323

324
   for (int i = 0; i < join_list.count; i++) {
4,091✔
325
      nvc_thread_t *t = join_list.items[i];
29✔
326

327
      switch (relaxed_load(&t->kind)) {
29✔
328
      case WORKER_THREAD:
29✔
329
         thread_join(t);
29✔
330
         continue;  // Freed thread struct
29✔
331
      case USER_THREAD:
×
332
      case MAIN_THREAD:
333
         fatal_trace("leaked a user thread: %s", t->name);
×
334
      }
335
   }
336

337
   assert(atomic_load(&running_threads) == 1);
4,062✔
338
}
4,062✔
339

340
static nvc_thread_t *thread_new(thread_fn_t fn, void *arg,
4,105✔
341
                                thread_kind_t kind, char *name)
342
{
343
   nvc_thread_t *thread = xcalloc(sizeof(nvc_thread_t));
4,105✔
344
   thread->name     = name;
4,105✔
345
   thread->fn       = fn;
4,105✔
346
   thread->arg      = arg;
4,105✔
347
   thread->kind     = kind;
4,105✔
348
   thread->rngstate = rand();
4,105✔
349

350
   atomic_store(&thread->queue.age.bits, 0);
4,105✔
351
   atomic_store(&thread->queue.bot, 0);
4,105✔
352

353
   int id = 0;
4,105✔
354
   for (; id < MAX_THREADS; id++) {
4,162✔
355
      if (relaxed_load(&(threads[id])) != NULL)
4,162✔
356
         continue;
57✔
357
      else if (atomic_cas(&(threads[id]), NULL, thread))
4,105✔
358
         break;
359
   }
360

361
   if (id == MAX_THREADS)
4,105✔
362
      fatal_trace("cannot create more than %d threads", MAX_THREADS);
×
363

364
   thread->id = id;
4,105✔
365

366
   unsigned max = relaxed_load(&max_thread_id);
4,105✔
367
   while (max < id) {
4,105✔
368
      if (__atomic_cas(&max_thread_id, &max, id))
39✔
369
         break;
370
   }
371

372
   atomic_add(&running_threads, 1);
4,105✔
373
   return thread;
4,105✔
374
}
375

376
#ifdef POSIX_SUSPEND
377
static void unmask_fatal_signals(sigset_t *mask)
4,985✔
378
{
379
   sigdelset(mask, SIGQUIT);
4,985✔
380
   sigdelset(mask, SIGABRT);
4,985✔
381
   sigdelset(mask, SIGTERM);
4,985✔
382
}
4,985✔
383
#endif
384

385
#ifdef __APPLE__
386
static void reset_mach_ports(void)
387
{
388
   for (int i = 0; i < MAX_THREADS; i++) {
389
      nvc_thread_t *t = atomic_load(&(threads[i]));
390
      if (t == NULL)
391
         continue;
392

393
      // Mach ports are not valid after fork
394
      t->port = pthread_mach_thread_np(t->handle);
395
   }
396
}
397
#endif
398

399
void thread_init(void)
4,062✔
400
{
401
   assert(my_thread == NULL);
4,062✔
402

403
   my_thread = thread_new(NULL, NULL, MAIN_THREAD, xstrdup("main thread"));
4,062✔
404
   my_thread->handle = pthread_self();
4,062✔
405

406
#ifdef __APPLE__
407
   my_thread->port = pthread_mach_thread_np(my_thread->handle);
408
   pthread_atfork(NULL, NULL, reset_mach_ports);
409
#endif
410

411
   assert(my_thread->id == 0);
4,062✔
412

413
   const char *env = getenv("NVC_MAX_THREADS");
4,062✔
414
   if (env != NULL)
4,062✔
415
      max_workers = MAX(1, MIN(atoi(env), MAX_THREADS));
×
416
   else
417
      max_workers = MIN(nvc_nprocs(), DEFAULT_THREADS);
4,062✔
418
   assert(max_workers > 0);
4,062✔
419

420
#ifdef DEBUG
421
   if (getenv("NVC_THREAD_VERBOSE") != NULL)
4,062✔
422
      atexit(print_lock_stats);
×
423
#endif
424

425
   atexit(join_worker_threads);
4,062✔
426

427
#ifdef POSIX_SUSPEND
428
   sem_init(&stop_sem, 0, 0);
4,062✔
429

430
   struct sigaction sa = {
4,062✔
431
      .sa_sigaction = suspend_handler,
432
      .sa_flags = SA_RESTART | SA_SIGINFO
433
   };
434
   sigfillset(&sa.sa_mask);
4,062✔
435
   unmask_fatal_signals(&sa.sa_mask);
4,062✔
436

437
   sigaction(SIGSUSPEND, &sa, NULL);
4,062✔
438
   sigaction(SIGRESUME, &sa, NULL);
4,062✔
439

440
   sigset_t mask;
4,062✔
441
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, NULL, &mask);
4,062✔
442

443
   sigdelset(&mask, SIGSUSPEND);
4,062✔
444
   sigaddset(&mask, SIGRESUME);
4,062✔
445

446
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, &mask, NULL);
4,062✔
447
#endif
448
}
4,062✔
449

450
int thread_id(void)
74,571,500✔
451
{
452
   assert(my_thread != NULL);
74,571,500✔
453
   return my_thread->id;
74,571,500✔
454
}
455

456
bool thread_attached(void)
42✔
457
{
458
   return my_thread != NULL;
42✔
459
}
460

461
void thread_sleep(int usec)
×
462
{
463
   usleep(usec);
×
464
}
×
465

466
static void *thread_wrapper(void *arg)
43✔
467
{
468
   assert(my_thread == NULL);
43✔
469
   my_thread = arg;
43✔
470

471
   void *result = (*my_thread->fn)(my_thread->arg);
43✔
472

473
   // Avoid races with stop_world
474
   SCOPED_LOCK(stop_lock);
43✔
475

476
   assert(threads[my_thread->id] == my_thread);
43✔
477
   atomic_store(&(threads[my_thread->id]),  NULL);
43✔
478

479
   atomic_add(&running_threads, -1);
43✔
480
   return result;
43✔
481
}
482

483
nvc_thread_t *thread_create(thread_fn_t fn, void *arg, const char *fmt, ...)
14✔
484
{
485
   va_list ap;
14✔
486
   va_start(ap, fmt);
14✔
487
   char *name = xvasprintf(fmt, ap);
14✔
488
   va_end(ap);
14✔
489

490
   // Avoid races with stop_world
491
   SCOPED_LOCK(stop_lock);
14✔
492

493
   nvc_thread_t *thread = thread_new(fn, arg, USER_THREAD, name);
14✔
494

495
   PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
14✔
496
                 thread_wrapper, thread);
497

498
#ifdef __APPLE__
499
   thread->port = pthread_mach_thread_np(thread->handle);
500
#endif
501

502
   return thread;
14✔
503
}
504

505
void *thread_join(nvc_thread_t *thread)
43✔
506
{
507
   if (thread == my_thread || thread->kind == MAIN_THREAD)
43✔
508
      fatal_trace("cannot join self or main thread");
×
509

510
   void *retval = NULL;
43✔
511
   PTHREAD_CHECK(pthread_join, thread->handle, &retval);
43✔
512

513
   async_free(thread->name);
43✔
514
   async_free(thread);
43✔
515

516
   return retval;
43✔
517
}
518

519
nvc_thread_t *get_thread(int id)
28,480✔
520
{
521
   assert(id >= 0 && id < MAX_THREADS);
28,480✔
522
   return atomic_load(&threads[id]);
28,480✔
523
}
524

525
static inline parking_bay_t *parking_bay_for(void *cookie)
1,149✔
526
{
527
   return &(parking_bays[mix_bits_64(cookie) % PARKING_BAYS]);
1,149✔
528
}
529

530
static void thread_park(void *cookie, park_fn_t fn)
674✔
531
{
532
   parking_bay_t *bay = parking_bay_for(cookie);
674✔
533

534
   PTHREAD_CHECK(pthread_mutex_lock, &(bay->mutex));
674✔
535
   {
536
      if ((*fn)(bay, cookie)) {
674✔
537
         bay->parked++;
673✔
538
         PTHREAD_CHECK(pthread_cond_wait, &(bay->cond), &(bay->mutex));
673✔
539
         assert(bay->parked > 0);
673✔
540
         bay->parked--;
673✔
541
      }
542
   }
543
   PTHREAD_CHECK(pthread_mutex_unlock, &(bay->mutex));
674✔
544
}
674✔
545

546
static void thread_unpark(void *cookie, unpark_fn_t fn)
475✔
547
{
548
   parking_bay_t *bay = parking_bay_for(cookie);
475✔
549

550
   if (fn != NULL) {
475✔
551
      PTHREAD_CHECK(pthread_mutex_lock, &(bay->mutex));
475✔
552
      {
553
         (*fn)(bay, cookie);
475✔
554
      }
555
      PTHREAD_CHECK(pthread_mutex_unlock, &(bay->mutex));
475✔
556
   }
557

558
   // Do not use pthread_cond_signal here as multiple threads parked in
559
   // this bay may be waiting on different cookies
560
   PTHREAD_CHECK(pthread_cond_broadcast, &(bay->cond));
475✔
561
}
475✔
562

563
void spin_wait(void)
13,233,300✔
564
{
565
#ifdef __x86_64__
566
   __asm__ volatile ("pause");
13,233,300✔
567
#elif defined __aarch64__
568
   // YIELD is a no-op on most AArch64 cores so also do an ISB to stall
569
   // the pipeline for a bit
570
   __asm__ volatile ("yield; isb");
571
#endif
572
}
13,233,300✔
573

574
static bool lock_park_cb(parking_bay_t *bay, void *cookie)
674✔
575
{
576
   nvc_lock_t *lock = cookie;
674✔
577

578
   // This is called with the park mutex held: check the lock is still
579
   // owned by someone and the park bit is still set
580
   return relaxed_load(lock) == (IS_LOCKED | HAS_PARKED);
674✔
581
}
582

583
static void lock_unpark_cb(parking_bay_t *bay, void *cookie)
475✔
584
{
585
   nvc_lock_t *lock = cookie;
475✔
586

587
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
475✔
588

589
   // Unlock must have release semantics
590
   atomic_store(lock, (bay->parked > 0 ? HAS_PARKED : 0));
475✔
591
}
475✔
592

593
void nvc_lock(nvc_lock_t *lock)
69,978,400✔
594
{
595
   LOCK_EVENT(locks, 1);
69,978,400✔
596
   TSAN_PRE_LOCK(lock);
69,978,400✔
597

598
   int8_t state = relaxed_load(lock);
69,978,400✔
599
   if (state & IS_LOCKED)
69,978,400✔
600
      LOCK_EVENT(contended, 1);
67✔
601
   else if (likely(__atomic_cas(lock, &state, state | IS_LOCKED)))
69,978,300✔
602
      goto locked;  // Fast path: acquired the lock without contention
69,978,300✔
603

604
   for (;;) {
696✔
605
      LOCK_EVENT(retries, 1);
696✔
606

607
      // Spin a few times waiting for the owner to release the lock
608
      // before parking
609
      int spins = 0;
610
      for (; (state & IS_LOCKED) && spins < LOCK_SPINS;
10,915✔
611
           spins++, state = relaxed_load(lock))
10,219✔
612
         spin_wait();
10,219✔
613

614
      if (spins == LOCK_SPINS) {
696✔
615
         // Ignore failures here as we will check the lock state again
616
         // in the callback with the park mutex held
617
         atomic_cas(lock, IS_LOCKED, IS_LOCKED | HAS_PARKED);
674✔
618

619
         LOCK_EVENT(parks, 1);
674✔
620
         thread_park(lock, lock_park_cb);
674✔
621

622
         if ((state = relaxed_load(lock)) & IS_LOCKED) {
674✔
623
            // Someone else grabbed the lock before our thread was unparked
624
            LOCK_EVENT(spurious, 1);
627✔
625
            continue;
627✔
626
         }
627
      }
628
      else
629
         LOCK_EVENT(spins, spins);
22✔
630

631
      assert(!(state & IS_LOCKED));
69✔
632

633
      // If we get here then we've seen the lock in an unowned state:
634
      // attempt to grab it with a CAS
635
      if (__atomic_cas(lock, &state, state | IS_LOCKED))
69✔
636
         goto locked;
69✔
637
   }
638

639
 locked:
69,978,400✔
640
   TSAN_POST_LOCK(lock);
69,978,400✔
641
}
69,978,400✔
642

643
void nvc_unlock(nvc_lock_t *lock)
69,978,400✔
644
{
645
   TSAN_PRE_UNLOCK(lock);
69,978,400✔
646

647
   // Fast path: unlock assuming no parked waiters
648
   if (likely(atomic_cas(lock, IS_LOCKED, 0)))
69,978,400✔
649
      goto unlocked;
69,977,900✔
650

651
   // If we get here then we must own the lock with at least one parked
652
   // waiting thread
653
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
475✔
654

655
   // Lock released in callback
656
   thread_unpark(lock, lock_unpark_cb);
475✔
657

658
 unlocked:
69,978,400✔
659
   TSAN_POST_UNLOCK(lock);
69,978,400✔
660
}
69,978,400✔
661

662
#ifdef DEBUG
663
void assert_lock_held(nvc_lock_t *lock)
133,596✔
664
{
665
   int8_t state = relaxed_load(lock);
133,596✔
666
   if (unlikely(!(state & IS_LOCKED)))
133,596✔
667
      fatal_trace("expected lock at %p to be held", lock);
×
668
}
133,596✔
669
#endif
670

671
void __scoped_unlock(nvc_lock_t **plock)
69,927,400✔
672
{
673
   nvc_unlock(*plock);
69,927,400✔
674
}
69,927,400✔
675

676
static void push_bot(threadq_t *tq, const task_t *tasks, size_t count)
57✔
677
{
678
   const abp_idx_t bot = relaxed_load(&tq->bot);
57✔
679
   assert(bot + count <= THREADQ_SIZE);
57✔
680

681
   memcpy(tq->deque + bot, tasks, count * sizeof(task_t));
57✔
682
   store_release(&tq->bot, bot + count);
57✔
683
}
57✔
684

685
static bool pop_bot(threadq_t *tq, task_t *task)
225✔
686
{
687
   const abp_idx_t old_bot = relaxed_load(&tq->bot);
225✔
688
   if (old_bot == 0)
225✔
689
      return false;
690

691
   const abp_idx_t new_bot = old_bot - 1;
193✔
692
   atomic_store(&tq->bot, new_bot);
193✔
693

694
   *task = tq->deque[new_bot];
193✔
695

696
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
193✔
697
   if (new_bot > old_age.top)
193✔
698
      return true;
699

700
   atomic_store(&tq->bot, 0);
57✔
701

702
   const abp_age_t new_age = { .top = 0, .tag = old_age.tag + 1 };
57✔
703
   if (new_bot == old_age.top) {
57✔
704
      if (atomic_cas(&tq->age.bits, old_age.bits, new_age.bits))
32✔
705
         return true;
706
   }
707

708
   atomic_store(&tq->age.bits, new_age.bits);
25✔
709
   return false;
25✔
710
}
711

712
__attribute__((no_sanitize("thread")))
713
static bool pop_top(threadq_t *tq, task_t *task)
39✔
714
{
715
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
39✔
716
   const abp_idx_t bot = atomic_load(&tq->bot);
39✔
717

718
   if (bot <= old_age.top)
39✔
719
      return false;
720

721
   // This triggers a TSAN false-positive: we will never read from *task
722
   // if the CAS fails below, so it's safe to ignore
723
   *task = tq->deque[old_age.top];
39✔
724

725
   const abp_age_t new_age = {
39✔
726
      .tag = old_age.tag,
727
      .top = old_age.top + 1
39✔
728
   };
729

730
   return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
39✔
731
}
732

733
static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
138✔
734
{
735
   assert_lock_held(&gq->lock);
138✔
736

737
   if (gq->wptr == gq->rptr)
138✔
738
      gq->wptr = gq->rptr = 0;
52✔
739

740
   if (gq->wptr + count > gq->max) {
138✔
741
      gq->max = next_power_of_2(gq->wptr + count);
36✔
742
      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
36✔
743
   }
744

745
   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
138✔
746
   gq->wptr += count;
138✔
747
}
138✔
748

749
__attribute__((no_sanitize("thread")))
750
static bool globalq_unlocked_empty(globalq_t *gq)
246✔
751
{
752
   return relaxed_load(&gq->wptr) == relaxed_load(&gq->rptr);
246✔
753
}
754

755
static size_t globalq_take(globalq_t *gq, threadq_t *tq)
246✔
756
{
757
   if (globalq_unlocked_empty(gq))
246✔
758
      return 0;
759

760
   const int nthreads = relaxed_load(&running_threads);
58✔
761

762
   SCOPED_LOCK(gq->lock);
116✔
763

764
   if (gq->wptr == gq->rptr)
58✔
765
      return 0;
766

767
   const int remain = gq->wptr - gq->rptr;
57✔
768
   const int share = gq->wptr / nthreads;
57✔
769
   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
57✔
770
   const int from = gq->rptr;
57✔
771

772
   gq->rptr += take;
57✔
773

774
   push_bot(tq, gq->tasks + from, take);
57✔
775
   return take;
57✔
776
}
777

778
static void execute_task(task_t *task)
1,317✔
779
{
780
   (*task->fn)(task->context, task->arg);
1,317✔
781

782
   if (task->workq != NULL) {
1,317✔
783
      entryq_t *eq = &(task->workq->entryqs[my_thread->id]);
1,206✔
784

785
      const entryq_ptr_t cur = { .bits = relaxed_load(&eq->comp.bits) };
1,206✔
786
      const int epoch = atomic_load(&task->workq->epoch);
1,206✔
787
      const int count = cur.epoch == epoch ? cur.count : 0;
1,206✔
788
      const entryq_ptr_t next = { .count = count + 1, .epoch = epoch };
1,206✔
789
      store_release(&eq->comp.bits, next.bits);
1,206✔
790
   }
791
   else
792
      atomic_add(&async_pending, -1);
111✔
793
}
1,317✔
794

795
static bool globalq_poll(globalq_t *gq, threadq_t *tq)
246✔
796
{
797
   int ntasks;
246✔
798
   if ((ntasks = globalq_take(gq, tq))) {
246✔
799
      task_t task;
57✔
800
      int comp = 0;
57✔
801
      for (; pop_bot(tq, &task); comp++)
282✔
802
         execute_task(&task);
168✔
803

804
      WORKQ_EVENT(comp, comp);
57✔
805
      return true;
57✔
806
   }
807
   else
808
      return false;
809
}
810
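// Work queue lifecycle as driven from the main thread (illustrative
// sketch only; ctx, do_item, items and nitems are placeholders):
//
//    workq_t *wq = workq_new(ctx);
//    for (int i = 0; i < nitems; i++)
//       workq_do(wq, do_item, &(items[i]));
//    workq_start(wq);   // Runs a lone task inline, otherwise fans out
//    workq_drain(wq);   // Main thread helps steal until all complete
//    workq_free(wq);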

811
workq_t *workq_new(void *context)
1,137✔
812
{
813
   if (my_thread->kind != MAIN_THREAD)
1,137✔
814
      fatal_trace("work queues can only be created by the main thread");
×
815

816
   workq_t *wq = xcalloc(sizeof(workq_t));
1,137✔
817
   wq->state    = IDLE;
1,137✔
818
   wq->context  = context;
1,137✔
819
   wq->parallel = max_workers > 1;
1,137✔
820
   wq->epoch    = 1;
1,137✔
821

822
   return wq;
1,137✔
823
}
824

825
void workq_not_thread_safe(workq_t *wq)
×
826
{
827
   wq->parallel = false;
×
828
}
×
829

830
void workq_free(workq_t *wq)
1,137✔
831
{
832
   if (my_thread->kind != MAIN_THREAD)
1,137✔
833
      fatal_trace("work queues can only be freed by the main thread");
×
834

835
   assert(wq->state == IDLE);
1,137✔
836

837
   for (int i = 0; i < MAX_THREADS; i++)
73,905✔
838
      free(wq->entryqs[i].tasks);
72,768✔
839

840
   free(wq);
1,137✔
841
}
1,137✔
842

843
void workq_do(workq_t *wq, task_fn_t fn, void *arg)
1,206✔
844
{
845
   assert(wq->state == IDLE);
1,206✔
846

847
   entryq_t *eq = &(wq->entryqs[my_thread->id]);
1,206✔
848

849
   const entryq_ptr_t cur = { .bits = relaxed_load(&eq->wptr.bits) };
1,206✔
850
   const int epoch = atomic_load(&wq->epoch);
1,206✔
851
   const int wptr = cur.epoch == epoch ? cur.count : 0;
1,206✔
852

853
   if (wptr == eq->queuesz) {
1,206✔
854
      eq->queuesz = MAX(eq->queuesz * 2, 64);
1,137✔
855
      eq->tasks = xrealloc_array(eq->tasks, eq->queuesz, sizeof(task_t));
1,137✔
856
   }
857

858
   eq->tasks[wptr] = (task_t){ fn, wq->context, arg, wq };
1,206✔
859

860
   const entryq_ptr_t next = { .count = wptr + 1, .epoch = epoch };
1,206✔
861
   store_release(&eq->wptr.bits, next.bits);
1,206✔
862

863
   unsigned maxthread = relaxed_load(&wq->maxthread);
1,206✔
864
   while (maxthread < my_thread->id) {
1,206✔
865
      if (__atomic_cas(&wq->maxthread, &maxthread, my_thread->id))
×
866
         break;
867
   }
868
}
1,206✔
869

870
void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
×
871
{
872
   const int maxthread = relaxed_load(&wq->maxthread);
×
873
   for (int i = 0; i <= maxthread; i++) {
×
874
      entryq_t *eq = &(wq->entryqs[i]);   // Scan each thread's entry queue
×
875
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
×
876
      for (int j = 0; j < wptr.count; j++)
×
877
         (*fn)(wq->context, eq->tasks[j].arg, arg);
×
878
   }
879
}
×
880

881
static int estimate_depth(threadq_t *tq)
339✔
882
{
883
   const abp_age_t age = { .bits = relaxed_load(&tq->age.bits) };
339✔
884
   const abp_idx_t bot = relaxed_load(&tq->bot);
339✔
885

886
   return bot <= age.top ? 0 : bot - age.top;
339✔
887
}
888

889
static threadq_t *get_thread_queue(int id)
339✔
890
{
891
   assert(id < MAX_THREADS);
339✔
892

893
   nvc_thread_t *t = atomic_load(&(threads[id]));
339✔
894
   if (t == NULL)
339✔
895
      return NULL;
896

897
   return &(t->queue);
339✔
898
}
899

900
static uint32_t fast_rand(void)
150✔
901
{
902
   uint32_t state = my_thread->rngstate;
150✔
903
   state ^= (state << 13);
150✔
904
   state ^= (state >> 17);
150✔
905
   state ^= (state << 5);
150✔
906
   return (my_thread->rngstate = state);
150✔
907
}
908

909
static threadq_t *find_victim(void)
189✔
910
{
911
   threadq_t *last = get_thread_queue(my_thread->victim);
189✔
912
   if (last != NULL && estimate_depth(last) > 0)
189✔
913
      return last;
914

915
   const int maxthread = relaxed_load(&max_thread_id);
150✔
916
   const int start = fast_rand() % (maxthread + 1);
150✔
917
   int idx = start;
150✔
918
   do {
300✔
919
      if (idx != my_thread->id) {
300✔
920
         threadq_t *q = get_thread_queue(idx);
150✔
921
         if (q != NULL && estimate_depth(q) > 0) {
150✔
922
            my_thread->victim = idx;
×
923
            return q;
×
924
         }
925
      }
926
   } while ((idx = (idx + 1) % (maxthread + 1)) != start);
300✔
927

928
   return NULL;
929
}
930

931
static bool steal_task(void)
189✔
932
{
933
   threadq_t *tq = find_victim();
189✔
934
   if (tq == NULL)
189✔
935
      return false;
936

937
   task_t task;
39✔
938
   if (pop_top(tq, &task)) {
39✔
939
      WORKQ_EVENT(steals, 1);
39✔
940
      execute_task(&task);
39✔
941
      WORKQ_EVENT(comp, 1);
39✔
942
      return true;
39✔
943
   }
944

945
   return false;
946
}
947

948
static void progressive_backoff(void)
13,632,100✔
949
{
950
   if (my_thread->spins++ < YIELD_SPINS)
13,632,100✔
951
      spin_wait();
13,219,000✔
952
   else {
953
      sched_yield();
413,084✔
954
      my_thread->spins = 0;
413,084✔
955
   }
956
}
13,632,100✔
957

958
static void *worker_thread(void *arg)
29✔
959
{
960
   mspace_stack_limit(MSPACE_CURRENT_FRAME);
29✔
961

962
   do {
191✔
963
      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
191✔
964
         my_thread->spins = 0;  // Did work
68✔
965
      else if (my_thread->spins++ < 2)
123✔
966
         spin_wait();
82✔
967
      else {
968
         PTHREAD_CHECK(pthread_mutex_lock, &wakelock);
41✔
969
         {
970
            if (!relaxed_load(&should_stop))
41✔
971
               PTHREAD_CHECK(pthread_cond_wait, &wake_workers, &wakelock);
41✔
972
         }
973
         PTHREAD_CHECK(pthread_mutex_unlock, &wakelock);
41✔
974
      }
975
   } while (likely(!relaxed_load(&should_stop)));
191✔
976

977
   return NULL;
29✔
978
}
979

980
static void create_workers(int needed)
138✔
981
{
982
   assert(my_thread->kind == MAIN_THREAD);
138✔
983

984
   if (relaxed_load(&should_stop))
138✔
985
      return;
986

987
   while (relaxed_load(&running_threads) < MIN(max_workers, needed)) {
167✔
988
      static int counter = 0;
29✔
989
      char *name = xasprintf("worker thread %d", atomic_add(&counter, 1));
29✔
990
      nvc_thread_t *thread =
29✔
991
         thread_new(worker_thread, NULL, WORKER_THREAD, name);
29✔
992

993
      PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
167✔
994
                    thread_wrapper, thread);
995

996
#ifdef __APPLE__
997
      thread->port = pthread_mach_thread_np(thread->handle);
998
#endif
999
   }
1000

1001
   PTHREAD_CHECK(pthread_cond_broadcast, &wake_workers);
138✔
1002
}
1003

1004
void workq_start(workq_t *wq)
1,137✔
1005
{
1006
   assert(my_thread->kind == MAIN_THREAD);
1,137✔
1007

1008
   const int epoch = relaxed_load(&wq->epoch);
1,137✔
1009
   const int maxthread = relaxed_load(&wq->maxthread);
1,137✔
1010

1011
   assert(wq->state == IDLE);
1,137✔
1012
   wq->state = START;
1,137✔
1013

1014
   int nserial = 0, nparallel = 0;
1,137✔
1015
   for (int i = 0; i <= maxthread; i++) {
2,274✔
1016
      entryq_t *eq = &wq->entryqs[i];
1,137✔
1017
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
1,137✔
1018
      if (wptr.epoch == epoch) {
1,137✔
1019
         // Only bump epoch if there are tasks to run
1020
         if (nserial + nparallel == 0)
1,137✔
1021
            atomic_add(&wq->epoch, 1);
1,137✔
1022

1023
         assert(wptr.count > 0);
1,137✔
1024

1025
         if (i == maxthread && nserial + nparallel == 0 && wptr.count == 1) {
1,137✔
1026
            execute_task(&(eq->tasks[0]));   // Only one task in total
1,110✔
1027
            nserial++;
1,110✔
1028
         }
1029
         else if (wq->parallel) {
27✔
1030
            if (nparallel == 0)
27✔
1031
               nvc_lock(&globalq.lock);   // Lazily acquire lock
27✔
1032
            globalq_put(&globalq, eq->tasks, wptr.count);
27✔
1033
            nparallel += wptr.count;
27✔
1034
         }
1035
         else {
1036
            for (int j = 0; j < wptr.count; j++)
×
1037
               execute_task(&(eq->tasks[j]));
×
1038
            nserial += wptr.count;
×
1039
         }
1040
      }
1041
   }
1042

1043
   if (wq->parallel && nparallel > 0) {
1,137✔
1044
      nvc_unlock(&globalq.lock);
27✔
1045
      create_workers(nparallel);
27✔
1046
   }
1047
}
1,137✔
1048

1049
static int workq_outstanding(workq_t *wq)
13,493,000✔
1050
{
1051
   assert(wq->state == START);
13,493,000✔
1052

1053
   const int epoch = atomic_load(&wq->epoch);
13,493,000✔
1054
   const int maxthread = relaxed_load(&max_thread_id);
13,493,000✔
1055

1056
   int pending = 0;
13,493,000✔
1057
   for (int i = 0; i <= maxthread; i++) {
40,477,800✔
1058
      entryq_t *eq = &(wq->entryqs[i]);
26,984,800✔
1059

1060
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
26,984,800✔
1061
      if (wptr.epoch == epoch - 1)
26,984,800✔
1062
         pending += wptr.count;
13,493,000✔
1063

1064
      const entryq_ptr_t comp = { .bits = load_acquire(&eq->comp.bits) };
26,984,800✔
1065
      if (comp.epoch == epoch)
26,984,800✔
1066
         pending -= comp.count;
20,638,600✔
1067
   }
1068

1069
   assert(pending >= 0);
13,493,000✔
1070
   return pending;
13,493,000✔
1071
}
1072

1073
static void workq_parallel_drain(workq_t *wq)
1,137✔
1074
{
1075
   if (workq_outstanding(wq) > 0) {
1,137✔
1076
      while (globalq_poll(&globalq, &(my_thread->queue)));
54✔
1077
      while (steal_task());
27✔
1078

1079
      while (workq_outstanding(wq) > 0)
13,491,800✔
1080
         progressive_backoff();
13,491,800✔
1081
   }
1082

1083
   wq->state = IDLE;
1,137✔
1084
}
1,137✔
1085

1086
void workq_drain(workq_t *wq)
1,137✔
1087
{
1088
   if (my_thread->kind != MAIN_THREAD)
1,137✔
1089
      fatal_trace("workq_drain can only be called from the main thread");
×
1090

1091
   if (wq->parallel)
1,137✔
1092
      workq_parallel_drain(wq);
1,137✔
1093
   else {
1094
      assert(wq->state == START);
×
1095
      wq->state = IDLE;
×
1096
   }
1097
}
1,137✔
1098

1099
void async_do(task_fn_t fn, void *context, void *arg)
111✔
1100
{
1101
   if (max_workers == 1)
111✔
1102
      (*fn)(context, arg);   // Single CPU
×
1103
   else {
1104
      const int npending = atomic_add(&async_pending, 1);
111✔
1105
      create_workers(npending + 1 /* Do not count main thread */);
111✔
1106

1107
      task_t tasks[1] = {
111✔
1108
         { fn, context, arg, NULL }
1109
      };
1110
      SCOPED_LOCK(globalq.lock);
222✔
1111
      globalq_put(&globalq, tasks, 1);
111✔
1112
   }
1113
}
111✔
1114

1115
void async_barrier(void)
7,133✔
1116
{
1117
   while (atomic_load(&async_pending) > 0) {
14,267✔
1118
      if (!globalq_poll(&globalq, &(my_thread->queue)))
1✔
1119
         progressive_backoff();
×
1120
   }
1121
}
7,133✔
1122

1123
void async_free(void *ptr)
4,975✔
1124
{
1125
   if (relaxed_load(&running_threads) == 1)
4,975✔
1126
      free(ptr);
4,959✔
1127
   else if (ptr != NULL) {
1128
      // TODO: free when all threads in quiescent state
1129
   }
4,975✔
1130
}
4,975✔
1131

1132
#ifdef POSIX_SUSPEND
1133
static void suspend_handler(int sig, siginfo_t *info, void *context)
1,846✔
1134
{
1135
   if (info->si_pid != getpid())
1,846✔
1136
      return;   // Not sent by us, ignore it
923✔
1137
   else if (sig == SIGRESUME)
1,846✔
1138
      return;
1139

1140
   const int olderrno = errno;
923✔
1141

1142
   if (my_thread != NULL) {
923✔
1143
      struct cpu_state cpu;
923✔
1144
      fill_cpu_state(&cpu, (ucontext_t *)context);
923✔
1145

1146
      stop_world_fn_t callback = atomic_load(&stop_callback);
923✔
1147
      void *arg = atomic_load(&stop_arg);
923✔
1148

1149
      (*callback)(my_thread->id, &cpu, arg);
923✔
1150
   }
1151

1152
   sem_post(&stop_sem);
923✔
1153

1154
   sigset_t mask;
923✔
1155
   sigfillset(&mask);
923✔
1156
   unmask_fatal_signals(&mask);
923✔
1157
   sigdelset(&mask, SIGRESUME);
923✔
1158

1159
   sigsuspend(&mask);
923✔
1160

1161
   sem_post(&stop_sem);
923✔
1162

1163
   errno = olderrno;
923✔
1164
}
1165
#endif
1166
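// On POSIX systems stop_world suspends every other thread by sending it
// SIGSUSPEND (SIGRTMIN): the handler above captures the CPU state, runs
// the callback, posts stop_sem and then blocks in sigsuspend until
// SIGRESUME arrives.  Both stop_world and start_world wait on stop_sem
// with a SUSPEND_TIMEOUT second deadline so a thread that fails to stop
// or resume raises a fatal error instead of hanging.  The Windows and
// macOS paths use SuspendThread/ResumeThread and the Mach
// thread_suspend/thread_resume calls instead.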

1167
void stop_world(stop_world_fn_t callback, void *arg)
945✔
1168
{
1169
   nvc_lock(&stop_lock);
945✔
1170

1171
   atomic_store(&stop_callback, callback);
945✔
1172
   atomic_store(&stop_arg, arg);
945✔
1173

1174
#ifdef __MINGW32__
1175
   const int maxthread = relaxed_load(&max_thread_id);
1176
   for (int i = 0; i <= maxthread; i++) {
1177
      nvc_thread_t *thread = atomic_load(&threads[i]);
1178
      if (thread == NULL || thread == my_thread)
1179
         continue;
1180

1181
      HANDLE h = pthread_gethandle(thread->handle);
1182
      if (SuspendThread(h) != 0)
1183
         fatal_errno("SuspendThread");
1184

1185
      CONTEXT context;
1186
      context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;
1187
      if (!GetThreadContext(h, &context))
1188
         fatal_errno("GetThreadContext");
1189

1190
      struct cpu_state cpu;
1191
      fill_cpu_state(&cpu, &context);
1192

1193
      (*callback)(thread->id, &cpu, arg);
1194
   }
1195
#elif defined __APPLE__
1196
   const int maxthread = relaxed_load(&max_thread_id);
1197
   for (int i = 0; i <= maxthread; i++) {
1198
      nvc_thread_t *thread = atomic_load(&threads[i]);
1199
      if (thread == NULL || thread == my_thread)
1200
         continue;
1201

1202
      assert(thread->port != MACH_PORT_NULL);
1203

1204
      kern_return_t kern_result;
1205
      do {
1206
         kern_result = thread_suspend(thread->port);
1207
      } while (kern_result == KERN_ABORTED);
1208

1209
      if (kern_result != KERN_SUCCESS)
1210
         fatal_trace("failed to suspend thread %d (%d)", i, kern_result);
1211

1212
#ifdef __aarch64__
1213
      arm_thread_state64_t state;
1214
      mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
1215
      thread_state_flavor_t flavor = ARM_THREAD_STATE64;
1216
#else
1217
      x86_thread_state64_t state;
1218
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
1219
      thread_state_flavor_t flavor = x86_THREAD_STATE64;
1220
#endif
1221
      kern_result = thread_get_state(thread->port, flavor,
1222
                                     (natural_t *)&state, &count);
1223
      if (kern_result != KERN_SUCCESS)
1224
         fatal_trace("failed to get thread %d state (%d)", i, kern_result);
1225

1226
      // Fake a ucontext_t that we can pass to fill_cpu_state
1227
      ucontext_t uc;
1228
      typeof(*uc.uc_mcontext) mc;
1229
      uc.uc_mcontext = &mc;
1230
      mc.__ss = state;
1231

1232
      struct cpu_state cpu;
1233
      fill_cpu_state(&cpu, &uc);
1234

1235
      (*callback)(thread->id, &cpu, arg);
1236
   }
1237
#elif defined __SANITIZE_THREAD__
1238
   // https://github.com/google/sanitizers/issues/1179
1239
   fatal_trace("stop_world is not supported with tsan");
1240
#else
1241
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
945✔
1242

1243
   int signalled = 0;
945✔
1244
   const int maxthread = relaxed_load(&max_thread_id);
945✔
1245
   for (int i = 0; i <= maxthread; i++) {
3,308✔
1246
      nvc_thread_t *thread = atomic_load(&threads[i]);
2,363✔
1247
      if (thread == NULL || thread == my_thread)
2,363✔
1248
         continue;
1,440✔
1249

1250
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGSUSPEND);
923✔
1251
      signalled++;
923✔
1252
   }
1253

1254
   struct timespec ts;
945✔
1255
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
945✔
1256
      fatal_errno("clock_gettime");
×
1257

1258
   ts.tv_sec += SUSPEND_TIMEOUT;
945✔
1259

1260
   for (; signalled > 0; signalled--) {
1,868✔
1261
      if (sem_timedwait(&stop_sem, &ts) != 0)
923✔
1262
         fatal_trace("timeout waiting for %d threads to suspend", signalled);
×
1263
   }
1264

1265
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
945✔
1266
#endif
1267

1268
   struct cpu_state cpu;
945✔
1269
   capture_registers(&cpu);
945✔
1270

1271
   (*callback)(my_thread->id, &cpu, arg);
945✔
1272
}
945✔
1273

1274
void start_world(void)
945✔
1275
{
1276
   assert_lock_held(&stop_lock);
945✔
1277

1278
   const int maxthread = relaxed_load(&max_thread_id);
945✔
1279

1280
#ifdef __MINGW32__
1281
   for (int i = 0; i <= maxthread; i++) {
1282
      nvc_thread_t *thread = atomic_load(&threads[i]);
1283
      if (thread == NULL || thread == my_thread)
1284
         continue;
1285

1286
      HANDLE h = pthread_gethandle(thread->handle);
1287
      if (ResumeThread(h) != 1)
1288
         fatal_errno("ResumeThread");
1289
   }
1290
#elif defined __APPLE__
1291
   for (int i = 0; i <= maxthread; i++) {
1292
      nvc_thread_t *thread = atomic_load(&threads[i]);
1293
      if (thread == NULL || thread == my_thread)
1294
         continue;
1295

1296
      kern_return_t kern_result;
1297
      do {
1298
         kern_result = thread_resume(thread->port);
1299
      } while (kern_result == KERN_ABORTED);
1300

1301
      if (kern_result != KERN_SUCCESS)
1302
         fatal_trace("failed to resume thread %d (%d)", i, kern_result);
1303
   }
1304
#else
1305
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
945✔
1306

1307
   int signalled = 0;
1308
   for (int i = 0; i <= maxthread; i++) {
3,308✔
1309
      nvc_thread_t *thread = atomic_load(&threads[i]);
2,363✔
1310
      if (thread == NULL || thread == my_thread)
2,363✔
1311
         continue;
1,440✔
1312

1313
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGRESUME);
923✔
1314
      signalled++;
923✔
1315
   }
1316

1317
   struct timespec ts;
945✔
1318
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
945✔
1319
      fatal_errno("clock_gettime");
×
1320

1321
   ts.tv_sec += SUSPEND_TIMEOUT;
945✔
1322

1323
   for (; signalled > 0; signalled--) {
1,868✔
1324
      if (sem_timedwait(&stop_sem, &ts) != 0)
923✔
1325
         fatal_trace("timeout waiting for %d threads to resume", signalled);
×
1326
   }
1327

1328
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
945✔
1329
#endif
1330

1331
   nvc_unlock(&stop_lock);
945✔
1332
}
945✔
1333

1334
void thread_wx_mode(wx_mode_t mode)
12,195✔
1335
{
1336
#ifdef __APPLE__
1337
   pthread_jit_write_protect_np(mode == WX_EXECUTE);
1338
#else
1339
   // Could use Intel memory protection keys here
1340
#endif
1341
}
12,195✔
1342

1343
barrier_t *barrier_new(int count)
1✔
1344
{
1345
   barrier_t *b = xcalloc(sizeof(barrier_t));
1✔
1346
   b->count = count;
1✔
1347
   return b;
1✔
1348
}
1349

1350
void barrier_free(barrier_t *b)
1✔
1351
{
1352
   free(b);
1✔
1353
}
1✔
1354

1355
void barrier_wait(barrier_t *b)
88✔
1356
{
1357
   const int count = relaxed_load(&b->count);
88✔
1358
   const int passed = relaxed_load(&b->passed);
88✔
1359

1360
   if (atomic_fetch_add(&b->reached, 1) == count - 1) {
88✔
1361
      // Last thread to pass barrier
1362
      relaxed_store(&b->reached, 0);
22✔
1363
      store_release(&b->passed, passed + 1);
22✔
1364
   }
1365
   else {
1366
      while (load_acquire(&b->passed) == passed)
140,331✔
1367
         progressive_backoff();
140,265✔
1368
   }
1369
}
88✔