• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

krakjoe / ort / 16440320110

22 Jul 2025 09:27AM UTC coverage: 92.876% (-0.3%) from 93.144%
16440320110

push

github

krakjoe
develop better scaling internally and externally ... for many reasons

235 of 248 new or added lines in 6 files covered. (94.76%)

14 existing lines in 2 files now uncovered.

5828 of 6275 relevant lines covered (92.88%)

81892.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.67
/src/maths/pool.c
1
/*
2
 +----------------------------------------------------------------------+
3
 | ort                                                                  |
4
 +----------------------------------------------------------------------+
5
 | Copyright (c) Joe Watkins 2025                                       |
6
 +----------------------------------------------------------------------+
7
 | This source file is subject to version 3.01 of the PHP license,      |
8
 | that is bundled with this package in the file LICENSE, and is        |
9
 | available through the world-wide-web at the following url:           |
10
 | http://www.php.net/license/3_01.txt                                  |
11
 | If you did not receive a copy of the PHP license and are unable to   |
12
 | obtain it through the world-wide-web, please send a note to          |
13
 | license@php.net so we can mail you a copy immediately.               |
14
 +----------------------------------------------------------------------+
15
 | Author: krakjoe                                                      |
16
 +----------------------------------------------------------------------+
17
*/
18
#if !defined(_WIN32) && !defined(_GNU_SOURCE)
19
# define _GNU_SOURCE
20
#endif
21

22
#include "maths/cast.h"
23
#include "maths/core.h"
24
#include "maths/pool.h"
25
#include "maths/result.h"
26

27
#if defined(_WIN32)
# include <windows.h>
/* Portability layer: map the pool's thread/mutex/condition primitives onto
   Win32 so the pool logic below is platform-agnostic. */
 typedef HANDLE ort_thread_t;
 typedef CRITICAL_SECTION ort_mutex_t;
 typedef CONDITION_VARIABLE ort_cond_t;
#define ort_pool_mutex_init(mutex) \
   InitializeCriticalSection(mutex)
#define ort_pool_mutex_lock(mutex) \
   EnterCriticalSection(mutex)
#define ort_pool_mutex_unlock(mutex) \
   LeaveCriticalSection(mutex)
#define ort_pool_mutex_destroy(mutex) \
   DeleteCriticalSection(mutex)
#define ort_pool_cond_init(cond) \
   InitializeConditionVariable(cond)
#define ort_pool_cond_wait(cond, mutex) \
   SleepConditionVariableCS(cond, mutex, INFINITE)
#define ort_pool_cond_signal(cond) \
   WakeConditionVariable(cond)
#define ort_pool_cond_broadcast(cond) \
   WakeAllConditionVariable(cond)
/* Win32 condition variables require no explicit destruction. */
#define ort_pool_cond_destroy(cond)
/* NOTE(review): GetCurrentThread() returns a pseudo-handle that never
   compares equal to the real handles stored by CreateThread(); verify how
   callers (ort_pool_worker_indexof) use this on Windows. */
#define ort_pool_thread_self() \
   GetCurrentThread()
#else
# include <pthread.h>
# include <sched.h>
# include <unistd.h>
 typedef pthread_t ort_thread_t;
 typedef pthread_mutex_t ort_mutex_t;
 typedef pthread_cond_t ort_cond_t;
#define ort_pool_mutex_init(mutex) \
   pthread_mutex_init(mutex, NULL)
#define ort_pool_mutex_lock(mutex) \
   pthread_mutex_lock(mutex)
#define ort_pool_mutex_unlock(mutex) \
   pthread_mutex_unlock(mutex)
#define ort_pool_mutex_destroy(mutex) \
   pthread_mutex_destroy(mutex)
#define ort_pool_cond_init(cond) \
   pthread_cond_init(cond, NULL)
#define ort_pool_cond_wait(cond, mutex) \
   pthread_cond_wait(cond, mutex)
#define ort_pool_cond_signal(cond) \
   pthread_cond_signal(cond)
#define ort_pool_cond_broadcast(cond) \
   pthread_cond_broadcast(cond)
#define ort_pool_cond_destroy(cond) \
   pthread_cond_destroy(cond)
#define ort_pool_thread_self() \
   pthread_self()
#endif
79

80
/* Declare a pad member that fills the struct out to CACHE bytes, so adjacent
   instances never share a cache line (see the false-sharing note below).
   CACHE and SIZE are fully parenthesized so compound expressions (e.g. a
   sum of sizeofs, or `32 + 32`) expand with the intended precedence. */
#define _ORT_POOL_PAD(TYPE, CACHE, SIZE) \
    TYPE pad[(CACHE) - (SIZE)]
82

83
/* One per-worker task slot: ort_pool_submit arms a slot, the owning worker
   executes it exactly once and marks it completed. */
typedef struct _ort_task_t {
    ort_task_func_t func;    /* worker entry point; NULL means "slot empty" */
    void *arg;               /* opaque context handed through to func */
    size_t count;            /* total share count passed to the worker */
    volatile int completed;  /* set atomically by the worker when finished */
    /*
    This wastes a few bytes, but eliminates the possibility of false sharing:

      Essentially, because this structure doesn't fill a cache line, we pad to that width.
      This means that no two adjacent structures share a cache line, which avoids false sharing.

    Exhaustive explanation: https://en.wikipedia.org/wiki/False_sharing

    TL;DR: It's worth a few bytes ...
    */
    _ORT_POOL_PAD(
        uint8_t,
        64,
        sizeof(ort_task_func_t) +
        sizeof(void*)           +
        sizeof(size_t)          +
        sizeof(int));
} ort_task_t;
106

107
/* The worker pool: a fixed set of pinned threads, one padded task slot per
   thread, and the synchronization state shared with the submitting thread. */
typedef struct _ort_pool_t {
   ort_thread_t *threads;      /* worker handles, one per target core */
   size_t     size;            /* number of workers (and of slots) */
   ort_task_t *slots;          /* per-worker task slots (cache-line padded) */
   volatile size_t activated;  /* outstanding tasks; 0 == all complete */
   ort_mutex_t mutex;          /* guards slot/activated/stop transitions */
   ort_cond_t cond;            /* doubles as "work available" and "all done" */
   int stop;                   /* set once (under mutex) to drain workers */
} ort_pool_t;
116

117
/* Pool singleton plus lazily-resolved scaling knobs.  ORT_TLS presumably
   makes these thread-local (one pool per host thread) — TODO confirm the
   macro's definition. */
ORT_TLS ort_pool_t __ort_pool;
ORT_TLS size_t     __ort_pool_cores = 0;      /* cached ort_pool_cores() */
ORT_TLS size_t     __ort_pool_threshold = 0;  /* cached ort_pool_threshold() */
120

121
void ort_pool_binary_worker(void *arg, size_t index, size_t count) {
6,688✔
122
   ort_pool_binary_ctx_t *ctx =
6,688✔
123
       (ort_pool_binary_ctx_t *)arg;
124

125
   size_t chunk = ctx->layout.chunk;
6,688✔
126
   size_t start = index * chunk;
6,688✔
127
   size_t end   = start + chunk;
6,688✔
128

129
   if (end > ctx->layout.total)
6,688✔
130
       end = ctx->layout.total;
131
   
132
   size_t n = end - start;
6,688✔
133
   if (n > 0) {
6,688✔
134
       ctx->op(
6,688✔
135
           (char*)ctx->result    + start * ctx->layout.element,
6,688✔
136
           (char*)ctx->a         + start * ctx->layout.element,
6,688✔
137
           (char*)ctx->b         + start * ctx->layout.element,
6,688✔
138
           n
139
       );
140
   }
141
}
6,688✔
142

143
void ort_pool_unary_worker(void *arg, size_t index, size_t count) {
8,016✔
144
   ort_pool_unary_ctx_t *ctx =
8,016✔
145
       (ort_pool_unary_ctx_t *)arg;
146

147
   size_t chunk = ctx->layout.chunk;
8,016✔
148
   size_t start = index * chunk;
8,016✔
149
   size_t end   = start + chunk;
8,016✔
150

151
   if (end > ctx->layout.total)
8,016✔
152
       end = ctx->layout.total;
153
   
154
   size_t n = end - start;
8,016✔
155

156
   if (n > 0) {
8,016✔
157
       ctx->op(
8,016✔
158
           (char*)ctx->result    + start * ctx->layout.element,
8,016✔
159
           (char*)ctx->a         + start * ctx->layout.element,
8,016✔
160
           n
161
       );
162
   }
163
}
8,016✔
164

165
void ort_pool_scalar_worker(void *arg, size_t index, size_t count) {
5,936✔
166
   ort_pool_scalar_ctx_t *ctx =
5,936✔
167
       (ort_pool_scalar_ctx_t *)arg;
168
   size_t chunk = ctx->layout.chunk;
5,936✔
169
   size_t start = index * chunk;
5,936✔
170
   size_t end   = start + chunk;
5,936✔
171

172
   if (end > ctx->layout.total)
5,936✔
173
       end = ctx->layout.total;
174
   
175
   size_t n = end - start;
5,936✔
176

177
   if (n > 0) {
5,936✔
178
       ctx->op(
5,936✔
179
           (char*)ctx->result    + start * ctx->layout.element,
5,936✔
180
           (char*)ctx->a         + start * ctx->layout.element,
5,936✔
181
           ctx->b,
5,936✔
182
           n
183
       );
184
   }
185
}
5,936✔
186

187
/* Pool worker for full-tensor (to-scalar) reductions.  The reduction has no
   parallel combine step yet, so only the thread whose chunk starts at
   offset 0 performs the whole reduction; the others return immediately. */
void ort_pool_reduce_tensor_worker(void *arg, size_t index, size_t count) {
    ort_pool_reduce_tensor_ctx_t *ctx =
        (ort_pool_reduce_tensor_ctx_t *)arg;
    size_t chunk = ctx->layout.chunk;
    size_t start = index * chunk;
    size_t end = start + chunk;

    if (end > ctx->layout.total)
        end = ctx->layout.total;

    /* NOTE(review): when start > total, end is clamped below start and this
       difference underflows — but the start == 0 guard below keeps ctx->op
       from ever running in that case, so the underflow is harmless here. */
    size_t n = end - start;

    if (n > 0) {
        // Each thread reduces a chunk of the input, but for full reduction to scalar,
        // only one thread should do the reduction (to avoid race conditions).
        // For now, only thread 0 does the reduction.
        if (start == 0) {
            ctx->op(
                ctx->result, ctx->a, ctx->elements);
        }
    }
}
209

210
/* Pool worker for axis reductions.  Parallelized over the outer dimension:
   thread `index` reduces a contiguous run of outer slices independently, so
   no combine step or synchronization is needed. */
void ort_pool_reduce_axis_worker(void *arg, size_t index, size_t count) {
    ort_pool_reduce_axis_ctx_t *ctx =
        (ort_pool_reduce_axis_ctx_t *)arg;

    size_t chunk     = ctx->layout.chunk;
    size_t outer     = ctx->layout.outer;
    size_t inner     = ctx->layout.inner;
    size_t axis_size = ctx->layout.axis_size;
    size_t element   = ctx->layout.element;

    size_t start = index * chunk;
    size_t end = start + chunk;
    if (end > outer)
        end = outer;

    /* start < end (rather than end - start > 0) also safely rejects the
       case where this thread's range begins past the outer extent. */
    if (start < end) {
        /* Output: `inner` elements per outer index. */
        void *result_ptr =
            (char*)ctx->result +
                start * inner * element;
        /* Input: axis_size * inner elements per outer index. */
        const void *a_ptr =
            (const char*)ctx->a +
                start * axis_size * inner * element;
        ctx->op(
            result_ptr,
            a_ptr,
            end - start,
            axis_size,
            inner);
    }
}
240

241
/* Pool worker for matrix multiplication.  Parallelized over output rows:
   thread `index` computes a contiguous band of rows of the result, each row
   combining one row of `a` with the whole of `b`. */
void ort_pool_matmul_worker(void *arg, size_t index, size_t count) {
    ort_pool_matmul_ctx_t *ctx =
        (ort_pool_matmul_ctx_t *)arg;
    size_t chunk = ctx->layout.chunk;
    size_t start = index * chunk;
    size_t end = start + chunk;

    if (end > ctx->layout.total)
        end = ctx->layout.total;

    // Each thread computes a range of output rows.
    // (If start > end after clamping, the loop simply does not run.)
    for (size_t row = start; row < end; ++row) {
        /* One output row: b_cols elements wide. */
        void *result_ptr =
            (char*)ctx->result +
                row * ctx->b_cols * ctx->type_size;
        /* The matching input row of a: a_cols elements wide. */
        const void *a_ptr =
            (const char*)ctx->a +
                row * ctx->a_cols * ctx->type_size;
        const void *b_ptr = ctx->b;

        ctx->op(
            result_ptr,
            a_ptr, b_ptr,
            ctx->a_cols, ctx->b_cols);
    }
}
267

268
void ort_pool_cast_worker(void *arg, size_t index, size_t count) {
32✔
269
   ort_pool_cast_ctx_t *ctx =
32✔
270
       (ort_pool_cast_ctx_t *)arg;
271

272
   size_t chunk = (ctx->count + count - 1) / count;
32✔
273
   size_t start = index * chunk;
32✔
274
   size_t end = start + chunk;
32✔
275

276
   if (end > ctx->count)
32✔
277
       end = ctx->count;
278

279
   size_t src_elem_size = php_ort_type_sizeof(ctx->src_type);
32✔
280
   size_t dst_elem_size = php_ort_type_sizeof(ctx->dst_type);
32✔
281

282
   const char *src =
32✔
283
       (const char *)ctx->src +
32✔
284
           start * src_elem_size;
32✔
285
   char *dst =
32✔
286
       (char *)ctx->dst +
32✔
287
           start * dst_elem_size;
32✔
288

289
   for (size_t i = 0; i < end - start; ++i) {
98,336✔
290
       ctx->op(
98,304✔
291
           src + i * src_elem_size,
98,304✔
292
           dst + i * dst_elem_size,
98,304✔
293
           ctx->src_type,
294
           ctx->dst_type
295
       );
296
   }
297
}
32✔
298

299
/*
* Pin the current thread to the target core.
*
* We don't leave it to the scheduler to decide how to distribute threads, because this leads to non-deterministic behavior.
*
* Instead, we pin each thread to the target core (determined by topology or ORT_SCALE_CORES), this distributes the threads evenly.
*/
static void ort_pool_pin(size_t index) {
    /* We set affinity and wait (bounded) for the scheduler to migrate the
       thread, if necessary; pinning is best-effort on every platform. */
#ifdef _WIN32
    DWORD_PTR mask =
        ((DWORD_PTR)1) << (DWORD)index;
    SetThreadAffinityMask(
        GetCurrentThread(), mask);
    /* The windows scheduler can ignore us ... */
    uint32_t max = 100;
    while (GetCurrentProcessorNumber() != (DWORD) index) {
        Sleep(0);
        /* So we ignore it right back: give the scheduler up to `max`
           chances to migrate us, then move on.  (The previous form,
           `if (--max)`, broke out on the very first iteration because
           99 is truthy — the retry budget was never used.) */
        if (--max == 0) {
            /*
              This is a platform limitation, the scheduler does not
              have to migrate the thread.

              We stuck around a little, to give it a chance; move on.
            */
            break;
        }
    }
#else
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(index, &set);
    if (pthread_setaffinity_np(
            pthread_self(), sizeof(set), &set) != 0) {
        /* Affinity can be restricted (cgroups, taskset, index beyond the
           allowed mask).  If the request failed, the wait below could never
           terminate — run unpinned instead. */
        return;
    }

    /* Bounded wait for migration.  The (int) cast matters: sched_getcpu()
       returns int (-1 on error), and comparing it against a size_t would
       convert -1 to SIZE_MAX, never matching, and spin forever. */
    uint32_t spins = 1000000;
    while (sched_getcpu() != (int) index) {
        sched_yield();
        if (--spins == 0)
            break;
    }
#endif
    /* The thread is now (best-effort) executing on the target core, as
       determined by topology or ORT_SCALE_CORES */
}
342

343
static zend_always_inline size_t
344
   ort_pool_worker_indexof(
3,392✔
345
       ort_pool_t* pool, ort_thread_t thread) {
346
   for (size_t i = 0; i < pool->size; ++i) {
3,520✔
347
       if (pool->threads[i] == thread) {
3,520✔
348
           return i;
349
       }
350
   }
351

352
   assert(0);
353
   /* This is unreachable */
354
   return SIZE_MAX;
355
}
356

357
static void *ort_pool_worker(void *arg) {
3,392✔
358
   ort_pool_t *pool = (ort_pool_t *)arg;
3,392✔
359

360
   /* Retrieve ident of thread */
361
   size_t idx = ort_pool_worker_indexof(
3,392✔
362
       pool, ort_pool_thread_self());
363

364
   /* Pin this thread to the target core */
365
   ort_pool_pin(idx);
3,392✔
366

367
   /* Startup the allocator for this thread */
368
   ort_alloc_startup();
3,392✔
369

370
   /* Startup math library for this thread */
371
   ort_math_startup();
3,392✔
372

373
   while (1) {
36,638✔
374
       ort_pool_mutex_lock(&pool->mutex);
36,638✔
375
       while (!pool->activated && !pool->stop) {
60,820✔
376
           ort_pool_cond_wait(&pool->cond, &pool->mutex);
24,182✔
377
       }
378
       if (pool->stop) {
36,657✔
379
           ort_pool_mutex_unlock(&pool->mutex);
3,392✔
380
           break;
3,392✔
381
       }
382

383
       /* Check our task slot */
384
       ort_task_t *task = &pool->slots[idx];
33,265✔
385
       if (task->func && !task->completed) {
33,265✔
386
           ort_pool_mutex_unlock(&pool->mutex);
24,208✔
387

388
           /* Execute the task assigned to this thread */
389
           task->func(task->arg, idx, task->count);
24,208✔
390

391
           /* Mark task as completed */
392
#if defined(_WIN32)
393
           InterlockedExchange((volatile LONG*)&task->completed, 1);
394
           if (InterlockedDecrement64((volatile LONGLONG*)&pool->activated) == 0) {
395
#else
396
           __sync_lock_test_and_set(&task->completed, 1);
24,208✔
397
           if (__sync_fetch_and_sub(&pool->activated, 1) == 1) {
24,208✔
398
#endif
399
               /* Last thread to complete, signal main thread */
400
               ort_pool_cond_broadcast(&pool->cond);
24,208✔
401
           }
402
       } else {
403
           ort_pool_mutex_unlock(&pool->mutex);
9,057✔
404
       }
405
   }
406

407
   /* Shutdown math library for this thread */
408
   ort_math_shutdown();
3,392✔
409

410
   /* Shutdown the allocator for this thread */
411
   ort_alloc_shutdown();
3,391✔
412

413
#if defined(_WIN32)
414
   return 0;
415
#else
416
   return NULL;
3,390✔
417
#endif
418
}
419

420
/* Read ORT_SCALE_CORES from the environment.
   Returns the parsed positive core count, or 0 when the variable is unset,
   non-numeric, non-positive, or out of range for long (ERANGE). */
static inline size_t ort_pool_cores_env() {
    const char *env = getenv("ORT_SCALE_CORES");
    if (env) {
        char *endptr;
        errno = 0;
        long n = strtol(env, &endptr, 10);
        /* endptr == env catches the empty string; errno catches ERANGE
           overflow, which strtol otherwise silently clamps to LONG_MAX. */
        if (errno == 0 && endptr != env && *endptr == '\0' && n > 0) {
            return (size_t)n;
        }
    }
    return 0; // No valid environment variable set
}
431

432
/* Read ORT_SCALE_THRESHOLD from the environment.
   Returns the parsed positive threshold, or 0 when the variable is unset,
   non-numeric, non-positive, or out of range for long (ERANGE). */
static inline size_t ort_pool_threshold_env() {
    const char *env = getenv("ORT_SCALE_THRESHOLD");
    if (env) {
        char *endptr;
        errno = 0;
        long n = strtol(env, &endptr, 10);
        /* endptr == env catches the empty string; errno catches ERANGE
           overflow, which strtol otherwise silently clamps to LONG_MAX. */
        if (errno == 0 && endptr != env && *endptr == '\0' && n > 0) {
            return (size_t)n;
        }
    }
    return 0; // No valid environment variable set
}
443

444
size_t ort_pool_max(void) {
80✔
445
   return __ort_pool.size;
80✔
446
}
447

448
/* {{{ Retrieve the actual number of cores available (regardless of env) */
449
static zend_always_inline size_t ort_pool_threads(void) {
3,312✔
450
   size_t threads = 0;
3,312✔
451

452
#if defined(_WIN32)
453
   SYSTEM_INFO sysinfo;
454
   GetSystemInfo(&sysinfo);
455

456
   threads = (size_t)
457
       sysinfo.dwNumberOfProcessors;
458
#else
459
   threads = (size_t) sysconf(
6,624✔
460
       _SC_NPROCESSORS_ONLN);
461
#endif
462

463
   return threads;
3,312✔
464
} /* }}} */
465

466
/* Resolve (and cache) the number of worker cores: the cached value wins,
   then ORT_SCALE_CORES from the environment (clamped to the hardware count),
   and finally the hardware count itself. */
size_t ort_pool_cores(void) {
    if (__ort_pool_cores > 0) {
        return __ort_pool_cores;
    }

    __ort_pool_cores = ort_pool_cores_env();
    if (__ort_pool_cores > 0) {
        /*
         We prohibit over subscription
        */
        size_t check = ort_pool_threads();

        /* We silently clamp this, and will document this behavior; It is too early in startup
           to raise a graceful error in all cases. */
        if (__ort_pool_cores > check) {
            __ort_pool_cores = check;
        }

        return __ort_pool_cores;
    }

    /* No (valid) environment override: use the hardware count. */
    __ort_pool_cores =
        ort_pool_threads();

    return __ort_pool_cores;
}
492

493
/* Resolve (and cache) the element-count threshold above which work is
   dispatched to the pool: cached value first, then ORT_SCALE_THRESHOLD from
   the environment, then the compiled-in ORT_SCALE_THRESHOLD default. */
size_t ort_pool_threshold(void) {
    if (__ort_pool_threshold > 0) {
        return __ort_pool_threshold;
    }

    __ort_pool_threshold = ort_pool_threshold_env();
    if (__ort_pool_threshold > 0) {
        return __ort_pool_threshold;
    }

    /* No (valid) environment override: use the compiled-in default. */
    __ort_pool_threshold = ORT_SCALE_THRESHOLD;

    return __ort_pool_threshold;
}
507

508
/* Apply a new scaling configuration and return the previous one so the
   caller can restore it later.  On invalid input the returned value has
   ORT_POOL_SCALE_ERROR set and no state is modified. */
ort_pool_scale_t ort_pool_scale(ort_pool_scale_t* scale) {
    if (scale->kind & ORT_POOL_SCALE_ERROR) {
        /* Caller passed an error-marked scale back in: undefined behavior;
           echo it unchanged. */
        return *scale;
    }

    /* Snapshot the current settings — this is the restore value we hand
       back on success (or, error-flagged, on failure). */
    ort_pool_scale_t restore =
        (ort_pool_scale_t) {
        .kind = ORT_POOL_SCALE_CORES |
                ORT_POOL_SCALE_THRESHOLD,
        .cores     = __ort_pool_cores,
        .threshold = __ort_pool_threshold
    };

    if (scale->cores <= 0 || scale->cores > ort_pool_max()) {
        /**
         We cannot scale below zero or beyond the limit of the numbers of cores
         This is probably a programming error
        **/
        restore.kind |=
            ORT_POOL_SCALE_ERROR;
        return restore;
    }

    /* Perform threshold adjustment only if requested */
    if (scale->kind & ORT_POOL_SCALE_THRESHOLD) {
        if (scale->threshold <= 0) {
            /**
             * This must be a programming error
             */
            restore.kind |=
                ORT_POOL_SCALE_ERROR;
            return restore;
        }

        __ort_pool_threshold = (size_t) scale->threshold;
    }

    /*
    We always scale cores
    */
    __ort_pool_cores = (size_t) scale->cores;

    return restore;
}
553

554
int ort_pool_init(size_t size) {
3,312✔
555
   ort_pool_pin(0);
3,312✔
556

557
   memset(&__ort_pool, 0, sizeof(__ort_pool));
3,312✔
558
   if (size == 0)
3,312✔
NEW
559
       size = ort_pool_cores();
×
560
   __ort_pool.size = size;
3,312✔
561
   __ort_pool.threads =
3,312✔
562
       (ort_thread_t*)calloc(size, sizeof(ort_thread_t));
3,312✔
563
   if (!__ort_pool.threads) {
3,312✔
NEW
564
       __ort_pool.size = 0;
×
NEW
565
       return FAILURE;
×
566
   }
567
   __ort_pool.slots =
3,312✔
568
       (ort_task_t*)calloc(size, sizeof(ort_task_t));
3,312✔
569
   if (!__ort_pool.slots) {
3,312✔
NEW
570
       free(__ort_pool.threads);
×
NEW
571
       __ort_pool.size = 0;
×
NEW
572
       return FAILURE;
×
573
   }
574
   ort_pool_mutex_init(&__ort_pool.mutex);
3,312✔
575
   ort_pool_cond_init(&__ort_pool.cond);
3,312✔
576
   __ort_pool.stop = 0;
3,312✔
577

578
   for (size_t i = 0; i < size; ++i) {
6,704✔
579
#if defined(_WIN32)
580
       __ort_pool.threads[i] = CreateThread(
581
           NULL, 0,
582
           (LPTHREAD_START_ROUTINE)ort_pool_worker,
583
           &__ort_pool, 0, NULL);
584
#else
585
       pthread_create(
3,392✔
586
           &__ort_pool.threads[i],
3,392✔
587
           NULL,
588
           ort_pool_worker,
589
           &__ort_pool);
590
#endif
591
   }
592
   return SUCCESS;
593
}
594

595
/* Tear the pool down: raise the stop flag, wake every worker, join them
   all, then destroy the synchronization primitives and free the arrays. */
void ort_pool_destroy() {
    /* Set stop under the mutex so no worker can observe a stale value
       between its predicate check and cond_wait. */
    ort_pool_mutex_lock(&__ort_pool.mutex);
    __ort_pool.stop = 1;
    ort_pool_cond_broadcast(&__ort_pool.cond);
    ort_pool_mutex_unlock(&__ort_pool.mutex);

    /* Join every worker before tearing down the state they share. */
    for (size_t i = 0; i < __ort_pool.size; ++i) {
#if defined(_WIN32)
        WaitForSingleObject(__ort_pool.threads[i], INFINITE);
        CloseHandle(__ort_pool.threads[i]);
#else
        pthread_join(__ort_pool.threads[i], NULL);
#endif
    }

    ort_pool_mutex_destroy(&__ort_pool.mutex);
    ort_pool_cond_destroy(&__ort_pool.cond);
    free(__ort_pool.threads);
    free(__ort_pool.slots);
}
615

616
/* Fan `func` out to the pool and block until every activated worker has
   completed its share.  `count` is the total number of task shares; at most
   min(pool size, count) slots are armed.  Called from the submitting
   (main) thread only. */
void ort_pool_submit(ort_task_func_t func, void *arg, size_t count) {
    /* Prepare task slots for each thread */
    ort_pool_mutex_lock(&__ort_pool.mutex);

    /* Clear all task slots */
    memset(__ort_pool.slots, 0, __ort_pool.size * sizeof(ort_task_t));

    /* Distribute work across threads; `activated` counts the completions
       we must wait for below. */
    __ort_pool.activated = 0;
    for (size_t i = 0; i < __ort_pool.size && i < count; ++i) {
        __ort_pool.slots[i].func = func;
        __ort_pool.slots[i].arg = arg;
        __ort_pool.slots[i].count = count;
        __ort_pool.activated++;
    }

    /* Wake all threads */
    ort_pool_cond_broadcast(&__ort_pool.cond);
    ort_pool_mutex_unlock(&__ort_pool.mutex);

    /* Wait for completion: the last worker to finish decrements
       `activated` to zero and broadcasts on the same condition. */
    ort_pool_mutex_lock(&__ort_pool.mutex);
    while (__ort_pool.activated > 0) {
       ort_pool_cond_wait(
          &__ort_pool.cond, &__ort_pool.mutex);
    }

    /* Clear up the slots (so spurious wakeups in workers can't re-run a
       stale task) */
    for (size_t i = 0; i < __ort_pool.size && i < count; ++i) {
        __ort_pool.slots[i].func = NULL;
        __ort_pool.slots[i].arg = NULL;
        __ort_pool.slots[i].completed = 1;
    }

    ort_pool_mutex_unlock(&__ort_pool.mutex);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc