
nickg / nvc / build 9571824330 (github, push)

18 Jun 2024 06:53PM UTC coverage: 91.563% (+0.01%) from 91.553%

nickg: Fix parsing of non-ANSI Verilog port declarations. Issue 811

56788 of 62021 relevant lines covered (91.56%)

663660.82 hits per line

Source File: /src/thread.c (89.05% covered)
1
//
2
//  Copyright (C) 2021-2023  Nick Gasson
3
//
4
//  This program is free software: you can redistribute it and/or modify
5
//  it under the terms of the GNU General Public License as published by
6
//  the Free Software Foundation, either version 3 of the License, or
7
//  (at your option) any later version.
8
//
9
//  This program is distributed in the hope that it will be useful,
10
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
//  GNU General Public License for more details.
13
//
14
//  You should have received a copy of the GNU General Public License
15
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
//
17

18
#include "util.h"
19
#include "array.h"
20
#include "cpustate.h"
21
#include "rt/mspace.h"
22
#include "thread.h"
23

24
#include <assert.h>
25
#include <errno.h>
26
#include <signal.h>
27
#include <stdlib.h>
28
#include <string.h>
29
#include <inttypes.h>
30
#include <unistd.h>
31
#include <semaphore.h>
32

33
#ifdef HAVE_PTHREAD
34
#include <pthread.h>
35
#elif !defined __MINGW32__
36
#error missing pthread support
37
#endif
38

39
#ifdef __SANITIZE_THREAD__
40
#include <sanitizer/tsan_interface.h>
41
#endif
42

43
#if defined __MINGW32__
44
#define WIN32_LEAN_AND_MEAN
45
#include <windows.h>
46
#elif defined __APPLE__
47
#define task_t __task_t
48
#include <mach/thread_act.h>
49
#include <mach/machine.h>
50
#undef task_t
51
#endif
52

53
#define LOCK_SPINS      64
54
#define YIELD_SPINS     32
55
#define MIN_TAKE        8
56
#define PARKING_BAYS    64
57
#define SUSPEND_TIMEOUT 1
58

59
#if !defined __MINGW32__ && !defined __APPLE__
60
#define POSIX_SUSPEND 1
61
#define SIGSUSPEND    SIGRTMIN
62
#define SIGRESUME     (SIGRTMIN + 1)
63
#endif
64

65
typedef struct {
66
   int64_t locks;
67
   int64_t spins;
68
   int64_t contended;
69
   int64_t parks;
70
   int64_t spurious;
71
   int64_t retries;
72
} __attribute__((aligned(64))) lock_stats_t;
73

74
STATIC_ASSERT(sizeof(lock_stats_t) == 64);
75

76
#ifdef DEBUG
77
#define LOCK_EVENT(what, n) do {                  \
78
      if (likely(my_thread != NULL))              \
79
         lock_stats[my_thread->id].what += (n);   \
80
   } while (0)
81

82
#define WORKQ_EVENT(what, n) do {                 \
83
      if (likely(my_thread != NULL))              \
84
         workq_stats[my_thread->id].what += (n);  \
85
   } while (0)
86
#else
87
#define LOCK_EVENT(what, n) (void)(n)
88
#define WORKQ_EVENT(what, n) (void)(n)
89
#endif
90

91
#ifdef __SANITIZE_THREAD__
92
#define TSAN_PRE_LOCK(addr) \
93
   __tsan_mutex_pre_lock(addr, __tsan_mutex_linker_init);
94
#define TSAN_POST_LOCK(addr) \
95
   __tsan_mutex_post_lock(addr, __tsan_mutex_linker_init, 0);
96
#define TSAN_PRE_UNLOCK(addr) \
97
   __tsan_mutex_pre_unlock(addr, __tsan_mutex_linker_init);
98
#define TSAN_POST_UNLOCK(addr) \
99
   __tsan_mutex_post_unlock(addr, __tsan_mutex_linker_init);
100
#else
101
#define TSAN_PRE_LOCK(addr)
102
#define TSAN_POST_LOCK(addr)
103
#define TSAN_PRE_UNLOCK(addr)
104
#define TSAN_POST_UNLOCK(addr)
105
#endif
106

107
#ifndef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
108
#define PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP PTHREAD_MUTEX_INITIALIZER
109
#endif
110

111
#define PTHREAD_CHECK(op, ...) do {             \
112
      if (unlikely(op(__VA_ARGS__) != 0))       \
113
         fatal_errno(#op);                      \
114
   } while (0)
115

116
// Lock implementation based on WTF::Lock in WebKit
117
//   https://webkit.org/blog/6161/locking-in-webkit/
118

119
typedef enum {
120
   IS_LOCKED  = (1 << 0),
121
   HAS_PARKED = (1 << 1)
122
} lock_bits_t;
123

124
typedef struct {
125
#ifdef __MINGW32__
126
   CRITICAL_SECTION   mutex;
127
   CONDITION_VARIABLE cond;
128
#else
129
   pthread_mutex_t    mutex;
130
   pthread_cond_t     cond;
131
#endif
132
   int                parked;
133
} __attribute__((aligned(64))) parking_bay_t;
134

135
typedef bool (*park_fn_t)(parking_bay_t *, void *);
136
typedef void (*unpark_fn_t)(parking_bay_t *, void *);
137

138
typedef struct {
139
   task_fn_t  fn;
140
   void      *context;
141
   void      *arg;
142
   workq_t   *workq;
143
} task_t;
144

145
// Work stealing task queue is based on:
146
//   Arora, N. S., Blumofe, R. D., and Plaxton, C. G.
147
//   Thread scheduling for multiprogrammed multiprocessors.
148
//   Theory of Computing Systems 34, 2 (2001), 115-144.
149

150
typedef uint32_t abp_idx_t;
151
typedef uint32_t abp_tag_t;
152

153
typedef union {
154
   struct {
155
      abp_idx_t top;
156
      abp_tag_t tag;
157
   };
158
   uint64_t bits;
159
} abp_age_t;
160

161
STATIC_ASSERT(sizeof(abp_age_t) <= 8);
162

163
#define THREADQ_SIZE 256
164

165
typedef struct {
166
   task_t    deque[THREADQ_SIZE];
167
   abp_age_t age;
168
   abp_idx_t bot;
169
} __attribute__((aligned(64))) threadq_t;
170

171
typedef enum { IDLE, START } workq_state_t;
172

173
typedef union {
174
   struct {
175
      uint32_t count;
176
      uint32_t epoch;
177
   };
178
   uint64_t bits;
179
} entryq_ptr_t;
180

181
STATIC_ASSERT(sizeof(entryq_ptr_t) <= 8);
182

183
typedef struct {
184
   task_t       *tasks;
185
   unsigned      queuesz;
186
   entryq_ptr_t  wptr;
187
   entryq_ptr_t  comp;
188
} __attribute__((aligned(64))) entryq_t;
189

190
STATIC_ASSERT(sizeof(entryq_t) == 64);
191

192
struct _workq {
193
   void          *context;
194
   workq_state_t  state;
195
   unsigned       epoch;
196
   unsigned       maxthread;
197
   bool           parallel;
198
   entryq_t       entryqs[MAX_THREADS];
199
};
200

201
typedef struct {
202
   int64_t comp;
203
   int64_t steals;
204
   int64_t wakes;
205
} __attribute__((aligned(64))) workq_stats_t;
206

207
STATIC_ASSERT(sizeof(workq_stats_t) == 64);
208

209
typedef enum {
210
   MAIN_THREAD,
211
   USER_THREAD,
212
   WORKER_THREAD,
213
} thread_kind_t;
214

215
struct _nvc_thread {
216
   unsigned        id;
217
   thread_kind_t   kind;
218
   unsigned        spins;
219
   threadq_t       queue;
220
   char           *name;
221
   thread_fn_t     fn;
222
   void           *arg;
223
   int             victim;
224
   uint32_t        rngstate;
225
#ifdef __MINGW32__
226
   HANDLE          handle;
227
   void           *retval;
228
#else
229
   pthread_t       handle;
230
#endif
231
#ifdef __APPLE__
232
   thread_port_t   port;
233
#endif
234
};
235

236
typedef struct {
237
   nvc_lock_t   lock;
238
   task_t      *tasks;
239
   unsigned     wptr;
240
   unsigned     rptr;
241
   unsigned     max;
242
} globalq_t;
243

244
typedef struct _barrier {
245
   unsigned count;
246
   unsigned reached;
247
   unsigned passed;
248
} __attribute__((aligned(64))) barrier_t;
249

250
static parking_bay_t parking_bays[PARKING_BAYS] = {
251
#ifndef __MINGW32__
252
   [0 ... PARKING_BAYS - 1] = {
253
      PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP,
254
      PTHREAD_COND_INITIALIZER
255
   }
256
#endif
257
};
258

259
static nvc_thread_t    *threads[MAX_THREADS];
260
static unsigned         max_workers = 0;
261
static int              running_threads = 0;
262
static unsigned         max_thread_id = 0;
263
static bool             should_stop = false;
264
static globalq_t        globalq __attribute__((aligned(64)));
265
static int              async_pending __attribute__((aligned(64))) = 0;
266
static nvc_lock_t       stop_lock = 0;
267
static stop_world_fn_t  stop_callback = NULL;
268
static void            *stop_arg = NULL;
269

270
#ifdef __MINGW32__
271
static CONDITION_VARIABLE wake_workers = CONDITION_VARIABLE_INIT;
272
static CRITICAL_SECTION   wakelock;
273
#else
274
static pthread_cond_t     wake_workers = PTHREAD_COND_INITIALIZER;
275
static pthread_mutex_t    wakelock = PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;
276
#endif
277

278
#ifdef POSIX_SUSPEND
279
static sem_t stop_sem;
280
#endif
281

282
#ifdef DEBUG
283
static lock_stats_t  lock_stats[MAX_THREADS];
284
static workq_stats_t workq_stats[MAX_THREADS];
285
#endif
286

287
static __thread nvc_thread_t *my_thread = NULL;
288

289
static parking_bay_t *parking_bay_for(void *cookie);
290

291
#ifdef POSIX_SUSPEND
292
static void suspend_handler(int sig, siginfo_t *info, void *context);
293
#endif
294

295
#ifdef DEBUG
296
static void print_lock_stats(void)
×
297
{
298
   lock_stats_t s = {};
×
299
   workq_stats_t t = {};
×
300
   for (int i = 0; i < MAX_THREADS; i++) {
×
301
      s.locks     += relaxed_load(&(lock_stats[i].locks));
×
302
      s.contended += relaxed_load(&(lock_stats[i].contended));
×
303
      s.parks     += relaxed_load(&(lock_stats[i].parks));
×
304
      s.spins     += relaxed_load(&(lock_stats[i].spins));
×
305
      s.spurious  += relaxed_load(&(lock_stats[i].spurious));
×
306
      s.retries   += relaxed_load(&(lock_stats[i].retries));
×
307

308
      t.comp   += relaxed_load(&(workq_stats[i].comp));
×
309
      t.steals += relaxed_load(&(workq_stats[i].steals));
×
310
      t.wakes  += relaxed_load(&(workq_stats[i].wakes));
×
311
   }
312

313
   printf("\nLock statistics:\n");
×
314
   printf("\tTotal locks      : %"PRIi64"\n", s.locks);
×
315
   printf("\tContended        : %"PRIi64" (%.1f%%)\n",
×
316
          s.contended, 100.0 * ((double)s.contended / (double)s.locks));
×
317
   printf("\tParked           : %"PRIi64" (%.1f%%)\n",
×
318
          s.parks, 100.0 * ((double)s.parks / (double)s.locks));
×
319
   printf("\tAverage spins    : %.1f\n", (double)s.spins / (double)s.retries);
×
320
   printf("\tSpurious wakeups : %"PRIi64"\n", s.spurious);
×
321

322
   printf("\nWork queue statistics:\n");
×
323
   printf("\tCompleted tasks  : %"PRIi64"\n", t.comp);
×
324
   printf("\tSteals           : %"PRIi64"\n", t.steals);
×
325
   printf("\tWakeups          : %"PRIi64"\n", t.wakes);
×
326
}
×
327
#endif
328

329
#ifdef __MINGW32__
330
static inline void platform_mutex_lock(LPCRITICAL_SECTION lpcs)
331
{
332
   EnterCriticalSection(lpcs);
333
}
334

335
static inline void platform_mutex_unlock(LPCRITICAL_SECTION lpcs)
336
{
337
   LeaveCriticalSection(lpcs);
338
}
339

340
static inline void platform_cond_broadcast(PCONDITION_VARIABLE pcv)
341
{
342
   WakeAllConditionVariable(pcv);
343
}
344

345
static inline void platform_cond_wait(PCONDITION_VARIABLE pcv,
346
                                      LPCRITICAL_SECTION lpcs)
347
{
348
   SleepConditionVariableCS(pcv, lpcs, INFINITE);
349
}
350
#else
351
static inline void platform_mutex_lock(pthread_mutex_t *mtx)
6,862✔
352
{
353
   PTHREAD_CHECK(pthread_mutex_lock, mtx);
6,862✔
354
}
6,862✔
355

356
static inline void platform_mutex_unlock(pthread_mutex_t *mtx)
6,862✔
357
{
358
   PTHREAD_CHECK(pthread_mutex_unlock, mtx);
6,862✔
359
}
6,862✔
360

361
static inline void platform_cond_broadcast(pthread_cond_t *cond)
5,367✔
362
{
363
   PTHREAD_CHECK(pthread_cond_broadcast, cond);
5,367✔
364
}
5,367✔
365

366
static inline void platform_cond_wait(pthread_cond_t *cond,
1,632✔
367
                                      pthread_mutex_t *mtx)
368
{
369
   PTHREAD_CHECK(pthread_cond_wait, cond, mtx);
1,632✔
370
}
1,632✔
371
#endif
372

373
static void join_worker_threads(void)
4,555✔
374
{
375
   SCOPED_A(nvc_thread_t *) join_list = AINIT;
9,110✔
376

377
   for (int i = 1; i < MAX_THREADS; i++) {
291,520✔
378
      nvc_thread_t *t = atomic_load(&threads[i]);
286,965✔
379
      if (t != NULL)
286,965✔
380
         APUSH(join_list, t);
286,965✔
381
   }
382

383
   // Lock the wake mutex here to avoid races with workers sleeping
384
   platform_mutex_lock(&wakelock);
4,555✔
385
   {
386
      atomic_store(&should_stop, true);
4,555✔
387
      platform_cond_broadcast(&wake_workers);
4,555✔
388
   }
389
   platform_mutex_unlock(&wakelock);
4,555✔
390

391
   for (int i = 0; i < join_list.count; i++) {
4,611✔
392
      nvc_thread_t *t = join_list.items[i];
56✔
393

394
      switch (relaxed_load(&t->kind)) {
56✔
395
      case WORKER_THREAD:
56✔
396
         thread_join(t);
56✔
397
         continue;  // Freed thread struct
56✔
398
      case USER_THREAD:
×
399
      case MAIN_THREAD:
400
         fatal_trace("leaked a user thread: %s", t->name);
×
401
      }
402
   }
403

404
   assert(atomic_load(&running_threads) == 1);
4,555✔
405

406
#ifdef DEBUG
407
   for (int i = 0; i < PARKING_BAYS; i++)
296,075✔
408
      assert(parking_bays[i].parked == 0);
291,520✔
409
#endif
410
}
4,555✔
411

412
static nvc_thread_t *thread_new(thread_fn_t fn, void *arg,
4,620✔
413
                                thread_kind_t kind, char *name)
414
{
415
   nvc_thread_t *thread = xcalloc(sizeof(nvc_thread_t));
4,620✔
416
   thread->name     = name;
4,620✔
417
   thread->fn       = fn;
4,620✔
418
   thread->arg      = arg;
4,620✔
419
   thread->kind     = kind;
4,620✔
420
   thread->rngstate = rand();
4,620✔
421

422
   atomic_store(&thread->queue.age.bits, 0);
4,620✔
423
   atomic_store(&thread->queue.bot, 0);
4,620✔
424

425
   int id = 0;
4,620✔
426
   for (; id < MAX_THREADS; id++) {
4,746✔
427
      if (relaxed_load(&(threads[id])) != NULL)
4,746✔
428
         continue;
126✔
429
      else if (atomic_cas(&(threads[id]), NULL, thread))
4,620✔
430
         break;
431
   }
432

433
   if (id == MAX_THREADS)
4,620✔
434
      fatal_trace("cannot create more than %d threads", MAX_THREADS);
×
435

436
   thread->id = id;
4,620✔
437

438
   unsigned max = relaxed_load(&max_thread_id);
4,620✔
439
   while (max < id) {
4,620✔
440
      if (__atomic_cas(&max_thread_id, &max, id))
70✔
441
         break;
442
   }
443

444
   atomic_add(&running_threads, 1);
4,620✔
445
   return thread;
4,620✔
446
}
447

448
#ifdef POSIX_SUSPEND
449
static void unmask_fatal_signals(sigset_t *mask)
6,547✔
450
{
451
   sigdelset(mask, SIGQUIT);
6,547✔
452
   sigdelset(mask, SIGABRT);
6,547✔
453
   sigdelset(mask, SIGTERM);
6,547✔
454
}
6,547✔
455
#endif
456

457
#ifdef __APPLE__
458
static void reset_mach_ports(void)
459
{
460
   for (int i = 0; i < MAX_THREADS; i++) {
461
      nvc_thread_t *t = atomic_load(&(threads[i]));
462
      if (t == NULL)
463
         continue;
464

465
      // Mach ports are not valid after fork
466
      t->port = pthread_mach_thread_np(t->handle);
467
   }
468
}
469
#endif
470

471
void thread_init(void)
4,550✔
472
{
473
   assert(my_thread == NULL);
4,550✔
474

475
   my_thread = thread_new(NULL, NULL, MAIN_THREAD, xstrdup("main thread"));
4,550✔
476

477
#ifdef __MINGW32__
478
   my_thread->handle = GetCurrentThread();
479
#else
480
   my_thread->handle = pthread_self();
4,550✔
481
#endif
482

483
#ifdef __APPLE__
484
   my_thread->port = pthread_mach_thread_np(my_thread->handle);
485
   pthread_atfork(NULL, NULL, reset_mach_ports);
486
#endif
487

488
#ifdef __MINGW32__
489
   InitializeCriticalSectionAndSpinCount(&wakelock, LOCK_SPINS);
490
   InitializeConditionVariable(&wake_workers);
491

492
   for (int i = 0; i < PARKING_BAYS; i++) {
493
      parking_bay_t *bay = &(parking_bays[i]);
494
      InitializeCriticalSectionAndSpinCount(&(bay->mutex), LOCK_SPINS);
495
      InitializeConditionVariable(&(bay->cond));
496
   }
497
#endif
498

499
   assert(my_thread->id == 0);
4,550✔
500

501
   const char *env = getenv("NVC_MAX_THREADS");
4,550✔
502
   if (env != NULL)
4,550✔
503
      max_workers = MAX(1, MIN(atoi(env), MAX_THREADS));
×
504
   else
505
      max_workers = MIN(nvc_nprocs(), DEFAULT_THREADS);
4,550✔
506
   assert(max_workers > 0);
4,550✔
507

508
#ifdef DEBUG
509
   if (getenv("NVC_THREAD_VERBOSE") != NULL)
4,550✔
510
      atexit(print_lock_stats);
×
511
#endif
512

513
   atexit(join_worker_threads);
4,550✔
514

515
#ifdef POSIX_SUSPEND
516
   sem_init(&stop_sem, 0, 0);
4,550✔
517

518
   struct sigaction sa = {
4,550✔
519
      .sa_sigaction = suspend_handler,
520
      .sa_flags = SA_RESTART | SA_SIGINFO
521
   };
522
   sigfillset(&sa.sa_mask);
4,550✔
523
   unmask_fatal_signals(&sa.sa_mask);
4,550✔
524

525
   sigaction(SIGSUSPEND, &sa, NULL);
4,550✔
526
   sigaction(SIGRESUME, &sa, NULL);
4,550✔
527

528
   sigset_t mask;
4,550✔
529
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, NULL, &mask);
4,550✔
530

531
   sigdelset(&mask, SIGSUSPEND);
4,550✔
532
   sigaddset(&mask, SIGRESUME);
4,550✔
533

534
   PTHREAD_CHECK(pthread_sigmask, SIG_SETMASK, &mask, NULL);
4,550✔
535
#endif
536
}
4,550✔
537

538
int thread_id(void)
75,379,123✔
539
{
540
   assert(my_thread != NULL);
75,379,123✔
541
   return my_thread->id;
75,379,123✔
542
}
543

544
bool thread_attached(void)
45✔
545
{
546
   return my_thread != NULL;
45✔
547
}
548

549
void thread_sleep(int usec)
×
550
{
551
   usleep(usec);
×
552
}
×
553

554
static void *thread_wrapper(void *arg)
70✔
555
{
556
   assert(my_thread == NULL);
70✔
557
   my_thread = arg;
70✔
558

559
   void *result = (*my_thread->fn)(my_thread->arg);
70✔
560

561
   // Avoid races with stop_world
562
   SCOPED_LOCK(stop_lock);
70✔
563

564
   assert(threads[my_thread->id] == my_thread);
70✔
565
   atomic_store(&(threads[my_thread->id]),  NULL);
70✔
566

567
   atomic_add(&running_threads, -1);
70✔
568
   return result;
70✔
569
}
570

571
#ifdef __MINGW32__
572
static DWORD win32_thread_wrapper(LPVOID param)
573
{
574
   void *ret = thread_wrapper(param);
575
   atomic_store(&(my_thread->retval), ret);
576
   return 0;
577
}
578
#endif
579

580
nvc_thread_t *thread_create(thread_fn_t fn, void *arg, const char *fmt, ...)
14✔
581
{
582
   va_list ap;
14✔
583
   va_start(ap, fmt);
14✔
584
   char *name = xvasprintf(fmt, ap);
14✔
585
   va_end(ap);
14✔
586

587
   // Avoid races with stop_world
588
   SCOPED_LOCK(stop_lock);
14✔
589

590
   nvc_thread_t *thread = thread_new(fn, arg, USER_THREAD, name);
14✔
591

592
#ifdef __MINGW32__
593
   if ((thread->handle = CreateThread(NULL, 0, win32_thread_wrapper,
594
                                      thread, 0, NULL)) == NULL)
595
      fatal_errno("CreateThread");
596
#else
597
   PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
14✔
598
                 thread_wrapper, thread);
599
#endif
600

601
#ifdef __APPLE__
602
   thread->port = pthread_mach_thread_np(thread->handle);
603
#endif
604

605
   return thread;
14✔
606
}
607

608
void *thread_join(nvc_thread_t *thread)
70✔
609
{
610
   if (thread == my_thread || thread->kind == MAIN_THREAD)
70✔
611
      fatal_trace("cannot join self or main thread");
×
612

613
   void *retval = NULL;
70✔
614
#ifdef __MINGW32__
615
   if (WaitForSingleObject(thread->handle, INFINITE) == WAIT_FAILED)
616
      fatal_errno("WaitForSingleObject failed for thread %s", thread->name);
617

618
   retval = atomic_load(&(thread->retval));
619
#else
620
   PTHREAD_CHECK(pthread_join, thread->handle, &retval);
70✔
621
#endif
622

623
   async_free(thread->name);
70✔
624
   async_free(thread);
70✔
625

626
   return retval;
70✔
627
}
628

629
nvc_thread_t *get_thread(int id)
27,456✔
630
{
631
   assert(id >= 0 && id < MAX_THREADS);
27,456✔
632
   return atomic_load(&threads[id]);
27,456✔
633
}
634

635
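// Hash the cookie (the address of the lock or other waited-on object)
// to choose one of the fixed parking bays; unrelated cookies may share
// a bay, which is why thread_unpark broadcasts rather than signals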
static inline parking_bay_t *parking_bay_for(void *cookie)
2,214✔
636
{
637
   return &(parking_bays[mix_bits_64(cookie) % PARKING_BAYS]);
2,214✔
638
}
639

640
static void thread_park(void *cookie, park_fn_t fn)
1,545✔
641
{
642
   parking_bay_t *bay = parking_bay_for(cookie);
1,545✔
643

644
   platform_mutex_lock(&(bay->mutex));
1,545✔
645
   {
646
      if ((*fn)(bay, cookie)) {
1,545✔
647
         bay->parked++;
1,539✔
648
         platform_cond_wait(&(bay->cond), &(bay->mutex));
1,539✔
649
         assert(bay->parked > 0);
1,539✔
650
         bay->parked--;
1,539✔
651
      }
652
   }
653
   platform_mutex_unlock(&(bay->mutex));
1,545✔
654
}
1,545✔
655

656
static void thread_unpark(void *cookie, unpark_fn_t fn)
669✔
657
{
658
   parking_bay_t *bay = parking_bay_for(cookie);
669✔
659

660
   if (fn != NULL) {
669✔
661
      platform_mutex_lock(&(bay->mutex));
669✔
662
      {
663
         (*fn)(bay, cookie);
669✔
664
      }
665
      platform_mutex_unlock(&(bay->mutex));
669✔
666
   }
667

668
   // Do not use pthread_cond_signal here as multiple threads parked in
669
   // this bay may be waiting on different cookies
670
   platform_cond_broadcast(&(bay->cond));
669✔
671
}
669✔
672

673
void spin_wait(void)
56,830,821✔
674
{
675
#ifdef __x86_64__
676
   __asm__ volatile ("pause");
56,830,821✔
677
#elif defined __aarch64__
678
   // YIELD is a no-op on most AArch64 cores so also do an ISB to stall
679
   // the pipeline for a bit
680
   __asm__ volatile ("yield; isb");
681
#endif
682
}
56,830,821✔
683

684
static bool lock_park_cb(parking_bay_t *bay, void *cookie)
1,545✔
685
{
686
   nvc_lock_t *lock = cookie;
1,545✔
687

688
   // This is called with the park mutex held: check the lock is still
689
   // owned by someone and the park bit is still set
690
   return relaxed_load(lock) == (IS_LOCKED | HAS_PARKED);
1,545✔
691
}
692

693
static void lock_unpark_cb(parking_bay_t *bay, void *cookie)
669✔
694
{
695
   nvc_lock_t *lock = cookie;
669✔
696

697
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
669✔
698

699
   // Unlock must have release semantics
700
   atomic_store(lock, (bay->parked > 0 ? HAS_PARKED : 0));
669✔
701
}
669✔
702

703
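// Acquire the byte-sized lock: a single CAS on the fast path, then a
// bounded spin, and finally parking in the bay hashed from the lock
// address.  Typical usage, as elsewhere in this file (sketch only):
//
//    static nvc_lock_t lock = 0;
//    nvc_lock(&lock);
//    /* ...critical section... */
//    nvc_unlock(&lock);
//
// or use SCOPED_LOCK(), which releases the lock when the enclosing
// scope exits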
void nvc_lock(nvc_lock_t *lock)
70,036,800✔
704
{
705
   LOCK_EVENT(locks, 1);
70,036,800✔
706
   TSAN_PRE_LOCK(lock);
70,036,800✔
707

708
   int8_t state = 0;
70,036,800✔
709
   if (likely(__atomic_cas(lock, &state, state | IS_LOCKED)))
70,036,800✔
710
      goto locked;  // Fast path: acquired the lock without contention
70,028,080✔
711

712
   LOCK_EVENT(contended, 1);
8,720✔
713

714
   for (;;) {
36,328✔
715
      LOCK_EVENT(retries, 1);
36,328✔
716

717
      // Spin a few times waiting for the owner to release the lock
718
      // before parking
719
      int spins = 0;
720
      for (; (state & IS_LOCKED) && spins < LOCK_SPINS;
216,104✔
721
           spins++, state = relaxed_load(lock))
179,776✔
722
         spin_wait();
179,776✔
723

724
      if (state & IS_LOCKED) {
36,328✔
725
         // Ignore failures here as we will check the lock state again
726
         // in the callback with the park mutex held
727
         atomic_cas(lock, IS_LOCKED, IS_LOCKED | HAS_PARKED);
1,545✔
728

729
         LOCK_EVENT(parks, 1);
1,545✔
730
         thread_park(lock, lock_park_cb);
1,545✔
731

732
         if ((state = relaxed_load(lock)) & IS_LOCKED) {
1,545✔
733
            // Someone else grabbed the lock before our thread was unparked
734
            LOCK_EVENT(spurious, 1);
1,325✔
735
            continue;
1,325✔
736
         }
737
      }
738
      else
739
         LOCK_EVENT(spins, spins);
34,783✔
740

741
      assert(!(state & IS_LOCKED));
35,003✔
742

743
      // If we get here then we've seen the lock in an unowned state:
744
      // attempt to grab it with a CAS
745
      if (__atomic_cas(lock, &state, state | IS_LOCKED))
35,003✔
746
         goto locked;
8,720✔
747
   }
748

749
 locked:
70,036,800✔
750
   TSAN_POST_LOCK(lock);
70,036,800✔
751
}
70,036,800✔
752

753
void nvc_unlock(nvc_lock_t *lock)
70,036,797✔
754
{
755
   TSAN_PRE_UNLOCK(lock);
70,036,797✔
756

757
   // Fast path: unlock assuming no parked waiters
758
   if (likely(atomic_cas(lock, IS_LOCKED, 0)))
70,036,797✔
759
      goto unlocked;
70,036,128✔
760

761
   // If we get here then we must own the lock with at least one parked
762
   // waiting thread
763
   assert(relaxed_load(lock) == (IS_LOCKED | HAS_PARKED));
669✔
764

765
   // Lock released in callback
766
   thread_unpark(lock, lock_unpark_cb);
669✔
767

768
 unlocked:
70,036,797✔
769
   TSAN_POST_UNLOCK(lock);
70,036,797✔
770
}
70,036,797✔
771

772
#ifdef DEBUG
773
void assert_lock_held(nvc_lock_t *lock)
160,052✔
774
{
775
   int8_t state = relaxed_load(lock);
160,052✔
776
   if (unlikely(!(state & IS_LOCKED)))
160,052✔
777
      fatal_trace("expected lock at %p to be held", lock);
×
778
}
160,052✔
779
#endif
780

781
void __scoped_unlock(nvc_lock_t **plock)
69,985,836✔
782
{
783
   nvc_unlock(*plock);
69,985,836✔
784
}
69,985,836✔
785

786
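// ABP deque: the owning thread pushes and pops at the bottom of its
// own queue; idle threads steal from the top via pop_top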
static void push_bot(threadq_t *tq, const task_t *tasks, size_t count)
79✔
787
{
788
   const abp_idx_t bot = relaxed_load(&tq->bot);
79✔
789
   assert(bot + count <= THREADQ_SIZE);
79✔
790

791
   memcpy(tq->deque + bot, tasks, count * sizeof(task_t));
79✔
792
   store_release(&tq->bot, bot + count);
79✔
793
}
79✔
794

795
static bool pop_bot(threadq_t *tq, task_t *task)
239✔
796
{
797
   const abp_idx_t old_bot = relaxed_load(&tq->bot);
239✔
798
   if (old_bot == 0)
239✔
799
      return false;
800

801
   const abp_idx_t new_bot = old_bot - 1;
193✔
802
   atomic_store(&tq->bot, new_bot);
193✔
803

804
   *task = tq->deque[new_bot];
193✔
805

806
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
193✔
807
   if (new_bot > old_age.top)
193✔
808
      return true;
809

810
   atomic_store(&tq->bot, 0);
79✔
811

812
   const abp_age_t new_age = { .top = 0, .tag = old_age.tag + 1 };
79✔
813
   if (new_bot == old_age.top) {
79✔
814
      if (atomic_cas(&tq->age.bits, old_age.bits, new_age.bits))
46✔
815
         return true;
816
   }
817

818
   atomic_store(&tq->age.bits, new_age.bits);
33✔
819
   return false;
33✔
820
}
821

822
__attribute__((no_sanitize("thread")))
823
static bool pop_top(threadq_t *tq, task_t *task)
71✔
824
{
825
   const abp_age_t old_age = { .bits = atomic_load(&tq->age.bits) };
71✔
826
   const abp_idx_t bot = atomic_load(&tq->bot);
71✔
827

828
   if (bot <= old_age.top)
71✔
829
      return false;
830

831
   // This triggers a TSAN false-positive: we will never read from *task
832
   // if the CAS fails below, so it's safe to ignore
833
   *task = tq->deque[old_age.top];
70✔
834

835
   const abp_age_t new_age = {
70✔
836
      .tag = old_age.tag,
837
      .top = old_age.top + 1
70✔
838
   };
839

840
   return atomic_cas(&tq->age.bits, old_age.bits, new_age.bits);
70✔
841
}
842

843
static void globalq_put(globalq_t *gq, const task_t *tasks, size_t count)
143✔
844
{
845
   assert_lock_held(&gq->lock);
143✔
846

847
   if (gq->wptr == gq->rptr)
143✔
848
      gq->wptr = gq->rptr = 0;
69✔
849

850
   if (gq->wptr + count > gq->max) {
143✔
851
      gq->max = next_power_of_2(gq->wptr + count);
39✔
852
      gq->tasks = xrealloc_array(gq->tasks, gq->max, sizeof(task_t));
39✔
853
   }
854

855
   memcpy(gq->tasks + gq->wptr, tasks, count * sizeof(task_t));
143✔
856
   gq->wptr += count;
143✔
857
}
143✔
858

859
__attribute__((no_sanitize("thread")))
860
static bool globalq_unlocked_empty(globalq_t *gq)
428✔
861
{
862
   return relaxed_load(&gq->wptr) == relaxed_load(&gq->rptr);
428✔
863
}
864

865
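// Move a batch of tasks from the shared global queue onto this
// thread's local deque: roughly a 1/nthreads share, but at least
// MIN_TAKE and never more than THREADQ_SIZE or the number remaining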
static size_t globalq_take(globalq_t *gq, threadq_t *tq)
428✔
866
{
867
   if (globalq_unlocked_empty(gq))
428✔
868
      return 0;
869

870
   const int nthreads = relaxed_load(&running_threads);
95✔
871

872
   SCOPED_LOCK(gq->lock);
190✔
873

874
   if (gq->wptr == gq->rptr)
95✔
875
      return 0;
876

877
   const int remain = gq->wptr - gq->rptr;
79✔
878
   const int share = gq->wptr / nthreads;
79✔
879
   const int take = MIN(remain, MAX(MIN_TAKE, MIN(THREADQ_SIZE, share)));
79✔
880
   const int from = gq->rptr;
79✔
881

882
   gq->rptr += take;
79✔
883

884
   push_bot(tq, gq->tasks + from, take);
79✔
885
   return take;
79✔
886
}
887

888
static void execute_task(task_t *task)
1,288✔
889
{
890
   (*task->fn)(task->context, task->arg);
1,288✔
891

892
   if (task->workq != NULL) {
1,288✔
893
      entryq_t *eq = &(task->workq->entryqs[my_thread->id]);
1,177✔
894

895
      const entryq_ptr_t cur = { .bits = relaxed_load(&eq->comp.bits) };
1,177✔
896
      const int epoch = atomic_load(&task->workq->epoch);
1,177✔
897
      const int count = cur.epoch == epoch ? cur.count : 0;
1,177✔
898
      const entryq_ptr_t next = { .count = count + 1, .epoch = epoch };
1,177✔
899
      store_release(&eq->comp.bits, next.bits);
1,177✔
900
   }
901
   else
902
      atomic_add(&async_pending, -1);
111✔
903
}
1,288✔
904

905
static bool globalq_poll(globalq_t *gq, threadq_t *tq)
428✔
906
{
907
   int ntasks;
428✔
908
   if ((ntasks = globalq_take(gq, tq))) {
428✔
909
      task_t task;
79✔
910
      int comp = 0;
79✔
911
      for (; pop_bot(tq, &task); comp++)
318✔
912
         execute_task(&task);
160✔
913

914
      WORKQ_EVENT(comp, comp);
79✔
915
      return true;
79✔
916
   }
917
   else
918
      return false;
919
}
920

921
workq_t *workq_new(void *context)
1,090✔
922
{
923
   if (my_thread->kind != MAIN_THREAD)
1,090✔
924
      fatal_trace("work queues can only be created by the main thread");
×
925

926
   workq_t *wq = xcalloc(sizeof(workq_t));
1,090✔
927
   wq->state    = IDLE;
1,090✔
928
   wq->context  = context;
1,090✔
929
   wq->parallel = max_workers > 1;
1,090✔
930
   wq->epoch    = 1;
1,090✔
931

932
   return wq;
1,090✔
933
}
934

935
void workq_not_thread_safe(workq_t *wq)
×
936
{
937
   wq->parallel = false;
×
938
}
×
939

940
void workq_free(workq_t *wq)
1,090✔
941
{
942
   if (my_thread->kind != MAIN_THREAD)
1,090✔
943
      fatal_trace("work queues can only be freed by the main thread");
×
944

945
   assert(wq->state == IDLE);
1,090✔
946

947
   for (int i = 0; i < MAX_THREADS; i++)
70,850✔
948
      free(wq->entryqs[i].tasks);
69,760✔
949

950
   free(wq);
1,090✔
951
}
1,090✔
952

953
void workq_do(workq_t *wq, task_fn_t fn, void *arg)
1,177✔
954
{
955
   assert(wq->state == IDLE);
1,177✔
956

957
   entryq_t *eq = &(wq->entryqs[my_thread->id]);
1,177✔
958

959
   const entryq_ptr_t cur = { .bits = relaxed_load(&eq->wptr.bits) };
1,177✔
960
   const int epoch = atomic_load(&wq->epoch);
1,177✔
961
   const int wptr = cur.epoch == epoch ? cur.count : 0;
1,177✔
962

963
   if (wptr == eq->queuesz) {
1,177✔
964
      eq->queuesz = MAX(eq->queuesz * 2, 64);
1,090✔
965
      eq->tasks = xrealloc_array(eq->tasks, eq->queuesz, sizeof(task_t));
1,090✔
966
   }
967

968
   eq->tasks[wptr] = (task_t){ fn, wq->context, arg, wq };
1,177✔
969

970
   const entryq_ptr_t next = { .count = wptr + 1, .epoch = epoch };
1,177✔
971
   store_release(&eq->wptr.bits, next.bits);
1,177✔
972

973
   unsigned maxthread = relaxed_load(&wq->maxthread);
1,177✔
974
   while (maxthread < my_thread->id) {
1,177✔
975
      if (__atomic_cas(&wq->maxthread, &maxthread, my_thread->id))
×
976
         break;
977
   }
978
}
1,177✔
979

980
void workq_scan(workq_t *wq, scan_fn_t fn, void *arg)
×
981
{
982
   const int maxthread = relaxed_load(&wq->maxthread);
×
983
   for (int i = 0; i <= maxthread; i++) {
×
984
      entryq_t *eq = &(wq->entryqs[my_thread->id]);
×
985
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
×
986
      for (int j = 0; j < wptr.count; j++)
×
987
         (*fn)(wq->context, eq->tasks[j].arg, arg);
×
988
   }
989
}
×
990

991
static int estimate_depth(threadq_t *tq)
1,039✔
992
{
993
   const abp_age_t age = { .bits = relaxed_load(&tq->age.bits) };
1,039✔
994
   const abp_idx_t bot = relaxed_load(&tq->bot);
1,039✔
995

996
   return bot <= age.top ? 0 : bot - age.top;
1,039✔
997
}
998

999
static threadq_t *get_thread_queue(int id)
1,039✔
1000
{
1001
   assert(id < MAX_THREADS);
1,039✔
1002

1003
   nvc_thread_t *t = atomic_load(&(threads[id]));
1,039✔
1004
   if (t == NULL)
1,039✔
1005
      return NULL;
1006

1007
   return &(t->queue);
1,039✔
1008
}
1009

1010
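// Marsaglia xorshift32: a cheap per-thread PRNG used only to pick a
// random victim queue to steal from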
static uint32_t fast_rand(void)
324✔
1011
{
1012
   uint32_t state = my_thread->rngstate;
324✔
1013
   state ^= (state << 13);
324✔
1014
   state ^= (state >> 17);
324✔
1015
   state ^= (state << 5);
324✔
1016
   return (my_thread->rngstate = state);
324✔
1017
}
1018

1019
static threadq_t *find_victim(void)
364✔
1020
{
1021
   threadq_t *last = get_thread_queue(my_thread->victim);
364✔
1022
   if (last != NULL && estimate_depth(last) > 0)
364✔
1023
      return last;
1024

1025
   const int maxthread = relaxed_load(&max_thread_id);
324✔
1026
   const int start = fast_rand() % (maxthread + 1);
324✔
1027
   int idx = start;
324✔
1028
   do {
986✔
1029
      if (idx != my_thread->id) {
986✔
1030
         threadq_t *q = get_thread_queue(idx);
675✔
1031
         if (q != NULL && estimate_depth(q) > 0) {
675✔
1032
            my_thread->victim = idx;
31✔
1033
            return q;
31✔
1034
         }
1035
      }
1036
   } while ((idx = (idx + 1) % (maxthread + 1)) != start);
955✔
1037

1038
   return NULL;
1039
}
1040

1041
static bool steal_task(void)
364✔
1042
{
1043
   threadq_t *tq = find_victim();
364✔
1044
   if (tq == NULL)
364✔
1045
      return false;
1046

1047
   task_t task;
71✔
1048
   if (pop_top(tq, &task)) {
71✔
1049
      WORKQ_EVENT(steals, 1);
70✔
1050
      execute_task(&task);
70✔
1051
      WORKQ_EVENT(comp, 1);
70✔
1052
      return true;
70✔
1053
   }
1054

1055
   return false;
1056
}
1057

1058
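// Busy-wait politely: spin with spin_wait() up to YIELD_SPINS times,
// then yield the CPU to the scheduler and reset the spin counter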
static void progressive_backoff(void)
58,416,512✔
1059
{
1060
   if (my_thread->spins++ < YIELD_SPINS)
58,416,512✔
1061
      spin_wait();
56,646,325✔
1062
   else {
1063
#ifdef __MINGW32__
1064
      SwitchToThread();
1065
#else
1066
      sched_yield();
1,770,187✔
1067
#endif
1068
      my_thread->spins = 0;
1,770,187✔
1069
   }
1070
}
58,416,512✔
1071

1072
static void *worker_thread(void *arg)
56✔
1073
{
1074
   mspace_stack_limit(MSPACE_CURRENT_FRAME);
56✔
1075

1076
   do {
374✔
1077
      if (globalq_poll(&globalq, &(my_thread->queue)) || steal_task())
374✔
1078
         my_thread->spins = 0;  // Did work
112✔
1079
      else if (my_thread->spins++ < 2)
262✔
1080
         spin_wait();
169✔
1081
      else {
1082
         platform_mutex_lock(&wakelock);
93✔
1083
         {
1084
            if (!relaxed_load(&should_stop))
93✔
1085
               platform_cond_wait(&wake_workers, &wakelock);
93✔
1086
         }
1087
         platform_mutex_unlock(&wakelock);
93✔
1088
      }
1089
   } while (likely(!relaxed_load(&should_stop)));
374✔
1090

1091
   return NULL;
56✔
1092
}
1093

1094
static void create_workers(int needed)
143✔
1095
{
1096
   assert(my_thread->kind == MAIN_THREAD);
143✔
1097

1098
   if (relaxed_load(&should_stop))
143✔
1099
      return;
1100

1101
   while (relaxed_load(&running_threads) < MIN(max_workers, needed)) {
199✔
1102
      static int counter = 0;
56✔
1103
      char *name = xasprintf("worker thread %d", atomic_add(&counter, 1));
56✔
1104
      nvc_thread_t *thread =
56✔
1105
         thread_new(worker_thread, NULL, WORKER_THREAD, name);
56✔
1106

1107
#ifdef __MINGW32__
1108
      if ((thread->handle = CreateThread(NULL, 0, win32_thread_wrapper,
1109
                                         thread, 0, NULL)) == NULL)
1110
         fatal_errno("CreateThread");
1111
#else
1112
      PTHREAD_CHECK(pthread_create, &(thread->handle), NULL,
199✔
1113
                    thread_wrapper, thread);
1114
#endif
1115
   }
1116

1117
   platform_cond_broadcast(&wake_workers);
143✔
1118
}
1119

1120
void workq_start(workq_t *wq)
1,090✔
1121
{
1122
   assert(my_thread->kind == MAIN_THREAD);
1,090✔
1123

1124
   const int epoch = relaxed_load(&wq->epoch);
1,090✔
1125
   const int maxthread = relaxed_load(&wq->maxthread);
1,090✔
1126

1127
   assert(wq->state == IDLE);
1,090✔
1128
   wq->state = START;
1,090✔
1129

1130
   int nserial = 0, nparallel = 0;
1,090✔
1131
   for (int i = 0; i <= maxthread; i++) {
2,180✔
1132
      entryq_t *eq = &wq->entryqs[i];
1,090✔
1133
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
1,090✔
1134
      if (wptr.epoch == epoch) {
1,090✔
1135
         // Only bump epoch if there are tasks to run
1136
         if (nserial + nparallel == 0)
1,090✔
1137
            atomic_add(&wq->epoch, 1);
1,090✔
1138

1139
         assert(wptr.count > 0);
1,090✔
1140

1141
         if (i == maxthread && nserial + nparallel == 0 && wptr.count == 1) {
1,090✔
1142
            execute_task(&(eq->tasks[0]));   // Only one task in total
1,058✔
1143
            nserial++;
1,058✔
1144
         }
1145
         else if (wq->parallel) {
32✔
1146
            if (nparallel == 0)
32✔
1147
               nvc_lock(&globalq.lock);   // Lazily acquire lock
32✔
1148
            globalq_put(&globalq, eq->tasks, wptr.count);
32✔
1149
            nparallel += wptr.count;
32✔
1150
         }
1151
         else {
1152
            for (int j = 0; j < wptr.count; j++)
×
1153
               execute_task(&(eq->tasks[j]));
×
1154
            nserial += wptr.count;
×
1155
         }
1156
      }
1157
   }
1158

1159
   if (wq->parallel && nparallel > 0) {
1,090✔
1160
      nvc_unlock(&globalq.lock);
32✔
1161
      create_workers(nparallel);
32✔
1162
   }
1163
}
1,090✔
1164

1165
static int workq_outstanding(workq_t *wq)
58,295,700✔
1166
{
1167
   assert(wq->state == START);
58,295,700✔
1168

1169
   const int epoch = atomic_load(&wq->epoch);
58,295,700✔
1170
   const int maxthread = relaxed_load(&max_thread_id);
58,295,700✔
1171

1172
   int pending = 0;
58,295,700✔
1173
   for (int i = 0; i <= maxthread; i++) {
213,590,731✔
1174
      entryq_t *eq = &(wq->entryqs[i]);
155,295,031✔
1175

1176
      const entryq_ptr_t wptr = { .bits = load_acquire(&eq->wptr.bits) };
155,295,031✔
1177
      if (wptr.epoch == epoch - 1)
155,295,031✔
1178
         pending += wptr.count;
58,295,700✔
1179

1180
      const entryq_ptr_t comp = { .bits = load_acquire(&eq->comp.bits) };
155,295,031✔
1181
      if (comp.epoch == epoch)
155,295,031✔
1182
         pending -= comp.count;
90,996,750✔
1183
   }
1184

1185
   assert(pending >= 0);
58,295,700✔
1186
   return pending;
58,295,700✔
1187
}
1188

1189
static void workq_parallel_drain(workq_t *wq)
1,090✔
1190
{
1191
   if (workq_outstanding(wq) > 0) {
1,090✔
1192
      while (globalq_poll(&globalq, &(my_thread->queue)));
53✔
1193
      while (steal_task());
47✔
1194

1195
      while (workq_outstanding(wq) > 0)
58,294,610✔
1196
         progressive_backoff();
58,294,578✔
1197
   }
1198

1199
   wq->state = IDLE;
1,090✔
1200
}
1,090✔
1201

1202
void workq_drain(workq_t *wq)
1,090✔
1203
{
1204
   if (my_thread->kind != MAIN_THREAD)
1,090✔
1205
      fatal_trace("workq_drain can only be called from the main thread");
×
1206

1207
   if (wq->parallel)
1,090✔
1208
      workq_parallel_drain(wq);
1,090✔
1209
   else {
1210
      assert(wq->state == START);
×
1211
      wq->state = IDLE;
×
1212
   }
1213
}
1,090✔
1214

1215
void async_do(task_fn_t fn, void *context, void *arg)
111✔
1216
{
1217
   if (max_workers == 1)
111✔
1218
      (*fn)(context, arg);   // Single CPU
×
1219
   else {
1220
      const int npending = atomic_add(&async_pending, 1);
111✔
1221
      create_workers(npending + 1 /* Do not count main thread */);
111✔
1222

1223
      task_t tasks[1] = {
111✔
1224
         { fn, context, arg, NULL }
1225
      };
1226
      SCOPED_LOCK(globalq.lock);
222✔
1227
      globalq_put(&globalq, tasks, 1);
111✔
1228
   }
1229
}
111✔
1230

1231
void async_barrier(void)
7,982✔
1232
{
1233
   while (atomic_load(&async_pending) > 0) {
15,965✔
1234
      if (!globalq_poll(&globalq, &(my_thread->queue)))
1✔
1235
         progressive_backoff();
×
1236
   }
1237
}
7,982✔
1238

1239
void async_free(void *ptr)
5,995✔
1240
{
1241
   if (relaxed_load(&running_threads) == 1)
5,995✔
1242
      free(ptr);
5,980✔
1243
   else if (ptr != NULL) {
1244
      // TODO: free when all threads in quiescent state
1245
   }
5,995✔
1246
}
5,995✔
1247

1248
#ifdef POSIX_SUSPEND
1249
static void suspend_handler(int sig, siginfo_t *info, void *context)
3,994✔
1250
{
1251
   if (info->si_pid != getpid())
3,994✔
1252
      return;   // Not sent by us, ignore it
1,997✔
1253
   else if (sig == SIGRESUME)
3,994✔
1254
      return;
1255

1256
   const int olderrno = errno;
1,997✔
1257

1258
   if (my_thread != NULL) {
1,997✔
1259
      struct cpu_state cpu;
1,997✔
1260
      fill_cpu_state(&cpu, (ucontext_t *)context);
1,997✔
1261

1262
      stop_world_fn_t callback = atomic_load(&stop_callback);
1,997✔
1263
      void *arg = atomic_load(&stop_arg);
1,997✔
1264

1265
      (*callback)(my_thread->id, &cpu, arg);
1,997✔
1266
   }
1267

1268
   sem_post(&stop_sem);
1,997✔
1269

1270
   sigset_t mask;
1,997✔
1271
   sigfillset(&mask);
1,997✔
1272
   unmask_fatal_signals(&mask);
1,997✔
1273
   sigdelset(&mask, SIGRESUME);
1,997✔
1274

1275
   sigsuspend(&mask);
1,997✔
1276

1277
   sem_post(&stop_sem);
1,997✔
1278

1279
   errno = olderrno;
1,997✔
1280
}
1281
#endif
1282

1283
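// Suspend every other registered thread and pass each one's captured
// register state to the callback; the mechanism is platform specific
// (SuspendThread, Mach thread_suspend, or POSIX real-time signals)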
void stop_world(stop_world_fn_t callback, void *arg)
929✔
1284
{
1285
   nvc_lock(&stop_lock);
929✔
1286

1287
   atomic_store(&stop_callback, callback);
929✔
1288
   atomic_store(&stop_arg, arg);
929✔
1289

1290
#ifdef __MINGW32__
1291
   const int maxthread = relaxed_load(&max_thread_id);
1292
   for (int i = 0; i <= maxthread; i++) {
1293
      nvc_thread_t *thread = atomic_load(&threads[i]);
1294
      if (thread == NULL || thread == my_thread)
1295
         continue;
1296

1297
      if (SuspendThread(thread->handle) != 0)
1298
         fatal_errno("SuspendThread");
1299

1300
      CONTEXT context;
1301
      context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;
1302
      if (!GetThreadContext(thread->handle, &context))
1303
         fatal_errno("GetThreadContext");
1304

1305
      struct cpu_state cpu;
1306
      fill_cpu_state(&cpu, &context);
1307

1308
      (*callback)(thread->id, &cpu, arg);
1309
   }
1310
#elif defined __APPLE__
1311
   const int maxthread = relaxed_load(&max_thread_id);
1312
   for (int i = 0; i <= maxthread; i++) {
1313
      nvc_thread_t *thread = atomic_load(&threads[i]);
1314
      if (thread == NULL || thread == my_thread)
1315
         continue;
1316

1317
      assert(thread->port != MACH_PORT_NULL);
1318

1319
      kern_return_t kern_result;
1320
      do {
1321
         kern_result = thread_suspend(thread->port);
1322
      } while (kern_result == KERN_ABORTED);
1323

1324
      if (kern_result != KERN_SUCCESS)
1325
         fatal_trace("failed to suspend thread %d (%d)", i, kern_result);
1326

1327
#ifdef __aarch64__
1328
      arm_thread_state64_t state;
1329
      mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT;
1330
      thread_state_flavor_t flavor = ARM_THREAD_STATE64;
1331
#else
1332
      x86_thread_state64_t state;
1333
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
1334
      thread_state_flavor_t flavor = x86_THREAD_STATE64;
1335
#endif
1336
      kern_result = thread_get_state(thread->port, flavor,
1337
                                     (natural_t *)&state, &count);
1338
      if (kern_result != KERN_SUCCESS)
1339
         fatal_trace("failed to get thread %d state (%d)", i, kern_result);
1340

1341
      // Fake a ucontext_t that we can pass to fill_cpu_state
1342
      ucontext_t uc;
1343
      typeof(*uc.uc_mcontext) mc;
1344
      uc.uc_mcontext = &mc;
1345
      mc.__ss = state;
1346

1347
      struct cpu_state cpu;
1348
      fill_cpu_state(&cpu, &uc);
1349

1350
      (*callback)(thread->id, &cpu, arg);
1351
   }
1352
#elif defined __SANITIZE_THREAD__
1353
   // https://github.com/google/sanitizers/issues/1179
1354
   fatal_trace("stop_world is not supported with tsan");
1355
#else
1356
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
929✔
1357

1358
   int signalled = 0;
929✔
1359
   const int maxthread = relaxed_load(&max_thread_id);
929✔
1360
   for (int i = 0; i <= maxthread; i++) {
4,336✔
1361
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,407✔
1362
      if (thread == NULL || thread == my_thread)
3,407✔
1363
         continue;
1,410✔
1364

1365
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGSUSPEND);
1,997✔
1366
      signalled++;
1,997✔
1367
   }
1368

1369
   struct timespec ts;
929✔
1370
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
929✔
1371
      fatal_errno("clock_gettime");
×
1372

1373
   ts.tv_sec += SUSPEND_TIMEOUT;
929✔
1374

1375
   for (; signalled > 0; signalled--) {
2,926✔
1376
      if (sem_timedwait(&stop_sem, &ts) != 0)
1,997✔
1377
         fatal_trace("timeout waiting for %d threads to suspend", signalled);
×
1378
   }
1379

1380
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
929✔
1381
#endif
1382

1383
   struct cpu_state cpu;
929✔
1384
   capture_registers(&cpu);
929✔
1385

1386
   (*callback)(my_thread->id, &cpu, arg);
929✔
1387
}
929✔
1388

1389
void start_world(void)
929✔
1390
{
1391
   assert_lock_held(&stop_lock);
929✔
1392

1393
   const int maxthread = relaxed_load(&max_thread_id);
929✔
1394

1395
#ifdef __MINGW32__
1396
   for (int i = 0; i <= maxthread; i++) {
1397
      nvc_thread_t *thread = atomic_load(&threads[i]);
1398
      if (thread == NULL || thread == my_thread)
1399
         continue;
1400

1401
      if (ResumeThread(thread->handle) != 1)
1402
         fatal_errno("ResumeThread");
1403
   }
1404
#elif defined __APPLE__
1405
   for (int i = 0; i <= maxthread; i++) {
1406
      nvc_thread_t *thread = atomic_load(&threads[i]);
1407
      if (thread == NULL || thread == my_thread)
1408
         continue;
1409

1410
      kern_return_t kern_result;
1411
      do {
1412
         kern_result = thread_resume(thread->port);
1413
      } while (kern_result == KERN_ABORTED);
1414

1415
      if (kern_result != KERN_SUCCESS)
1416
         fatal_trace("failed to resume thread %d (%d)", i, kern_result);
1417
   }
1418
#else
1419
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
929✔
1420

1421
   int signalled = 0;
1422
   for (int i = 0; i <= maxthread; i++) {
4,336✔
1423
      nvc_thread_t *thread = atomic_load(&threads[i]);
3,407✔
1424
      if (thread == NULL || thread == my_thread)
3,407✔
1425
         continue;
1,410✔
1426

1427
      PTHREAD_CHECK(pthread_kill, thread->handle, SIGRESUME);
1,997✔
1428
      signalled++;
1,997✔
1429
   }
1430

1431
   struct timespec ts;
929✔
1432
   if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
929✔
1433
      fatal_errno("clock_gettime");
×
1434

1435
   ts.tv_sec += SUSPEND_TIMEOUT;
929✔
1436

1437
   for (; signalled > 0; signalled--) {
2,926✔
1438
      if (sem_timedwait(&stop_sem, &ts) != 0)
1,997✔
1439
         fatal_trace("timeout waiting for %d threads to resume", signalled);
×
1440
   }
1441

1442
   assert(sem_trywait(&stop_sem) == -1 && errno == EAGAIN);
929✔
1443
#endif
1444

1445
   nvc_unlock(&stop_lock);
929✔
1446
}
929✔
1447

1448
void thread_wx_mode(wx_mode_t mode)
14,767✔
1449
{
1450
#ifdef __APPLE__
1451
   pthread_jit_write_protect_np(mode == WX_EXECUTE);
1452
#else
1453
   // Could use Intel memory protection keys here
1454
#endif
1455
}
14,767✔
1456

1457
barrier_t *barrier_new(int count)
1✔
1458
{
1459
   barrier_t *b = xcalloc(sizeof(barrier_t));
1✔
1460
   b->count = count;
1✔
1461
   return b;
1✔
1462
}
1463

1464
void barrier_free(barrier_t *b)
1✔
1465
{
1466
   free(b);
1✔
1467
}
1✔
1468

1469
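// Centralised counter barrier: the last thread to arrive resets the
// arrival count and advances the 'passed' epoch; earlier arrivals spin
// in progressive_backoff until they observe the new epoch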
void barrier_wait(barrier_t *b)
88✔
1470
{
1471
   const int count = relaxed_load(&b->count);
88✔
1472
   const int passed = relaxed_load(&b->passed);
88✔
1473

1474
   if (atomic_fetch_add(&b->reached, 1) == count - 1) {
88✔
1475
      // Last thread to pass barrier
1476
      relaxed_store(&b->reached, 0);
22✔
1477
      store_release(&b->passed, passed + 1);
22✔
1478
   }
1479
   else {
1480
      while (load_acquire(&b->passed) == passed)
122,000✔
1481
         progressive_backoff();
121,934✔
1482
   }
1483
}
88✔