• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nickg / nvc / 4773166962

pending completion
4773166962

push

github

Nick Gasson
Missing driver for protected procedure call signal argument. Fixes #675

40776 of 45100 relevant lines covered (90.41%)

960305.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.57
/src/thread.c
1
//
2
//  Copyright (C) 2021-2023  Nick Gasson
3
//
4
//  This program is free software: you can redistribute it and/or modify
5
//  it under the terms of the GNU General Public License as published by
6
//  the Free Software Foundation, either version 3 of the License, or
7
//  (at your option) any later version.
8
//
9
//  This program is distributed in the hope that it will be useful,
10
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
//  GNU General Public License for more details.
13
//
14
//  You should have received a copy of the GNU General Public License
15
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
//
17

18
#include "util.h"
19
#include "array.h"
20
#include "cpustate.h"
21
#include "rt/mspace.h"
22
#include "thread.h"
23

24
#include <assert.h>
25
#include <errno.h>
26
#include <signal.h>
27
#include <stdlib.h>
28
#include <string.h>
29
#include <inttypes.h>
30
#include <unistd.h>
31
#include <semaphore.h>
32

33
#ifdef HAVE_PTHREAD
34
#include <pthread.h>
35
#else
36
#error missing pthread support
37
#endif
38

39
#ifdef __SANITIZE_THREAD__
40
#include <sanitizer/tsan_interface.h>
41
#endif
42

43
#if defined __MINGW32__
44
#define WIN32_LEAN_AND_MEAN
45
#include <windows.h>
46
#elif defined __APPLE__
47
#define task_t __task_t
48
#include <mach/thread_act.h>
49
#include <mach/machine.h>
50
#undef task_t
51
#endif
52

53
#define LOCK_SPINS      15
54
#define YIELD_SPINS     32
55
#define MIN_TAKE        8
56
#define PARKING_BAYS    64
57
#define SUSPEND_TIMEOUT 1
58

59
#if !defined __MINGW32__ && !defined __APPLE__
60
#define POSIX_SUSPEND 1
61
#define SIGSUSPEND    SIGRTMIN
62
#define SIGRESUME     (SIGRTMIN + 1)
63
#endif
64

65
typedef struct {
66
   int64_t locks;
67
   int64_t spins;
68
   int64_t contended;
69
   int64_t parks;
70
   int64_t spurious;
71
   int64_t retries;
72
} __attribute__((aligned(64))) lock_stats_t;
73

74
STATIC_ASSERT(sizeof(lock_stats_t) == 64)
75

76
#ifdef DEBUG
77
#define LOCK_EVENT(what, n) do {                  \
78
      if (likely(my_thread != NULL))              \
79
         lock_stats[my_thread->id].what += (n);   \
80
   } while (0)
81

82
#define WORKQ_EVENT(what, n) do {                 \
83
      if (likely(my_thread != NULL))              \
84
         workq_stats[my_thread->id].what += (n);  \
85
   } while (0)
86
#else
87
#define LOCK_EVENT(what, n)
88
#define WORKQ_EVENT(what, n)
89
#endif
90

91
#ifdef __SANITIZE_THREAD__
92
#define TSAN_PRE_LOCK(addr) \
93
   __tsan_mutex_pre_lock(addr, __tsan_mutex_linker_init);
94
#define TSAN_POST_LOCK(addr) \
95
   __tsan_mutex_post_lock(addr, __tsan_mutex_linker_init, 0);
96
#define TSAN_PRE_UNLOCK(addr) \
97
   __tsan_mutex_pre_unlock(addr, __tsan_mutex_linker_init);
98
#define TSAN_POST_UNLOCK(addr) \
99
   __tsan_mutex_post_unlock(addr, __tsan_mutex_linker_init);
100
#else
101
#define TSAN_PRE_LOCK(addr)
102
#define TSAN_POST_LOCK(addr)
103
#define TSAN_PRE_UNLOCK(addr)
104
#define TSAN_POST_UNLOCK(addr)
105
#endif
106

107
#define PTHREAD_CHECK(op, ...) do {             \
108
      if (unlikely(op(__VA_ARGS__) != 0))       \
109
         fatal_errno(#op);                      \
110
   } while (0)
111

112
// Lock implementation based on WTF::Lock in WebKit
113
//   https://webkit.org/blog/6161/locking-in-webkit/
114

115
typedef enum {
116
   IS_LOCKED  = (1 << 0),
117
   HAS_PARKED = (1 << 1)
118
} lock_bits_t;
119

120
typedef struct {
121
   pthread_mutex_t mutex;
122
   pthread_cond_t  cond;
123
   int             parked;
124
} __attribute__((aligned(64))) parking_bay_t;
125

126
typedef bool (*park_fn_t)(parking_bay_t *, void *);
127
typedef void (*unpark_fn_t)(parking_bay_t *, void *);
128

129
typedef struct {
130
   task_fn_t  fn;
131
   void      *context;
132
   void      *arg;
133
   workq_t   *workq;
134
} task_t;
135

136
// Work sealing task queue is based on:
137
//   Arora, N. S., Blumofe, R. D., and Plaxton, C. G.
138
//   Thread scheduling for multiprogrammed multiprocessors.
139
//   Theory of Computing Systems 34, 2 (2001), 115-144.
140

141
typedef uint32_t abp_idx_t;
142
typedef uint32_t abp_tag_t;
143

144
typedef union {
145
   struct {
146
      abp_idx_t top;
147
      abp_tag_t tag;
148
   };
149
   uint64_t bits;
150
} abp_age_t;
151

152
STATIC_ASSERT(sizeof(abp_age_t) <= 8);
153

154
#define THREADQ_SIZE 256
155

156
typedef struct {
157
   task_t    deque[THREADQ_SIZE];
158
   abp_age_t age;
159
   abp_idx_t bot;
160
} __attribute__((aligned(64))) threadq_t;
161

162
typedef enum { IDLE, START } workq_state_t;
163

164
typedef union {
165
   struct {
166
      uint32_t count;
167
      uint32_t epoch;
168
   };
169
   uint64_t bits;
170
} entryq_ptr_t;
171

172
STATIC_ASSERT(sizeof(entryq_ptr_t) <= 8);
173

174
typedef struct {
175
   task_t       *tasks;
176
   unsigned      queuesz;
177
   entryq_ptr_t  wptr;
178
   entryq_ptr_t  comp;
179
} __attribute__((aligned(64))) entryq_t;
180

181
STATIC_ASSERT(sizeof(entryq_t) == 64);
182

183
struct _workq {
184
   void          *context;
185
   workq_state_t  state;
186
   unsigned       epoch;
187
   unsigned       maxthread;
188
   bool           parallel;
189
   entryq_t       entryqs[MAX_THREADS];
190
};
191

192
typedef struct {
193
   int64_t comp;
194
   int64_t steals;
195
   int64_t wakes;
196
} __attribute__((aligned(64))) workq_stats_t;
197

198
STATIC_ASSERT(sizeof(workq_stats_t) == 64);
199

200
typedef enum {
201
   MAIN_THREAD,
202
   USER_THREAD,
203
   WORKER_THREAD,
204
} thread_kind_t;
205

206
struct _nvc_thread {
207
   unsigned        id;
208
   thread_kind_t   kind;
209
   unsigned        spins;
210
   threadq_t       queue;
211
   char           *name;
212
   pthread_t       handle;
213
   thread_fn_t     fn;
214
   void           *arg;
215
   int             victim;
216
   uint32_t        rngstate;
217
#ifdef __APPLE__
218
   thread_port_t   port;
219
#endif
220
};
221

222
typedef struct {
223
   nvc_lock_t   lock;
224
   task_t      *tasks;
225
   unsigned     wptr;
226
   unsigned     rptr;
227
   unsigned     max;
228
} globalq_t;
229

230
static parking_bay_t parking_bays[PARKING_BAYS] = {
231
   [0 ... PARKING_BAYS - 1] = {
232
      PTHREAD_MUTEX_INITIALIZER,
233
      PTHREAD_COND_INITIALIZER
234
   }
235
};
236

237
static nvc_thread_t    *threads[MAX_THREADS];
238
static unsigned         max_workers = 0;
239
static int              running_threads = 0;
240
static unsigned         max_thread_id = 0;
241
static bool             should_stop = false;
242
static globalq_t        globalq __attribute__((aligned(64)));
243
static int              async_pending __attribute__((aligned(64))) = 0;
244
static nvc_lock_t       stop_lock = 0;
245
static stop_world_fn_t  stop_callback = NULL;
246
static void            *stop_arg = NULL;
247
static pthread_cond_t   wake_workers = PTHREAD_COND_INITIALIZER;
248
static pthread_mutex_t  wakelock = PTHREAD_MUTEX_INITIALIZER;
249
#ifdef POSIX_SUSPEND
250
static sem_t            stop_sem;
251
#endif
252

253
#ifdef DEBUG
254
static lock_stats_t  lock_stats[MAX_THREADS];
255
static workq_stats_t workq_stats[MAX_THREADS];
256
#endif
257

258
static __thread nvc_thread_t *my_thread = NULL;
259

260
static parking_bay_t *parking_bay_for(void *cookie);
261

262
#ifdef POSIX_SUSPEND
263
static void suspend_handler(int sig, siginfo_t *info, void *context);
264
#endif
265

266
#ifdef DEBUG
267
static void print_lock_stats(void)
×
268
{
269
   lock_stats_t s = {};
×
270
   workq_stats_t t = {};
×
271
   for (int i = 0; i < MAX_THREADS; i++) {
×
272
      s.locks     += relaxed_load(&(lock_stats[i].locks));
×
273
      s.contended += relaxed_load(&(lock_stats[i].contended));
×
274
      s.parks     += relaxed_load(&(lock_stats[i].parks));
×
275
      s.spins     += relaxed_load(&(lock_stats[i].spins));
×
276
      s.spurious  += relaxed_load(&(lock_stats[i].spurious));
×
277
      s.retries   += relaxed_load(&(lock_stats[i].retries));
×
278

279
      t.comp   += relaxed_load(&(workq_stats[i].comp));
×
280
      t.steals += relaxed_load(&(workq_stats[i].steals));
×
281
      t.wakes  += relaxed_load(&(workq_stats[i].wakes));
×
282
   }
283

284
   printf("\nLock statistics:\n");
×
285
   printf("\tTotal locks      : %"PRIi64"\n", s.locks);
×
286
   printf("\tContended        : %"PRIi64" (%.1f%%)\n",
×
287
          s.contended, 100.0 * ((double)s.contended / (double)s.locks));
×
288
   printf("\tParked           : %"PRIi64" (%.1f%%)\n",
×
289
          s.parks, 100.0 * ((double)s.parks / (double)s.locks));
×
290
   printf("\tAverage spins    : %.1f\n", (double)s.spins / (double)s.retries);
×
291
   printf("\tSpurious wakeups : %"PRIi64"\n", s.spurious);
×
292

293
   printf("\nWork queue statistics:\n");
×
294
   printf("\tCompleted tasks  : %"PRIi64"\n", t.comp);
×
295
   printf("\tSteals           : %"PRIi64"\n", t.steals);
×
296
   printf("\tWakeups          : %"PRIi64"\n", t.wakes);
×
297
}
×
298
#endif
299

300
static void join_worker_threads(void)
3,453✔
301
{
302
   SCOPED_A(nvc_thread_t *) join_list = AINIT;
6,906✔
303

304
   for (int i = 1; i < MAX_THREADS; i++) {
220,992✔
305
      nvc_thread_t *t = atomic_load(&threads[i]);
217,539✔
306
      if (t != NULL)
217,539✔
307
         APUSH(join_list, t);
217,539✔
308
   }
309

310
   // Lock the wake mutex here to avoid races with workers sleeping
311
   PTHREAD_CHECK(pthread_mutex_lock, &wakelock);
3,453✔
312
   {
313
      atomic_store(&should_stop, true);
3,453✔
314
      PTHREAD_CHECK(pthread_cond_broadcast, &wake_workers);
3,453✔
315
   }
316
   PTHREAD_CHECK(pthread_mutex_unlock, &wakelock);
3,453✔
317

318
   for (int i = 0; i < join_list.count; i++) {
3,476✔
319
      nvc_thread_t *t = join_list.items[i];
23✔
320

321
      switch (relaxed_load(&t->kind)) {
23✔
322
      case WORKER_THREAD:
23✔
323
         thread_join(t);
23✔
324
         continue;  // Freed thread struct
23✔
325
      case USER_THREAD:
×
326
      case MAIN_THREAD:
327
         fatal_trace("leaked a user thread: %s", t->name);
×
328
      }
329
   }
330

331
   assert(atomic_load(&running_threads) == 1);
3,453✔
332
}
3,453✔
333

334
static nvc_thread_t *thread_new(thread_fn_t fn, void *arg,
3,486✔
335
                                thread_kind_t kind, char *name)
336
{
337
   nvc_thread_t *thread = xcalloc(sizeof(nvc_thread_t));
3,486✔
338
   thread->name     = name;
3,486✔
339
   thread->fn       = fn;
3,486✔
340
   thread->arg      = arg;
3,486✔
341
   thread->kind     = kind;
3,486✔
342
   thread->rngstate = rand();
3,486✔
343

344
   atomic_store(&thread->queue.age.bits, 0);
3,486✔
345
   atomic_store(&thread->queue.bot, 0);
3,486✔
346

347
   int id = 0;
3,486✔
348
   for (; id < MAX_THREADS; id++) {
3,533✔
349
      if (relaxed_load(&(threads[id])) != NULL)
3,533✔
350
         continue;
47✔
351
      else if (atomic_cas(&(threads[id]), NULL, thread))
3,486✔
352
         break;
353
   }
354

355
   if (id == MAX_THREADS)
3,486✔
356
      fatal_trace("cannot create more than %d threads", MAX_THREADS);
×
357

358
   thread->id = id;
3,486✔
359

360
   unsigned max = relaxed_load(&max_thread_id);
3,486✔
361
   while (max < id) {
3,486✔
362
      if (__atomic_cas(&max_thread_id, &max, id))
31✔
363
         break;
364
   }
365

366
   atomic_add(&running_threads, 1);
3,486✔
367
   return thread;
3,486✔
368
}
369

370
#ifdef POSIX_SUSPEND
371
static void unmask_fatal_signals(sigset_t *mask)
5,298✔
372
{
373
   sigdelset(mask, SIGQUIT);
5,298✔
374
   sigdelset(mask, SIGABRT);
5,298✔
375
   sigdelset(mask, SIGTERM);
5,298✔
376
}
5,298✔
377
#endif
378

379
#ifdef __APPLE__
380
static void reset_mach_ports(void)
381
{
382
   for (int i = 0; i < MAX_THREADS; i++) {
383
      nvc_thread_t *t = atomic_load(&(threads[i]));
384
      if (t == NULL)
385
         continue;
386

387
      // Mach ports are not valid after fork
388
      t->port = pthread_mach_thread_np(t->handle);
389
   }
390
}
391
#endif
392

393
void thread_init(void)
3,453✔
394
{
395
   assert(my_thread == NULL);
3,453✔
396

397
   my_thread = thread_new(NULL, NULL, MAIN_THREAD, xstrdup("main thread"));
3,453✔
398
   my_thread->handle = pthread_self();
3,453✔
399

400
#ifdef __APPLE__
401
   my_thread->port = pthread_mach_thread_np(my_thread->handle);
402
   pthread_atfork(NULL, NULL, reset_mach_ports);
403
#endif
404

405
   assert(my_thread->id == 0);
3,453✔
406

407
   const char *env = getenv("NVC_MAX_THREADS");
3,453✔
408
   if (env != NULL)
3,453✔
409
      max_workers = MAX(1, MIN(atoi(env), MAX_THREADS));
×
410
   else
411
      max_workers = MIN(nvc_nprocs(), DEFAULT_THREADS);
3,453✔
412
   assert(max_workers > 0);
3,453✔
413

414
#ifdef DEBUG
415
   if (getenv("NVC_THREAD_VERBOSE") != NULL)
3,453✔
416
      atexit(print_lock_stats);
×
417
#endif
418

419
   atexit(join_worker_threads);
3,453✔
420

421
#ifdef POSIX_SUSPEND
422
   sem_init(&stop_sem, 0, 0);
3,453✔
423

424
   struct sigaction sa = {
3,453✔
425
      .sa_sigaction = suspend_handler,
426
      .sa_flags = SA_RESTART | SA_SIGINFO
427
   };
428
   sigfillset(&sa.sa_mask);
3,453✔
429
   unmask_fatal_signals(&sa.sa_mask);
3,453✔
430

431
   sigaction(SIGSUSPEND, &sa, NULL);
3,453✔
432
   sigaction(SIGRESUME, &sa, NULL);
3,453✔
433

434
   sigset_t mask;
3,453✔
435
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, NULL, &mask);
3,453✔
436

437
   sigdelset(&mask, SIGSUSPEND);
3,453✔
438
   sigaddset(&mask, SIGRESUME);
3,453✔
439

440
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, &mask, NULL);
3,453✔
441
#endif
442
}
3,453✔
443

444
int thread_id(void)
25,161,700✔
445
{
446
   assert(my_thread != NULL);
25,161,700✔
447
   return my_thread->id;
25,161,700✔
448
}
449

450
bool thread_attached(void)
42✔
451
{
452
   return my_thread != NULL;
42✔
453
}
454

455
void thread_sleep(int usec)
23✔
456
{
457
   usleep(usec);
23✔
458
}
23✔
459

460
static void *thread_wrapper(void *arg)
33✔
461
{
462
   assert(my_thread == NULL);
33✔
463
   my_thread = arg;
33✔
464

465
   void *result = (*my_thread->fn)(my_thread->arg);
33✔
466

467
   // Avoid races with stop_world
468
   SCOPED_LOCK(stop_lock);
33✔
469

470
   assert(threads[my_thread->id] == my_thread);
33✔
471
   atomic_store(&(threads[my_thread->id]),  NULL);
33✔
472

473
   atomic_add(&running_threads, -1);
33✔
474
   return result;
33✔
475
}
476

477
nvc_thread_t *thread_create(thread_fn_t fn, void *arg, const char *fmt, ...)
10✔
478
{
479
   va_list ap;
10✔
480
   va_start(ap, fmt);
10✔
481
   char *name = xvasprintf(fmt, ap);
10✔
482
   va_end(ap);
10✔
483

484
   // Avoid races with stop_world
485
   SCOPED_LOCK(stop_lock);
10✔
486

487
   nvc_thread_t *thread = thread_new(fn, arg, USER_THREAD, name);
10✔
488

489
   PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
10✔
490
                 thread_wrapper, thread);
491

492
#ifdef __APPLE__
493
   thread->port = pthread_mach_thread_np(thread->handle);
494
#endif
495

496
   return thread;
10✔
497
}
498

499
void *thread_join(nvc_thread_t *thread)
33✔
500
{
501
   if (thread == my_thread || thread->kind == MAIN_THREAD)
33✔
502
      fatal_trace("cannot join self or main thread");
×
503

504
   void *retval = NULL;
33✔
505
   PTHREAD_CHECK(pthread_join, thread->handle, &retval);
33✔
506

507
   async_free(thread->name);
33✔
508
   async_free(thread);
33✔
509

510
   return retval;
33✔
511
}
512

513
nvc_thread_t *get_thread(int id)
22,656✔
514
{
515
   assert(id >= 0 && id < MAX_THREADS);
22,656✔
516
   return atomic_load(&threads[id]);
22,656✔
517
}
518

519
static inline parking_bay_t *parking_bay_for(void *cookie)
1,826✔
520
{
521
   return &(parking_bays[mix_bits_64(cookie) % PARKING_BAYS]);
1,826✔
522
}
523

524
static void thread_park(void *cookie, park_fn_t fn)
1,297✔
525
{
526
   parking_bay_t *bay = parking_bay_for(cookie);
1,297✔
527

528
   PTHREAD_CHECK(pthread_mutex_lock, &(bay->mutex));
1,297✔
529
   {
530
      if ((*fn)(bay, cookie)) {
1,297✔
531
         bay->parked++;
1,296✔
532
         PTHREAD_CHECK(pthread_cond_wait, &(bay->cond), &(bay->mutex));
1,296✔
533
         assert(bay->parked > 0);
1,296✔
534
         bay->parked--;
1,296✔
535
      }
536
   }
537
   PTHREAD_CHECK(pthread_mutex_unlock, &(bay->mutex));
1,297✔
538
}
1,297✔
539

540
static void thread_unpark(void *cookie, unpark_fn_t fn)
529✔
541
{
542
   parking_bay_t *bay = parking_bay_for(cookie);
529✔
543

544
   if (fn != NULL) {
529✔
545
      PTHREAD_CHECK(pthread_mutex_lock, &(bay->mutex));
529✔
546
      {
547
         (*fn)(bay, cookie);
529✔
548
      }
549
      PTHREAD_CHECK(pthread_mutex_unlock, &(bay->mutex));
529✔
550
   }
551

552
   // Do not use pthread_cond_signal here as multiple threads parked in
553
   // this bay may be waiting on different cookies
554
   PTHREAD_CHECK(pthread_cond_broadcast, &(bay->cond));
529✔
555
}
529✔
556

557
void spin_wait(void)
5,103,230✔
558
{
559
#ifdef __x86_64__
560
   __asm__ volatile ("pause");
5,103,230✔
561
#elif defined __aarch64__
562
   // YIELD is a no-op on most AArch64 cores so also do an ISB to stall
563
   // the pipeline for a bit
564
   __asm__ volatile ("yield; isb");
565
#endif
566
}
5,103,230✔
567

568
static bool lock_park_cb(parking_bay_t *bay, void *cookie)
1,297✔
569
{
570
   nvc_lock_t *lock = cookie;
1,297✔
571

572
   // This is called with the park mutex held: check the lock is still
573
   // owned by someone and the park bit is still set
574
   return relaxed_load(lock) == (IS_LOCKED | HAS_PARKED);
1,297✔
575
}
576

577
static void lock_unpark_cb(parking_bay_t *bay, void *cookie)
529✔
578
{
579
   nvc_lock_t *lock = cookie;
529✔
580

581
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
529✔
582

583
   // Unlock must have release semantics
584
   atomic_store(lock, (bay->parked > 0 ? HAS_PARKED : 0));
529✔
585
}
529✔
586

587
void nvc_lock(nvc_lock_t *lock)
24,390,000✔
588
{
589
   LOCK_EVENT(locks, 1);
24,390,000✔
590
   TSAN_PRE_LOCK(lock);
24,390,000✔
591

592
   int8_t state = relaxed_load(lock);
24,390,000✔
593
   if (state & IS_LOCKED)
24,390,000✔
594
      LOCK_EVENT(contended, 1);
131✔
595
   else if (likely(__atomic_cas(lock, &state, state | IS_LOCKED)))
24,389,800✔
596
      goto locked;  // Fast path: acquired the lock without contention
24,389,800✔
597

598
   for (;;) {
1,338✔
599
      LOCK_EVENT(retries, 1);
1,338✔
600

601
      // Spin a few times waiting for the owner to release the lock
602
      // before parking
603
      int spins = 0;
604
      for (; (state & IS_LOCKED) && spins < LOCK_SPINS;
20,942✔
605
           spins++, state = relaxed_load(lock))
19,604✔
606
         spin_wait();
19,604✔
607

608
      if (spins == LOCK_SPINS) {
1,338✔
609
         // Ignore failures here as we will check the lock state again
610
         // in the callback with the park mutex held
611
         atomic_cas(lock, IS_LOCKED, IS_LOCKED | HAS_PARKED);
1,297✔
612

613
         LOCK_EVENT(parks, 1);
1,297✔
614
         thread_park(lock, lock_park_cb);
1,297✔
615

616
         if ((state = relaxed_load(lock)) & IS_LOCKED) {
1,297✔
617
            // Someone else grabbed the lock before our thread was unparked
618
            LOCK_EVENT(spurious, 1);
1,190✔
619
            continue;
1,190✔
620
         }
621
      }
622
      else
623
         LOCK_EVENT(spins, spins);
41✔
624

625
      assert(!(state & IS_LOCKED));
148✔
626

627
      // If we get here then we've seen the lock in an unowned state:
628
      // attempt to grab it with a CAS
629
      if (__atomic_cas(lock, &state, state | IS_LOCKED))
148✔
630
         goto locked;
132✔
631
   }
632

633
 locked:
24,390,000✔
634
   TSAN_POST_LOCK(lock);
24,390,000✔
635
}
24,390,000✔
636

637
void nvc_unlock(nvc_lock_t *lock)
24,390,000✔
638
{
639
   TSAN_PRE_UNLOCK(lock);
24,390,000✔
640

641
   // Fast path: unlock assuming no parked waiters
642
   if (likely(atomic_cas(lock, IS_LOCKED, 0)))
24,390,000✔
643
      goto unlocked;
24,389,400✔
644

645
   // If we get here then we must own the lock with at least one parked
646
   // waiting thread
647
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
529✔
648

649
   // Lock released in callback
650
   thread_unpark(lock, lock_unpark_cb);
529✔
651

652
 unlocked:
24,390,000✔
653
   TSAN_POST_UNLOCK(lock);
24,390,000✔
654
}
24,390,000✔
655

656
#ifdef DEBUG
657
void assert_lock_held(nvc_lock_t *lock)
106,980✔
658
{
659
   int8_t state = relaxed_load(lock);
106,980✔
660
   if (unlikely(!(state & IS_LOCKED)))
106,980✔
661
      fatal_trace("expected lock at %p to be held", lock);
×
662
}
106,980✔
663
#endif
664

665
void __scoped_unlock(nvc_lock_t **plock)
24,339,100✔
666
{
667
   nvc_unlock(*plock);
24,339,100✔
668
}
24,339,100✔
669

670
static void push_bot(threadq_t *tq, const task_t *tasks, size_t count)
37✔
671
{
672
   const abp_idx_t bot = relaxed_load(&tq->bot);
37✔
673
   assert(bot + count <= THREADQ_SIZE);
37✔
674

675
   memcpy(tq->deque + bot, tasks, count * sizeof(task_t));
37✔
676
   store_release(&tq->bot, bot + count);
37✔
677
}
37✔
678

679
static bool pop_bot(threadq_t *tq, task_t *task)
188✔
680
{
681
   const abp_idx_t old_bot = relaxed_load(&tq->bot);
188✔
682
   if (old_bot == 0)
188✔
683
      return false;
684

685
   const abp_idx_t new_bot = old_bot - 1;
171✔
686
   atomic_store(&tq->bot, new_bot);
171✔
687

688
   *task = tq->deque[new_bot];
171✔
689

690
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
171✔
691
   if (new_bot > old_age.top)
171✔
692
      return true;
693

694
   atomic_store(&tq->bot, 0);
37✔
695

696
   const abp_age_t new_age = { .top = 0, .tag = old_age.tag + 1 };
37✔
697
   if (new_bot == old_age.top) {
37✔
698
      if (atomic_cas(&tq->age.bits, old_age.bits, new_age.bits))
17✔
699
         return true;
700
   }
701

702
   atomic_store(&tq->age.bits, new_age.bits);
20✔
703
   return false;
20✔
704
}
705

706
__attribute__((no_sanitize("thread")))
707
static bool pop_top(threadq_t *tq, task_t *task)
30✔
708
{
709
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
30✔
710
   const abp_idx_t bot = atomic_load(&tq->bot);
30✔
711

712
   if (bot <= old_age.top)
30✔
713
      return false;
714

715
   // This triggers a TSAN false-positive: we will never read from *task
716
   // if the CAS fails below, so it's safe to ignore
717
   *task = tq->deque[old_age.top];
30✔
718

719
   const abp_age_t new_age = {
30✔
720
      .tag = old_age.tag,
721
      .top = old_age.top + 1
30✔
722
   };
723

724
   return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
30✔
725
}
726

727
static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
122✔
728
{
729
   assert_lock_held(&gq->lock);
122✔
730

731
   if (gq->wptr == gq->rptr)
122✔
732
      gq->wptr = gq->rptr = 0;
31✔
733

734
   if (gq->wptr + count > gq->max) {
122✔
735
      gq->max = next_power_of_2(gq->wptr + count);
29✔
736
      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
29✔
737
   }
738

739
   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
122✔
740
   gq->wptr += count;
122✔
741
}
122✔
742

743
__attribute__((no_sanitize("thread")))
744
static bool globalq_unlocked_empty(globalq_t *gq)
162✔
745
{
746
   return relaxed_load(&gq->wptr) == relaxed_load(&gq->rptr);
162✔
747
}
748

749
static size_t globalq_take(globalq_t *gq, threadq_t *tq)
162✔
750
{
751
   if (globalq_unlocked_empty(gq))
162✔
752
      return 0;
753

754
   const int nthreads = relaxed_load(&running_threads);
37✔
755

756
   SCOPED_LOCK(gq->lock);
74✔
757

758
   if (gq->wptr == gq->rptr)
37✔
759
      return 0;
760

761
   const int remain = gq->wptr - gq->rptr;
37✔
762
   const int share = gq->wptr / nthreads;
37✔
763
   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
37✔
764
   const int from = gq->rptr;
37✔
765

766
   gq->rptr += take;
37✔
767

768
   push_bot(tq, gq->tasks + from, take);
37✔
769
   return take;
37✔
770
}
771

772
static void execute_task(task_t *task)
818,635✔
773
{
774
   (*task->fn)(task->context, task->arg);
818,635✔
775

776
   if (task->workq != NULL) {
818,635✔
777
      entryq_t *eq = &(task->workq->entryqs[my_thread->id]);
818,535✔
778

779
      const entryq_ptr_t cur = { .bits = relaxed_load(&eq->comp.bits) };
818,535✔
780
      const int epoch = atomic_load(&task->workq->epoch);
818,535✔
781
      const int count = cur.epoch == epoch ? cur.count : 0;
818,535✔
782
      const entryq_ptr_t next = { .count = count + 1, .epoch = epoch };
818,535✔
783
      store_release(&eq->comp.bits, next.bits);
818,535✔
784
   }
785
   else
786
      atomic_add(&async_pending, -1);
100✔
787
}
818,635✔
788

789
static bool globalq_poll(globalq_t *gq, threadq_t *tq)
162✔
790
{
791
   int ntasks;
162✔
792
   if ((ntasks = globalq_take(gq, tq))) {
162✔
793
      task_t task;
37✔
794
      int comp = 0;
37✔
795
      for (; pop_bot(tq, &task); comp++)
225✔
796
         execute_task(&task);
151✔
797

798
      WORKQ_EVENT(comp, comp);
37✔
799
      return true;
37✔
800
   }
801
   else
802
      return false;
803
}
804

805
workq_t *workq_new(void *context)
14,656✔
806
{
807
   if (my_thread->kind != MAIN_THREAD)
14,656✔
808
      fatal_trace("work queues can only be created by the main thread");
×
809

810
   workq_t *wq = xcalloc(sizeof(workq_t));
14,656✔
811
   wq->state    = IDLE;
14,656✔
812
   wq->context  = context;
14,656✔
813
   wq->parallel = max_workers > 1;
14,656✔
814
   wq->epoch    = 1;
14,656✔
815

816
   return wq;
14,656✔
817
}
818

819
void workq_not_thread_safe(workq_t *wq)
13,701✔
820
{
821
   wq->parallel = false;
13,701✔
822
}
13,701✔
823

824
void workq_free(workq_t *wq)
14,638✔
825
{
826
   if (my_thread->kind != MAIN_THREAD)
14,638✔
827
      fatal_trace("work queues can only be freed by the main thread");
×
828

829
   assert(wq->state == IDLE);
14,638✔
830

831
   for (int i = 0; i < MAX_THREADS; i++)
951,470✔
832
      free(wq->entryqs[i].tasks);
936,832✔
833

834
   free(wq);
14,638✔
835
}
14,638✔
836

837
void workq_do(workq_t *wq, task_fn_t fn, void *arg)
818,598✔
838
{
839
   assert(wq->state == IDLE);
818,598✔
840

841
   entryq_t *eq = &(wq->entryqs[my_thread->id]);
818,598✔
842

843
   const entryq_ptr_t cur = { .bits = relaxed_load(&eq->wptr.bits) };
818,598✔
844
   const int epoch = atomic_load(&wq->epoch);
818,598✔
845
   const int wptr = cur.epoch == epoch ? cur.count : 0;
818,598✔
846

847
   if (wptr == eq->queuesz) {
818,598✔
848
      eq->queuesz = MAX(eq->queuesz * 2, 64);
5,903✔
849
      eq->tasks = xrealloc_array(eq->tasks, eq->queuesz, sizeof(task_t));
5,903✔
850
   }
851

852
   eq->tasks[wptr] = (task_t){ fn, wq->context, arg, wq };
818,598✔
853

854
   const entryq_ptr_t next = { .count = wptr + 1, .epoch = epoch };
818,598✔
855
   store_release(&eq->wptr.bits, next.bits);
818,598✔
856

857
   unsigned maxthread = relaxed_load(&wq->maxthread);
818,598✔
858
   while (maxthread < my_thread->id) {
818,598✔
859
      if (__atomic_cas(&wq->maxthread, &maxthread, my_thread->id))
×
860
         break;
861
   }
862
}
818,598✔
863

864
void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
6✔
865
{
866
   const int maxthread = relaxed_load(&wq->maxthread);
6✔
867
   for (int i = 0; i <= maxthread; i++) {
12✔
868
      entryq_t *eq = &(wq->entryqs[my_thread->id]);
6✔
869
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
6✔
870
      for (int j = 0; j < wptr.count; j++)
15✔
871
         (*fn)(wq->context, eq->tasks[j].arg, arg);
9✔
872
   }
873
}
6✔
874

875
static int estimate_depth(threadq_t *tq)
220✔
876
{
877
   const abp_age_t age = { .bits = relaxed_load(&tq->age.bits) };
220✔
878
   const abp_idx_t bot = relaxed_load(&tq->bot);
220✔
879

880
   return bot <= age.top ? 0 : bot - age.top;
220✔
881
}
882

883
static threadq_t *get_thread_queue(int id)
220✔
884
{
885
   assert(id < MAX_THREADS);
220✔
886

887
   nvc_thread_t *t = atomic_load(&(threads[id]));
220✔
888
   if (t == NULL)
220✔
889
      return NULL;
890

891
   return &(t->queue);
220✔
892
}
893

894
static uint32_t fast_rand(void)
95✔
895
{
896
   uint32_t state = my_thread->rngstate;
95✔
897
   state ^= (state << 13);
95✔
898
   state ^= (state >> 17);
95✔
899
   state ^= (state << 5);
95✔
900
   return (my_thread->rngstate = state);
95✔
901
}
902

903
static threadq_t *find_victim(void)
125✔
904
{
905
   threadq_t *last = get_thread_queue(my_thread->victim);
125✔
906
   if (last != NULL && estimate_depth(last) > 0)
125✔
907
      return last;
908

909
   const int maxthread = relaxed_load(&max_thread_id);
95✔
910
   const int start = fast_rand() % (maxthread + 1);
95✔
911
   int idx = start;
95✔
912
   do {
190✔
913
      if (idx != my_thread->id) {
190✔
914
         threadq_t *q = get_thread_queue(idx);
95✔
915
         if (q != NULL && estimate_depth(q) > 0) {
95✔
916
            my_thread->victim = idx;
×
917
            return q;
×
918
         }
919
      }
920
   } while ((idx = (idx + 1) % (maxthread + 1)) != start);
190✔
921

922
   return NULL;
923
}
924

925
static bool steal_task(void)
125✔
926
{
927
   threadq_t *tq = find_victim();
125✔
928
   if (tq == NULL)
125✔
929
      return false;
930

931
   task_t task;
30✔
932
   if (pop_top(tq, &task)) {
30✔
933
      WORKQ_EVENT(steals, 1);
30✔
934
      execute_task(&task);
30✔
935
      WORKQ_EVENT(comp, 1);
30✔
936
      return true;
30✔
937
   }
938

939
   return false;
940
}
941

942
static void progressive_backoff(void)
5,238,870✔
943
{
944
   if (my_thread->spins++ < YIELD_SPINS)
5,238,870✔
945
      spin_wait();
5,080,120✔
946
   else {
947
      sched_yield();
158,748✔
948
      my_thread->spins = 0;
158,748✔
949
   }
950
}
5,238,870✔
951

952
static void *worker_thread(void *arg)
23✔
953
{
954
   mspace_stack_limit(MSPACE_CURRENT_FRAME);
23✔
955

956
   do {
117✔
957
      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
117✔
958
         my_thread->spins = 0;  // Did work
44✔
959
      else if (my_thread->spins++ < 2)
73✔
960
         spin_wait();
49✔
961
      else {
962
         PTHREAD_CHECK(pthread_mutex_lock, &wakelock);
24✔
963
         {
964
            if (!relaxed_load(&should_stop))
24✔
965
               PTHREAD_CHECK(pthread_cond_wait, &wake_workers, &wakelock);
24✔
966
         }
967
         PTHREAD_CHECK(pthread_mutex_unlock, &wakelock);
24✔
968
      }
969
   } while (likely(!relaxed_load(&should_stop)));
117✔
970

971
   return NULL;
23✔
972
}
973

974
static void create_workers(int needed)
122✔
975
{
976
   assert(my_thread->kind == MAIN_THREAD);
122✔
977

978
   if (relaxed_load(&should_stop))
122✔
979
      return;
980

981
   while (relaxed_load(&running_threads) < MIN(max_workers, needed)) {
145✔
982
      static int counter = 0;
23✔
983
      char *name = xasprintf("worker thread %d", atomic_add(&counter, 1));
23✔
984
      nvc_thread_t *thread =
23✔
985
         thread_new(worker_thread, NULL, WORKER_THREAD, name);
23✔
986

987
      PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
145✔
988
                    thread_wrapper, thread);
989

990
#ifdef __APPLE__
991
      thread->port = pthread_mach_thread_np(thread->handle);
992
#endif
993
   }
994

995
   PTHREAD_CHECK(pthread_cond_broadcast, &wake_workers);
122✔
996
}
997

998
void workq_start(workq_t *wq)
2,227,060✔
999
{
1000
   assert(my_thread->kind == MAIN_THREAD);
2,227,060✔
1001

1002
   const int epoch = relaxed_load(&wq->epoch);
2,227,060✔
1003
   const int maxthread = relaxed_load(&wq->maxthread);
2,227,060✔
1004

1005
   assert(wq->state == IDLE);
2,227,060✔
1006
   wq->state = START;
2,227,060✔
1007

1008
   int nserial = 0, nparallel = 0;
2,227,060✔
1009
   for (int i = 0; i <= maxthread; i++) {
4,454,130✔
1010
      entryq_t *eq = &wq->entryqs[i];
2,227,060✔
1011
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
2,227,060✔
1012
      if (wptr.epoch == epoch) {
2,227,060✔
1013
         // Only bump epoch if there are tasks to run
1014
         if (nserial + nparallel == 0)
704,469✔
1015
            atomic_add(&wq->epoch, 1);
704,469✔
1016

1017
         assert(wptr.count > 0);
704,469✔
1018

1019
         if (i == maxthread && nserial + nparallel == 0 && wptr.count == 1) {
704,469✔
1020
            execute_task(&(eq->tasks[0]));   // Only one task in total
608,440✔
1021
            nserial++;
608,440✔
1022
         }
1023
         else if (wq->parallel) {
96,029✔
1024
            if (nparallel == 0)
22✔
1025
               nvc_lock(&globalq.lock);   // Lazily acquire lock
22✔
1026
            globalq_put(&globalq, eq->tasks, wptr.count);
22✔
1027
            nparallel += wptr.count;
22✔
1028
         }
1029
         else {
1030
            for (int j = 0; j < wptr.count; j++)
306,021✔
1031
               execute_task(&(eq->tasks[j]));
210,014✔
1032
            nserial += wptr.count;
96,007✔
1033
         }
1034
      }
1035
   }
1036

1037
   if (wq->parallel && nparallel > 0) {
2,227,060✔
1038
      nvc_unlock(&globalq.lock);
22✔
1039
      create_workers(nparallel);
22✔
1040
   }
1041
}
2,227,060✔
1042

1043
static int workq_outstanding(workq_t *wq)
5,239,850✔
1044
{
1045
   assert(wq->state == START);
5,239,850✔
1046

1047
   const int epoch = atomic_load(&wq->epoch);
5,239,850✔
1048
   const int maxthread = relaxed_load(&max_thread_id);
5,239,850✔
1049

1050
   int pending = 0;
5,239,850✔
1051
   for (int i = 0; i <= maxthread; i++) {
15,718,600✔
1052
      entryq_t *eq = &(wq->entryqs[i]);
10,478,800✔
1053

1054
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
10,478,800✔
1055
      if (wptr.epoch == epoch - 1)
10,478,800✔
1056
         pending += wptr.count;
5,239,850✔
1057

1058
      const entryq_ptr_t comp = { .bits = load_acquire(&eq->comp.bits) };
10,478,800✔
1059
      if (comp.epoch == epoch)
10,478,800✔
1060
         pending -= comp.count;
10,164,000✔
1061
   }
1062

1063
   assert(pending >= 0);
5,239,850✔
1064
   return pending;
5,239,850✔
1065
}
1066

1067
static void workq_parallel_drain(workq_t *wq)
955✔
1068
{
1069
   if (workq_outstanding(wq) > 0) {
955✔
1070
      while (globalq_poll(&globalq, &(my_thread->queue)));
44✔
1071
      while (steal_task());
22✔
1072

1073
      while (workq_outstanding(wq) > 0)
5,238,900✔
1074
         progressive_backoff();
5,238,870✔
1075
   }
1076

1077
   wq->state = IDLE;
955✔
1078
}
955✔
1079

1080
void workq_drain(workq_t *wq)
2,227,060✔
1081
{
1082
   if (my_thread->kind != MAIN_THREAD)
2,227,060✔
1083
      fatal_trace("workq_drain can only be called from the main thread");
×
1084

1085
   if (wq->parallel)
2,227,060✔
1086
      workq_parallel_drain(wq);
955✔
1087
   else {
1088
      assert(wq->state == START);
2,226,110✔
1089
      wq->state = IDLE;
2,226,110✔
1090
   }
1091
}
2,227,060✔
1092

1093
void async_do(task_fn_t fn, void *context, void *arg)
100✔
1094
{
1095
   if (max_workers == 1)
100✔
1096
      (*fn)(context, arg);   // Single CPU
×
1097
   else {
1098
      const int npending = atomic_add(&async_pending, 1);
100✔
1099
      create_workers(npending + 1 /* Do not count main thread */);
100✔
1100

1101
      task_t tasks[1] = {
100✔
1102
         { fn, context, arg, NULL }
1103
      };
1104
      SCOPED_LOCK(globalq.lock);
200✔
1105
      globalq_put(&globalq, tasks, 1);
100✔
1106
   }
1107
}
100✔
1108

1109
void async_barrier(void)
10,968✔
1110
{
1111
   while (atomic_load(&async_pending) > 0) {
21,937✔
1112
      if (!globalq_poll(&globalq, &(my_thread->queue)))
1✔
1113
         progressive_backoff();
×
1114
   }
1115
}
10,968✔
1116

1117
void async_free(void *ptr)
7✔
1118
{
1119
   // TODO: free when all threads in quiescent state
1120
}
7✔
1121

1122
#ifdef POSIX_SUSPEND
1123
static void suspend_handler(int sig, siginfo_t *info, void *context)
3,690✔
1124
{
1125
   if (info->si_pid != getpid())
3,690✔
1126
      return;   // Not sent by us, ignore it
1,845✔
1127
   else if (sig == SIGRESUME)
3,690✔
1128
      return;
1129

1130
   const int olderrno = errno;
1,845✔
1131

1132
   if (my_thread != NULL) {
1,845✔
1133
      struct cpu_state cpu;
1,845✔
1134
      fill_cpu_state(&cpu, (ucontext_t *)context);
1,845✔
1135

1136
      stop_world_fn_t callback = atomic_load(&stop_callback);
1,845✔
1137
      void *arg = atomic_load(&stop_arg);
1,845✔
1138

1139
      (*callback)(my_thread->id, &cpu, arg);
1,845✔
1140
   }
1141

1142
   sem_post(&stop_sem);
1,845✔
1143

1144
   sigset_t mask;
1,845✔
1145
   sigfillset(&mask);
1,845✔
1146
   unmask_fatal_signals(&mask);
1,845✔
1147
   sigdelset(&mask, SIGRESUME);
1,845✔
1148

1149
   sigsuspend(&mask);
1,845✔
1150

1151
   sem_post(&stop_sem);
1,845✔
1152

1153
   errno = olderrno;
1,845✔
1154
}
1155
#endif
1156

1157
void stop_world(stop_world_fn_t callback, void *arg)
854✔
1158
{
1159
   nvc_lock(&stop_lock);
854✔
1160

1161
   atomic_store(&stop_callback, callback);
854✔
1162
   atomic_store(&stop_arg, arg);
854✔
1163

1164
#ifdef __MINGW32__
1165
   const int maxthread = relaxed_load(&max_thread_id);
1166
   for (int i = 0; i <= maxthread; i++) {
1167
      nvc_thread_t *thread = atomic_load(&threads[i]);
1168
      if (thread == NULL || thread == my_thread)
1169
         continue;
1170

1171
      HANDLE h = pthread_gethandle(thread->handle);
1172
      if (SuspendThread(h) != 0)
1173
         fatal_errno("SuspendThread");
1174

1175
      CONTEXT context;
1176
      context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;
1177
      if (!GetThreadContext(h, &context))
1178
         fatal_errno("GetThreadContext");
1179

1180
      struct cpu_state cpu;
1181
      fill_cpu_state(&cpu, &context);
1182

1183
      (*callback)(thread->id, &cpu, arg);
1184
   }
1185
#elif defined __APPLE__
1186
   const int maxthread = relaxed_load(&max_thread_id);
1187
   for (int i = 0; i <= maxthread; i++) {
1188
      nvc_thread_t *thread = atomic_load(&threads[i]);
1189
      if (thread == NULL || thread == my_thread)
1190
         continue;
1191

1192
      assert(thread->port != MACH_PORT_NULL);
1193

1194
      kern_return_t kern_result;
1195
      do {
1196
         kern_result = thread_suspend(thread->port);
1197
      } while (kern_result == KERN_ABORTED);
1198

1199
      if (kern_result != KERN_SUCCESS)
1200
         fatal_trace("failed to suspend thread %d (%d)", i, kern_result);
1201

1202
#ifdef __aarch64__
1203
      arm_thread_state64_t state;
1204
      mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
1205
      thread_state_flavor_t flavor = ARM_THREAD_STATE64;
1206
#else
1207
      x86_thread_state64_t state;
1208
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
1209
      thread_state_flavor_t flavor = x86_THREAD_STATE64;
1210
#endif
1211
      kern_result = thread_get_state(thread->port, flavor,
1212
                                     (natural_t *)&state, &count);
1213
      if (kern_result != KERN_SUCCESS)
1214
         fatal_trace("failed to get thread %d state (%d)", i, kern_result);
1215

1216
      // Fake a ucontext_t that we can pass to fill_cpu_state
1217
      ucontext_t uc;
1218
      typeof(*uc.uc_mcontext) mc;
1219
      uc.uc_mcontext = &mc;
1220
      mc.__ss = state;
1221

1222
      struct cpu_state cpu;
1223
      fill_cpu_state(&cpu, &uc);
1224

1225
      (*callback)(thread->id, &cpu, arg);
1226
   }
1227
#elif defined __SANITIZE_THREAD__
1228
   // https://github.com/google/sanitizers/issues/1179
1229
   fatal_trace("stop_world is not supported with tsan");
1230
#else
1231
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
854✔
1232

1233
   int signalled = 0;
854✔
1234
   const int maxthread = relaxed_load(&max_thread_id);
854✔
1235
   for (int i = 0; i <= maxthread; i++) {
4,174✔
1236
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,320✔
1237
      if (thread == NULL || thread == my_thread)
3,320✔
1238
         continue;
1,475✔
1239

1240
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGSUSPEND);
1,845✔
1241
      signalled++;
1,845✔
1242
   }
1243

1244
   struct timespec ts;
854✔
1245
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
854✔
1246
      fatal_errno("clock_gettime");
×
1247

1248
   ts.tv_sec += SUSPEND_TIMEOUT;
854✔
1249

1250
   for (; signalled > 0; signalled--) {
2,699✔
1251
      if (sem_timedwait(&stop_sem, &ts) != 0)
1,845✔
1252
         fatal_trace("timeout waiting for %d threads to suspend", signalled);
×
1253
   }
1254

1255
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
854✔
1256
#endif
1257

1258
   struct cpu_state cpu;
854✔
1259
   capture_registers(&cpu);
854✔
1260

1261
   (*callback)(my_thread->id, &cpu, arg);
854✔
1262
}
854✔
1263

1264
void start_world(void)
854✔
1265
{
1266
   assert_lock_held(&stop_lock);
854✔
1267

1268
   const int maxthread = relaxed_load(&max_thread_id);
854✔
1269

1270
#ifdef __MINGW32__
1271
   for (int i = 0; i <= maxthread; i++) {
1272
      nvc_thread_t *thread = atomic_load(&threads[i]);
1273
      if (thread == NULL || thread == my_thread)
1274
         continue;
1275

1276
      HANDLE h = pthread_gethandle(thread->handle);
1277
      if (ResumeThread(h) != 1)
1278
         fatal_errno("ResumeThread");
1279
   }
1280
#elif defined __APPLE__
1281
   for (int i = 0; i <= maxthread; i++) {
1282
      nvc_thread_t *thread = atomic_load(&threads[i]);
1283
      if (thread == NULL || thread == my_thread)
1284
         continue;
1285

1286
      kern_return_t kern_result;
1287
      do {
1288
         kern_result = thread_resume(thread->port);
1289
      } while (kern_result == KERN_ABORTED);
1290

1291
      if (kern_result != KERN_SUCCESS)
1292
         fatal_trace("failed to resume thread %d (%d)", i, kern_result);
1293
   }
1294
#else
1295
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
854✔
1296

1297
   int signalled = 0;
1298
   for (int i = 0; i <= maxthread; i++) {
4,174✔
1299
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,320✔
1300
      if (thread == NULL || thread == my_thread)
3,320✔
1301
         continue;
1,475✔
1302

1303
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGRESUME);
1,845✔
1304
      signalled++;
1,845✔
1305
   }
1306

1307
   struct timespec ts;
854✔
1308
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
854✔
1309
      fatal_errno("clock_gettime");
×
1310

1311
   ts.tv_sec += SUSPEND_TIMEOUT;
854✔
1312

1313
   for (; signalled > 0; signalled--) {
2,699✔
1314
      if (sem_timedwait(&stop_sem, &ts) != 0)
1,845✔
1315
         fatal_trace("timeout waiting for %d threads to resume", signalled);
×
1316
   }
1317

1318
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
854✔
1319
#endif
1320

1321
   nvc_unlock(&stop_lock);
854✔
1322
}
854✔
1323

1324
void thread_wx_mode(wx_mode_t mode)
364✔
1325
{
1326
#ifdef __APPLE__
1327
   pthread_jit_write_protect_np(mode == WX_EXECUTE);
1328
#else
1329
   // Could use Intel memory protection keys here
1330
#endif
1331
}
364✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc