• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

krakjoe / parallel / 20458908037

23 Dec 2025 11:00AM UTC coverage: 93.79% (-1.2%) from 94.99%
20458908037

Pull #363

github

web-flow
Merge a62b0c626 into 3c00d4ed1
Pull Request #363: Fix cache key collision

7 of 9 new or added lines in 1 file covered. (77.78%)

31 existing lines in 4 files now uncovered.

2764 of 2947 relevant lines covered (93.79%)

749.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.95
/src/scheduler.c
1
/*
2
  +----------------------------------------------------------------------+
3
  | parallel                                                             |
4
  +----------------------------------------------------------------------+
5
  | Copyright (c) Joe Watkins 2019-2024                                  |
6
  +----------------------------------------------------------------------+
7
  | This source file is subject to version 3.01 of the PHP license,      |
8
  | that is bundled with this package in the file LICENSE, and is        |
9
  | available through the world-wide-web at the following url:           |
10
  | http://www.php.net/license/3_01.txt                                  |
11
  | If you did not receive a copy of the PHP license and are unable to   |
12
  | obtain it through the world-wide-web, please send a note to          |
13
  | license@php.net so we can mail you a copy immediately.               |
14
  +----------------------------------------------------------------------+
15
  | Author: krakjoe                                                      |
16
  +----------------------------------------------------------------------+
17
 */
18
#ifndef HAVE_PARALLEL_SCHEDULER
19
#define HAVE_PARALLEL_SCHEDULER
20

21
#include "../php_parallel.h"
22
#include "parallel.h"
23
#include "zend_types.h"
24

25
#ifdef _WIN32
26
#include <windows.h>
27
#else
28
#include <signal.h>
29
#endif
30

31
// Runtime served by the current thread (declared TSRM_TLS). Assigned in
// php_parallel_scheduler_setup() and consulted by the crash handlers to decide
// whether a fault happened on a parallel thread.
TSRM_TLS php_parallel_runtime_t *php_parallel_scheduler_context = NULL;
32
// Future of the task most recently popped from the schedule (declared TSRM_TLS).
// Written by php_parallel_scheduler_pop(), read and reset to NULL by
// php_parallel_scheduler_run(); NULL when the task has no future.
TSRM_TLS php_parallel_future_t  *php_parallel_scheduler_future = NULL;
33

34
// Function pointer taking a zend_execute_data *; starts out NULL.
// NOTE(review): not referenced in the visible part of this file — presumably
// holds a previously installed engine interrupt handler; confirm against users.
void (*zend_interrupt_handler)(zend_execute_data *) = NULL;
35

36
#ifdef _WIN32
37
// Opaque handle, NULL until assigned (Windows builds only).
// NOTE(review): not used in the visible part of this file — presumably the
// registration handle for php_parallel_veh_handler(); confirm where it is set.
void       *php_parallel_veh_handle = NULL;
38

39
// Exception handler for access violations (Windows builds).
//
// When the fault happens on a thread that owns a runtime, the runtime is
// flagged as crashed, the current line/file and (for ZEND_INIT_FCALL on an
// unknown function) the missing function name are recorded, and control is
// handed back to the engine via zend_bailout(). Everything else is passed on
// with EXCEPTION_CONTINUE_SEARCH.
LONG WINAPI php_parallel_veh_handler(PEXCEPTION_POINTERS exceptionInfo)
{
        php_parallel_runtime_t *runtime = php_parallel_scheduler_context;
        zend_execute_data      *current;

        // Not an access violation, or not a parallel thread: nothing to do here.
        if (exceptionInfo->ExceptionRecord->ExceptionCode != EXCEPTION_ACCESS_VIOLATION || !runtime) {
                return EXCEPTION_CONTINUE_SEARCH;
        }

        runtime->crashed = 1;

        current = EG(current_execute_data);
        if (current) {
                const zend_op *opline = current->opline;

                if (opline) {
                        runtime->line = opline->lineno;

                        if (opline->opcode == ZEND_INIT_FCALL) {
                                zval *name = RT_CONSTANT(opline, opline->op2);

                                // Remember the callee only when it is not in the function table.
                                if (Z_TYPE_P(name) == IS_STRING && !zend_hash_exists(EG(function_table), Z_STR_P(name))) {
                                        runtime->missing = Z_STR_P(name);
                                }
                        }
                }

                if (current->func && current->func->op_array.filename) {
                        runtime->file = current->func->op_array.filename;
                }
        }

        zend_bailout();

        return EXCEPTION_CONTINUE_SEARCH;
}
69
#else
70
// SIGSEGV disposition that php_parallel_sigsegv_handler() chains to when the
// fault did not occur on a parallel thread. It is only read in the visible
// code; where it gets filled in is not shown here.
struct sigaction php_parallel_old_sigsegv_action;
71

72
// SIGSEGV handler (non-Windows builds).
//
// On a parallel thread: flag the runtime as crashed, record the current
// line/file and (for ZEND_INIT_FCALL on an unknown function) the missing
// function name, then leave through zend_bailout(). On any other thread the
// signal is forwarded to the disposition stored in
// php_parallel_old_sigsegv_action.
static void php_parallel_sigsegv_handler(int sig, siginfo_t *info, void *context)
{
        php_parallel_runtime_t *runtime = php_parallel_scheduler_context;
        const struct sigaction *previous = &php_parallel_old_sigsegv_action;

        // Only handle this SIGSEGV ourselves if this is a parallel thread
        if (runtime) {
                zend_execute_data *current = EG(current_execute_data);

                runtime->crashed = 1;

                if (current) {
                        const zend_op *opline = current->opline;

                        if (opline) {
                                runtime->line = opline->lineno;

                                if (opline->opcode == ZEND_INIT_FCALL) {
                                        zval *name = RT_CONSTANT(opline, opline->op2);

                                        if (Z_TYPE_P(name) == IS_STRING && !zend_hash_exists(EG(function_table), Z_STR_P(name))) {
                                                runtime->missing = Z_STR_P(name);
                                        }
                                }
                        }

                        if (current->func && current->func->op_array.filename) {
                                runtime->file = current->func->op_array.filename;
                        }
                }

                zend_bailout();
        }

        // Forward to whatever was installed before us.
        if (previous->sa_flags & SA_SIGINFO) {
                previous->sa_sigaction(sig, info, context);
                return;
        }

        if (previous->sa_handler == SIG_DFL) {
                // Restore the default action and re-raise so it takes effect.
                signal(sig, SIG_DFL);
                raise(sig);
                return;
        }

        if (previous->sa_handler != SIG_IGN) {
                previous->sa_handler(sig);
        }
}
×
111
#endif
112

113
// Comparator handed to zend_llist_del_element(): non-zero when both
// arguments are the very same pointer.
static zend_always_inline int php_parallel_scheduler_list_delete(void *lhs, void *rhs)
{
        return (lhs == rhs) ? 1 : 0;
}
236✔
114

115
// Releases the static variables of a scheduled function (persistent hash
// destructor), and on PHP >= 8.1 does the same for every dynamic function
// definition nested inside it. The function structure itself is not freed.
static void php_parallel_schedule_free_function(zend_function *function)
{
        zend_op_array *ops = &function->op_array;

        if (ops->static_variables) {
                php_parallel_copy_hash_dtor(ops->static_variables, 1);
                ZEND_MAP_PTR_SET(ops->static_variables_ptr, NULL);
                ops->static_variables = NULL;
        }

#if PHP_VERSION_ID >= 80100
        for (uint32_t it = 0; it < ops->num_dynamic_func_defs; it++) {
                php_parallel_schedule_free_function((zend_function *)ops->dynamic_func_defs[it]);
        }
#endif
}
10✔
134

135
static void php_parallel_schedule_free(void *scheduleed)
238✔
136
{
137
        php_parallel_schedule_el_t *el = (php_parallel_schedule_el_t *)scheduleed;
238✔
138
        zval                       *slot = (zval *)ZEND_CALL_ARG(el->frame, 1), *end = slot + ZEND_CALL_NUM_ARGS(el->frame);
238✔
139

140
        while (slot < end) {
318✔
141
                PARALLEL_ZVAL_DTOR(slot);
80✔
142
                slot++;
80✔
143
        }
144

145
        if (el->frame->func) {
238✔
146
                php_parallel_schedule_free_function(el->frame->func);
10✔
147
                pefree(el->frame->func, 1);
10✔
148
        }
149

150
        pefree(el->frame, 1);
238✔
151
}
238✔
152

153
// Initialises the runtime's schedule as a persistent list of
// php_parallel_schedule_el_t with php_parallel_schedule_free as destructor.
void php_parallel_scheduler_init(php_parallel_runtime_t *runtime)
{
        zend_llist *queue = &runtime->schedule;

        zend_llist_init(queue, sizeof(php_parallel_schedule_el_t), php_parallel_schedule_free, 1);
}
262✔
157

158
// Tears down the runtime's schedule list, running the element destructor on
// anything still queued.
void php_parallel_scheduler_destroy(php_parallel_runtime_t *runtime)
{
        zend_llist_destroy(&runtime->schedule);
}
262✔
159

160
// Prepares the calling thread to act as the given runtime: publishes the
// runtime as this thread's scheduler context, acquires thread resources,
// copies the parent's server context and argc/argv, starts a request and
// adjusts a handful of SAPI/INI globals. Returns the runtime that was passed in.
static zend_always_inline php_parallel_runtime_t *php_parallel_scheduler_setup(php_parallel_runtime_t *runtime)
{
        php_parallel_scheduler_context = runtime;

        // Thread resources must exist before any SG()/EG()/PG() access below.
        ts_resource(0);
        TSRMLS_CACHE_UPDATE();

        SG(server_context) = runtime->parent.server;
        SG(request_info).argc = runtime->parent.argc;
        SG(request_info).argv = runtime->parent.argv;

        // Expose this thread's interrupt flag through the runtime.
        runtime->child.interrupt = &EG(vm_interrupt);

        PG(expose_php) = 0;
        PG(auto_globals_jit) = 1;
        PG(enable_dl) = 0; // same value as `false` on PHP >= 8.1

        php_request_startup();

        PG(during_request_startup) = 0;
        SG(sapi_started) = 0;
        SG(headers_sent) = 1;
        SG(request_info).no_headers = 1;

        return runtime;
}
191

192
// Final steps of a runtime thread: mark the runtime monitor as done, shut the
// request down, release thread resources and terminate the thread. Does not
// return to the caller (ends in pthread_exit()).
static zend_always_inline void php_parallel_scheduler_exit(php_parallel_runtime_t *runtime)
{
        // Signal completion first; waiters in stop()/join() block on this flag.
        php_parallel_monitor_set(runtime->monitor, PHP_PARALLEL_DONE);

        php_request_shutdown(NULL);
        ts_free_thread();

        pthread_exit(NULL);
}
202

203
static zend_always_inline void php_parallel_scheduler_add(php_parallel_runtime_t *runtime,
238✔
204
                                                          const zend_function *function, zval *argv,
205
                                                          php_parallel_future_t *future)
206
{
207
        php_parallel_schedule_el_t el;
238✔
208
        uint32_t                   argc = argv && Z_TYPE_P(argv) == IS_ARRAY ? zend_hash_num_elements(Z_ARRVAL_P(argv)) : 0;
90✔
209

210
        zend_execute_data         *frame =
238✔
211
            (zend_execute_data *)pecalloc(1, zend_vm_calc_used_stack(argc, (zend_function *)function), 1);
238✔
212

213
        frame->func = php_parallel_cache_closure(function, NULL);
238✔
214

215
        php_parallel_dependencies_store(frame->func);
238✔
216

217
        if (argv && Z_TYPE_P(argv) == IS_ARRAY) {
328✔
218
                zval                        *slot = ZEND_CALL_ARG(frame, 1);
90✔
219
                zval                        *param;
90✔
220
                uint32_t                     argc = 0;
90✔
221
                php_parallel_copy_context_t *context, *restore;
90✔
222

223
                context = php_parallel_copy_context_start(PHP_PARALLEL_COPY_DIRECTION_PERSISTENT, &restore);
90✔
224

225
                ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(argv), param)
170✔
226
                {
227
                        PARALLEL_ZVAL_COPY(slot, param, 1);
80✔
228
                        slot++;
80✔
229
                        argc++;
80✔
230
                }
231
                ZEND_HASH_FOREACH_END();
232

233
                ZEND_CALL_NUM_ARGS(frame) = argc;
90✔
234

235
                php_parallel_copy_context_end(context, restore);
90✔
236
        } else {
237
                ZEND_CALL_NUM_ARGS(frame) = 0;
148✔
238
        }
239

240
        if (future) {
238✔
241
                frame->return_value = &future->value;
136✔
242
                Z_PTR(frame->This) = future;
136✔
243
        }
244

245
        el.frame = frame;
238✔
246

247
        zend_llist_add_element(&runtime->schedule, &el);
238✔
248

249
        if (future) {
238✔
250
                future->runtime = runtime;
136✔
251
                future->handle = runtime->schedule.tail->data;
136✔
252
                GC_ADDREF(&runtime->std);
136✔
253
        }
254
}
255

256
// True when nothing is queued on the runtime's schedule.
static zend_always_inline bool php_parallel_scheduler_empty(php_parallel_runtime_t *runtime)
{
        return zend_llist_count(&runtime->schedule) == 0;
}
260

261
// Reports whether the runtime still has work: it is considered idle only when
// the schedule is empty and the monitor does not carry PHP_PARALLEL_RUNNING.
// The check is performed with the runtime monitor locked.
bool php_parallel_scheduler_busy(php_parallel_runtime_t *runtime)
{
        bool idle;

        php_parallel_monitor_lock(runtime->monitor);
        // The monitor is only inspected when the schedule is empty.
        idle = php_parallel_scheduler_empty(runtime) &&
               !php_parallel_monitor_check(runtime->monitor, PHP_PARALLEL_RUNNING);
        php_parallel_monitor_unlock(runtime->monitor);

        return !idle;
}
275

276
// Prepares a function for execution: replaces its static variables with a
// non-persistent copy (destroying the source table if it was flagged
// persistent), sets up the run-time cache map pointer, and on PHP >= 8.1
// repeats this for every nested dynamic function definition.
static void php_parallel_scheduler_pull(zend_function *function)
{
        zend_op_array *ops = &function->op_array;
        HashTable     *source = ops->static_variables;

        if (source) {
                ops->static_variables = php_parallel_copy_hash_ctor(source, 0);

#if PHP_VERSION_ID >= 80200
                ZEND_MAP_PTR_INIT(ops->static_variables_ptr, ops->static_variables);
#else
                ZEND_MAP_PTR_INIT(ops->static_variables_ptr, &ops->static_variables);
#endif

                // The source table is only released when it is a persistent one.
                if (GC_FLAGS(source) & IS_ARRAY_PERSISTENT) {
                        php_parallel_copy_hash_dtor(source, 1);
                }
        }

#if PHP_VERSION_ID < 80200
        ZEND_MAP_PTR_NEW(ops->run_time_cache);
#else
        ZEND_MAP_PTR_INIT(ops->run_time_cache, NULL);
#endif

#if PHP_VERSION_ID >= 80100
        for (uint32_t it = 0; it < ops->num_dynamic_func_defs; it++) {
                php_parallel_scheduler_pull((zend_function *)ops->dynamic_func_defs[it]);
        }
#endif
}
258✔
311

312
// Undoes php_parallel_scheduler_pull() after a task ran: destroys the
// function's static variables unless the table is immutable, and on
// PHP >= 8.1 cleans and frees every nested dynamic function definition along
// with the array that holds them.
static void php_parallel_scheduler_clean(zend_function *function)
{
        zend_op_array *ops = &function->op_array;

        if (ops->static_variables) {
                HashTable *statics = ZEND_MAP_PTR_GET(ops->static_variables_ptr);

                if (!(GC_FLAGS(statics) & IS_ARRAY_IMMUTABLE)) {
                        zend_array_destroy(statics);
                        ZEND_MAP_PTR_SET(ops->static_variables_ptr, NULL);
                        ops->static_variables = NULL;
                }
        }

#if PHP_VERSION_ID >= 80100
        if (ops->num_dynamic_func_defs) {
                for (uint32_t it = 0; it < ops->num_dynamic_func_defs; it++) {
                        php_parallel_scheduler_clean((zend_function *)ops->dynamic_func_defs[it]);
                        pefree(ops->dynamic_func_defs[it], 1);
                }

                /* Free the dynamic_func_defs array itself */
                pefree(ops->dynamic_func_defs, 1);
                ops->dynamic_func_defs = NULL;
        }
#endif
}
258✔
339

340
static zend_always_inline bool php_parallel_scheduler_pop(php_parallel_runtime_t     *runtime,
696✔
341
                                                          php_parallel_schedule_el_t *el)
342
{
343
        php_parallel_schedule_el_t *head;
696✔
344
        zend_class_entry           *scope = NULL;
696✔
345
        zend_function              *function = NULL;
696✔
346

347
        if (zend_llist_count(&runtime->schedule) == 0) {
696✔
348
                return 0;
349
        }
350

351
        head = zend_llist_get_first(&runtime->schedule);
228✔
352
        function = head->frame->func;
228✔
353
        head->frame->func = NULL;
228✔
354

355
        if (function->op_array.scope && function->op_array.scope->type == ZEND_USER_CLASS) {
228✔
356
                scope = php_parallel_copy_scope(function->op_array.scope);
2✔
357
        }
358

359
        el->frame = zend_vm_stack_push_call_frame(ZEND_CALL_TOP_FUNCTION, php_parallel_copy_function(function, 0),
228✔
360
                                                  ZEND_CALL_NUM_ARGS(head->frame), scope);
228✔
361

362
        if (scope != function->op_array.scope) {
228✔
363
                el->frame->func->op_array.scope = scope;
2✔
364
        }
365

366
        php_parallel_scheduler_pull(el->frame->func);
228✔
367

368
        if (ZEND_CALL_NUM_ARGS(head->frame)) {
228✔
369
                zval *slot = (zval *)ZEND_CALL_ARG(head->frame, 1), *end = slot + ZEND_CALL_NUM_ARGS(head->frame);
68✔
370
                zval *param = ZEND_CALL_ARG(el->frame, 1);
68✔
371

372
                php_parallel_copy_context_t *context, *restore;
68✔
373

374
                context = php_parallel_copy_context_start(PHP_PARALLEL_COPY_DIRECTION_THREAD, &restore);
68✔
375

376
                while (slot < end) {
144✔
377
                        PARALLEL_ZVAL_COPY(param, slot, 0);
76✔
378
                        slot++;
76✔
379
                        param++;
76✔
380
                }
381

382
                php_parallel_copy_context_end(context, restore);
68✔
383
        }
384

385
        zend_init_func_execute_data(el->frame, &el->frame->func->op_array, head->frame->return_value);
228✔
386

387
        php_parallel_scheduler_future = Z_PTR(head->frame->This);
228✔
388

389
        zend_llist_del_element(&runtime->schedule, head, php_parallel_scheduler_list_delete);
228✔
390

391
        return 1;
228✔
392
}
393

394
// Executes one task frame on the runtime thread.
//
// The crash bookkeeping on the runtime is reset, the frame is executed inside
// a try block, and the outcome is reported on the task's future (if any):
// an uncaught exception is saved into the return value and flagged as error,
// a crash is turned into an illegal-instruction error and additionally marks
// the runtime as done/killed, and any other bailout marks the future as
// killed unless it was cancelled. Afterwards a refcounted return value is
// replaced by a persistent copy, the function copy and the frame are
// released, and the future is flagged ready.
static void php_parallel_scheduler_run(php_parallel_runtime_t *runtime, zend_execute_data *frame)
{
        php_parallel_future_t *future = php_parallel_scheduler_future;

        // Start every task with a clean crash record.
        runtime->crashed = 0;
        runtime->missing = NULL;
        runtime->file = NULL;
        runtime->line = 0;

        zend_first_try
        {
                zend_try
                {
                        zend_execute_ex(frame);

                        if (UNEXPECTED(EG(exception))) {
                                if (!future) {
                                        zend_throw_exception_internal(NULL);
                                } else {
                                        php_parallel_exceptions_save(frame->return_value, EG(exception));

                                        php_parallel_monitor_set(future->monitor, PHP_PARALLEL_ERROR);
                                }
                        }
                }
                zend_catch
                {
                        if (!runtime->crashed) {
                                // Plain bailout: the task was killed unless it had been cancelled.
                                if (future) {
                                        php_parallel_monitor_lock(future->monitor);
                                        if (!php_parallel_monitor_check(future->monitor, PHP_PARALLEL_CANCELLED)) {
                                                php_parallel_monitor_set(future->monitor, PHP_PARALLEL_KILLED);
                                        }
                                        php_parallel_monitor_unlock(future->monitor);
                                }
                        } else {
                                zend_object *crash = NULL;

                                if (future) {
                                        char *text = NULL;

                                        zend_clear_exception();

                                        if (runtime->missing) {
                                                spprintf(
                                                    &text, 0,
                                                    "Call to undefined function %s() in task, please provide the function via a bootstrap file",
                                                    ZSTR_VAL(runtime->missing));
                                                crash = php_parallel_exception_object(php_parallel_runtime_error_illegal_instruction_ce,
                                                                                      text, 0, runtime->file, runtime->line);
                                                efree(text);
                                        } else {
                                                crash = php_parallel_exception_object(php_parallel_runtime_error_illegal_instruction_ce,
                                                                                      "illegal instruction (segmentation fault) in task", 0,
                                                                                      runtime->file, runtime->line);
                                        }

                                        php_parallel_monitor_lock(future->monitor);
                                        if (crash) {
                                                php_parallel_exceptions_save(frame->return_value, crash);
                                                OBJ_RELEASE(crash);
                                        }
                                        php_parallel_monitor_set(future->monitor, PHP_PARALLEL_READY | PHP_PARALLEL_ERROR);
                                        php_parallel_monitor_unlock(future->monitor);
                                }

                                // This runtime crashed, so no guarantees can be given any
                                // more and it is best to shut the thread down as gracefully
                                // as possible. Setting `PHP_PARALLEL_KILLED` on
                                // `runtime->monitor` makes the thread loop go through its
                                // shutdown path and call `pthread_exit()`.
                                php_parallel_monitor_lock(runtime->monitor);
                                php_parallel_monitor_set(runtime->monitor, PHP_PARALLEL_DONE | PHP_PARALLEL_KILLED);
                                php_parallel_monitor_unlock(runtime->monitor);
                        }
                }
                zend_end_try();

                // Swap a refcounted result for a persistent copy, then drop the original.
                if (frame->return_value && !Z_ISUNDEF_P(frame->return_value)) {
                        zval original = *frame->return_value;

                        if (Z_OPT_REFCOUNTED(original)) {
                                PARALLEL_ZVAL_COPY(frame->return_value, &original, 1);

                                zval_ptr_dtor(&original);
                        }
                }

                php_parallel_scheduler_clean(frame->func);

                pefree(frame->func, 1);

                zend_vm_stack_free_call_frame(frame);
        }
        zend_end_try();

        if (future) {
                php_parallel_monitor_set(future->monitor, PHP_PARALLEL_READY);
        }

        php_parallel_scheduler_future = NULL;
}
228✔
493

494
static void php_parallel_scheduler_killed(void *scheduled)
6✔
495
{
496
        php_parallel_schedule_el_t *el = (php_parallel_schedule_el_t *)scheduled;
6✔
497
        php_parallel_future_t      *future = Z_PTR(el->frame->This);
6✔
498

499
        if (future) {
6✔
500
                php_parallel_monitor_set(future->monitor, PHP_PARALLEL_READY | PHP_PARALLEL_KILLED);
4✔
501
        }
502
}
6✔
503

504
// Flags the futures of all queued tasks as killed, then empties the schedule.
static zend_always_inline void php_parallel_scheduler_kill(php_parallel_runtime_t *runtime)
{
        zend_llist *queue = &runtime->schedule;

        zend_llist_apply(queue, php_parallel_scheduler_killed);
        zend_llist_clean(queue);
}
509

510
// Includes and executes the bootstrap file of a runtime thread.
//
// Returns SUCCESS when no file was given or when the file compiled and ran
// without leaving an exception behind; FAILURE when the file could not be
// opened, did not compile, or threw. A pending exception is always cleared
// before returning.
static zend_always_inline int php_parallel_thread_bootstrap(zend_string *file)
{
        zend_file_handle fh;
        zend_op_array   *ops;
        zval             rv;
        int              result;

        if (!file) {
                return SUCCESS;
        }

#if PHP_VERSION_ID >= 80100
        zend_stream_init_filename_ex(&fh, file);

        result = php_stream_open_for_zend_ex(&fh, USE_PATH | REPORT_ERRORS | STREAM_OPEN_FOR_INCLUDE);
#else
        result = php_stream_open_for_zend_ex(ZSTR_VAL(file), &fh, USE_PATH | REPORT_ERRORS | STREAM_OPEN_FOR_INCLUDE);
#endif

        if (result != SUCCESS) {
#if PHP_VERSION_ID >= 80100
                // The handle was initialised above and holds a reference to the
                // filename; it has to be released even though opening failed.
                zend_destroy_file_handle(&fh);
#endif
                return FAILURE;
        }

        zend_hash_add_empty_element(&EG(included_files), fh.opened_path ? fh.opened_path : file);

        ops = zend_compile_file(&fh, ZEND_REQUIRE);

        zend_destroy_file_handle(&fh);

        if (!ops) {
                // Compilation failed; drop whatever exception it raised.
                if (EG(exception)) {
                        zend_clear_exception();
                }

                return FAILURE;
        }

        ZVAL_UNDEF(&rv);
        zend_execute(ops, &rv);
        destroy_op_array(ops);
        efree(ops);

        if (EG(exception)) {
                zend_clear_exception();
                result = FAILURE;
        } else {
                result = SUCCESS;
        }

        // Release the script's return value on both paths (no-op when undefined).
        zval_ptr_dtor(&rv);

        return result;
}
560

561
// Implements the thread main loop. This bootstraps the thread by including the
562
// bootstrap PHP file in case one was specified and afterwards set the runtimes
563
// monitor to ready and running (and with this unblocking the calling
564
// `php_parallel_scheduler_start()` function).
565
static void *php_parallel_thread(void *arg)
262✔
566
{
567
        int32_t                 state = 0;
262✔
568

569
        php_parallel_runtime_t *runtime = php_parallel_scheduler_setup((php_parallel_runtime_t *)arg);
262✔
570

571
        if (php_parallel_thread_bootstrap(runtime->bootstrap) != SUCCESS) {
304✔
572
                php_parallel_monitor_set(runtime->monitor, PHP_PARALLEL_FAILURE);
6✔
573

574
                goto _php_parallel_thread_exit;
6✔
575
        }
576

577
        php_parallel_monitor_set(runtime->monitor, PHP_PARALLEL_READY | PHP_PARALLEL_RUNNING);
256✔
578

579
        do {
712✔
580
                php_parallel_schedule_el_t el;
484✔
581

582
                if (php_parallel_monitor_lock(runtime->monitor) != SUCCESS) {
484✔
583
                        break;
584
                }
585

586
                if (php_parallel_monitor_check(runtime->monitor, PHP_PARALLEL_KILLED)) {
484✔
587
                _php_parallel_thread_killed:
4✔
588
                        php_parallel_scheduler_kill(runtime);
8✔
589

590
                        php_parallel_monitor_unlock(runtime->monitor);
8✔
591
                        goto _php_parallel_thread_exit;
8✔
592
                }
593

594
                while (!php_parallel_scheduler_pop(runtime, &el)) {
696✔
595
                        php_parallel_monitor_remove(runtime->monitor, PHP_PARALLEL_RUNNING);
468✔
596

597
                        if (!(state & PHP_PARALLEL_CLOSE)) {
468✔
598
                                state = php_parallel_monitor_wait_locked(runtime->monitor,
457✔
599
                                                                         PHP_PARALLEL_EXEC | PHP_PARALLEL_CLOSE | PHP_PARALLEL_KILLED);
600
                        }
601

602
                        if ((state & (PHP_PARALLEL_CLOSE | PHP_PARALLEL_KILLED))) {
468✔
603
                                if ((state & PHP_PARALLEL_KILLED)) {
263✔
604
                                        goto _php_parallel_thread_killed;
4✔
605
                                }
606

607
                                if (php_parallel_scheduler_empty(runtime)) {
259✔
608
                                        php_parallel_monitor_unlock(runtime->monitor);
248✔
609
                                        goto _php_parallel_thread_exit;
248✔
610
                                }
611
                        }
612
                }
613

614
                php_parallel_monitor_add(runtime->monitor, PHP_PARALLEL_RUNNING);
228✔
615

616
                php_parallel_monitor_unlock(runtime->monitor);
228✔
617

618
                php_parallel_scheduler_run(runtime, el.frame);
228✔
619
        } while (1);
620

621
_php_parallel_thread_exit:
262✔
622
        php_parallel_scheduler_exit(runtime);
262✔
623

624
        return NULL;
625
}
626

627
// Creates a monitor for the `runtime` and spawns a new thread. After spawning
628
// blocks until the newly created thread enters either the ready or the failure
629
// state. Throws a userland exception in case the thread creation failed.
630
void php_parallel_scheduler_start(php_parallel_runtime_t *runtime, zend_string *bootstrap)
262✔
631
{
632
        uint32_t state = SUCCESS;
262✔
633

634
        if (bootstrap) {
262✔
635
                runtime->bootstrap = php_parallel_copy_string_interned(bootstrap);
42✔
636
        }
637

638
        runtime->monitor = php_parallel_monitor_create();
262✔
639

640
        if (pthread_create(&runtime->thread, NULL, php_parallel_thread, runtime) != SUCCESS) {
262✔
641
                php_parallel_exception_ex(php_parallel_runtime_error_ce, "cannot create thread %s", strerror(errno));
×
642
                php_parallel_monitor_set(runtime->monitor, PHP_PARALLEL_FAILURE);
×
643
                php_parallel_scheduler_stop(runtime);
×
644
                return;
×
645
        }
646

647
        state = php_parallel_monitor_wait(runtime->monitor, PHP_PARALLEL_READY | PHP_PARALLEL_FAILURE);
262✔
648

649
        if ((state == FAILURE) || (state & PHP_PARALLEL_FAILURE)) {
262✔
650
                php_parallel_exception_ex(php_parallel_runtime_error_bootstrap_ce, "bootstrapping failed with %s",
6✔
651
                                          ZSTR_VAL(runtime->bootstrap));
652
                php_parallel_scheduler_stop(runtime);
6✔
653
        }
654
}
655

656
// Shuts a runtime down and releases its monitor. Does nothing when the
// runtime has no monitor. Unless the monitor is already flagged closed, the
// thread is asked to close, awaited (skipped when the monitor carries the
// failure flag) and joined.
void php_parallel_scheduler_stop(php_parallel_runtime_t *runtime)
{
        bool closed;

        if (!runtime->monitor) {
                return;
        }

        php_parallel_monitor_lock(runtime->monitor);

        closed = php_parallel_monitor_check(runtime->monitor, PHP_PARALLEL_CLOSED);

        if (closed) {
                php_parallel_monitor_unlock(runtime->monitor);
        } else {
                php_parallel_monitor_set(runtime->monitor, PHP_PARALLEL_CLOSE);

                // After a failure the done state is not waited for.
                if (!php_parallel_monitor_check(runtime->monitor, PHP_PARALLEL_FAILURE)) {
                        php_parallel_monitor_wait_locked(runtime->monitor, PHP_PARALLEL_DONE);
                }

                php_parallel_monitor_unlock(runtime->monitor);

                pthread_join(runtime->thread, NULL);
        }

        php_parallel_monitor_destroy(runtime->monitor);

        runtime->monitor = NULL;
}
682

683
/// Adds the task in `closure` to the thread referenced by `runtime`. In case
684
/// the task returns anything it also creates the future to return.
685
void php_parallel_scheduler_push(php_parallel_runtime_t *runtime, zval *closure, zval *argv, zval *return_value)
304✔
686
{
687
        zend_execute_data     *caller = EG(current_execute_data)->prev_execute_data;
304✔
688
        const zend_function   *function = zend_get_closure_method_def(Z_OBJ_P(closure));
304✔
689
        bool                   returns = 0;
304✔
690
        php_parallel_future_t *future = NULL;
304✔
691

692
        php_parallel_monitor_lock(runtime->monitor);
304✔
693

694
        if (!php_parallel_check_task(runtime, caller, function, argv, &returns)) {
304✔
695
                php_parallel_monitor_unlock(runtime->monitor);
66✔
696
                return;
66✔
697
        }
698

699
        if (returns) {
238✔
700
                future = php_parallel_future_construct(return_value);
136✔
701
        }
702

703
        php_parallel_scheduler_add(runtime, function, argv, future);
238✔
704

705
        php_parallel_monitor_set(runtime->monitor, PHP_PARALLEL_EXEC);
238✔
706
        php_parallel_monitor_unlock(runtime->monitor);
238✔
707
}
708

709
/// Asks the worker thread of `runtime` to finish and blocks until it has.
///
/// Throws a "Runtime closed" error and returns when the monitor already
/// carries PHP_PARALLEL_CLOSED. With `kill` set, PHP_PARALLEL_KILLED is set and
/// true is stored into `runtime->child.interrupt`; otherwise PHP_PARALLEL_CLOSE
/// is set. The call then waits for PHP_PARALLEL_DONE, marks the monitor
/// PHP_PARALLEL_CLOSED and joins the thread.
void php_parallel_scheduler_join(php_parallel_runtime_t *runtime, bool kill)
{
        php_parallel_monitor_lock(runtime->monitor);

        if (php_parallel_monitor_check(runtime->monitor, PHP_PARALLEL_CLOSED)) {
                php_parallel_exception_ex(php_parallel_runtime_error_closed_ce, "Runtime closed");
                php_parallel_monitor_unlock(runtime->monitor);
                return;
        }

        php_parallel_monitor_set(runtime->monitor, kill ? PHP_PARALLEL_KILLED : PHP_PARALLEL_CLOSE);

        if (kill) {
                /* presumably makes the child's VM enter its interrupt handler -- confirm */
                zend_atomic_bool_store(runtime->child.interrupt, true);
        }

        php_parallel_monitor_wait_locked(runtime->monitor, PHP_PARALLEL_DONE);

        php_parallel_monitor_unlock(runtime->monitor);

        /* CLOSED is recorded after the lock has been released, then the thread is reaped. */
        php_parallel_monitor_set(runtime->monitor, PHP_PARALLEL_CLOSED);

        pthread_join(runtime->thread, NULL);
}
735

736
/// Cancels the task behind `future`.
///
/// The task is first deleted from the runtime's schedule list. If the list
/// shrank, the future is marked READY | DONE | CANCELLED and true is returned.
/// If the list is unchanged, the future's monitor decides: when it is already
/// READY nothing is done and false is returned; otherwise true is stored into
/// the runtime's child interrupt flag, CANCELLED is set, the call waits for
/// READY, sets READY | DONE and returns true.
bool php_parallel_scheduler_cancel(php_parallel_future_t *future)
{
        size_t before;
        size_t after;
        bool   cancelled = false;

        php_parallel_monitor_lock(future->runtime->monitor);

        before = zend_llist_count(&future->runtime->schedule);

        zend_llist_del_element(&future->runtime->schedule, future->handle, php_parallel_scheduler_list_delete);

        after = zend_llist_count(&future->runtime->schedule);

        php_parallel_monitor_unlock(future->runtime->monitor);

        if (before != after) {
                /* The entry was removed from the schedule. */
                php_parallel_monitor_set(future->monitor, PHP_PARALLEL_READY | PHP_PARALLEL_DONE | PHP_PARALLEL_CANCELLED);

                return true;
        }

        php_parallel_monitor_lock(future->monitor);

        if (!php_parallel_monitor_check(future->monitor, PHP_PARALLEL_READY)) {
                zend_atomic_bool_store(future->runtime->child.interrupt, true);

                php_parallel_monitor_set(future->monitor, PHP_PARALLEL_CANCELLED);
                php_parallel_monitor_wait_locked(future->monitor, PHP_PARALLEL_READY);
                php_parallel_monitor_set(future->monitor, PHP_PARALLEL_READY | PHP_PARALLEL_DONE);

                cancelled = true;
        }

        php_parallel_monitor_unlock(future->monitor);

        return cancelled;
}
774

775
/// Reports whether the calling thread has a scheduler context, i.e. whether
/// the thread-local `php_parallel_scheduler_context` is non-NULL.
PARALLEL_API bool php_parallel_is_parallel_worker_thread(void)
{
        if (php_parallel_scheduler_context == NULL) {
                return false;
        }

        return true;
}
×
776

777
/// Interrupt hook installed as `zend_interrupt_function` by the module init.
///
/// On a thread that has a scheduler context it bails out of execution when the
/// runtime monitor carries PHP_PARALLEL_KILLED, or when the current future's
/// monitor carries PHP_PARALLEL_CANCELLED. Each monitor is unlocked before
/// zend_bailout() is called. If no bailout happens, the previously installed
/// handler saved in `zend_interrupt_handler` is invoked, when there is one.
static void php_parallel_scheduler_interrupt(zend_execute_data *execute_data)
{
        if (php_parallel_scheduler_context != NULL) {
                bool killed;

                php_parallel_monitor_lock(php_parallel_scheduler_context->monitor);
                killed = php_parallel_monitor_check(php_parallel_scheduler_context->monitor, PHP_PARALLEL_KILLED) != 0;
                php_parallel_monitor_unlock(php_parallel_scheduler_context->monitor);

                if (killed) {
                        zend_bailout();
                }

                if (php_parallel_scheduler_future != NULL) {
                        bool cancelled;

                        php_parallel_monitor_lock(php_parallel_scheduler_future->monitor);
                        cancelled =
                            php_parallel_monitor_check(php_parallel_scheduler_future->monitor, PHP_PARALLEL_CANCELLED) != 0;
                        php_parallel_monitor_unlock(php_parallel_scheduler_future->monitor);

                        if (cancelled) {
                                zend_bailout();
                        }
                }
        }

        /* Chain to whatever handler was installed before this one. */
        if (zend_interrupt_handler != NULL) {
                zend_interrupt_handler(execute_data);
        }
}
×
801

802
PHP_MINIT_FUNCTION(PARALLEL_SCHEDULER)
328✔
803
{
804
        zend_interrupt_handler = zend_interrupt_function;
328✔
805
        zend_interrupt_function = php_parallel_scheduler_interrupt;
328✔
806

807
#ifdef _WIN32
808
        php_parallel_veh_handle = AddVectoredExceptionHandler(1, php_parallel_veh_handler);
809
#else
810
        struct sigaction sa;
328✔
811

812
        memset(&sa, 0, sizeof(struct sigaction));
328✔
813
        sa.sa_sigaction = php_parallel_sigsegv_handler;
328✔
814
        sa.sa_flags = SA_SIGINFO;
328✔
815

816
        sigaction(SIGSEGV, &sa, &php_parallel_old_sigsegv_action);
328✔
817
#endif
818

819
        PHP_MINIT(PARALLEL_RUNTIME)(INIT_FUNC_ARGS_PASSTHRU);
328✔
820

821
        return SUCCESS;
328✔
822
}
823

824
PHP_MSHUTDOWN_FUNCTION(PARALLEL_SCHEDULER)
328✔
825
{
826
#ifdef _WIN32
827
        if (php_parallel_veh_handle) {
828
                RemoveVectoredExceptionHandler(php_parallel_veh_handle);
829
        }
830
#else
831
        sigaction(SIGSEGV, &php_parallel_old_sigsegv_action, NULL);
328✔
832
#endif
833

834
        zend_interrupt_function = zend_interrupt_handler;
328✔
835

836
        PHP_MSHUTDOWN(PARALLEL_RUNTIME)(INIT_FUNC_ARGS_PASSTHRU);
328✔
837

838
        return SUCCESS;
328✔
839
}
840
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc