• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

krakjoe / parallel / 20273776347

16 Dec 2025 03:42PM UTC coverage: 96.75% (-0.07%) from 96.815%
20273776347

Pull #357

github

web-flow
Merge 324a13a76 into 14042a874
Pull Request #357: Cleanup code formatting and docs

1527 of 1615 new or added lines in 27 files covered. (94.55%)

1 existing line in 1 file now uncovered.

2798 of 2892 relevant lines covered (96.75%)

6599.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.22
/src/poll.c
1
/*
2
  +----------------------------------------------------------------------+
3
  | parallel                                                             |
4
  +----------------------------------------------------------------------+
5
  | Copyright (c) Joe Watkins 2019-2024                                  |
6
  +----------------------------------------------------------------------+
7
  | This source file is subject to version 3.01 of the PHP license,      |
8
  | that is bundled with this package in the file LICENSE, and is        |
9
  | available through the world-wide-web at the following url:           |
10
  | http://www.php.net/license/3_01.txt                                  |
11
  | If you did not receive a copy of the PHP license and are unable to   |
12
  | obtain it through the world-wide-web, please send a note to          |
13
  | license@php.net so we can mail you a copy immediately.               |
14
  +----------------------------------------------------------------------+
15
  | Author: krakjoe                                                      |
16
  +----------------------------------------------------------------------+
17
 */
18
#ifndef HAVE_PARALLEL_EVENTS_POLL
19
#define HAVE_PARALLEL_EVENTS_POLL
20

21
#include "parallel.h"
22

23
#if PHP_VERSION_ID >= 80400
24
#include "ext/random/php_random.h"
25
#else
26
#include "ext/standard/php_mt_rand.h"
27
#endif
28

29
typedef struct _php_parallel_events_poll_t {
30
    uint32_t try;
31
    struct timeval stop;
32
    struct {
33
        zend_fcall_info fci;
34
        zend_fcall_info_cache fcc;
35
        zval ival;
36
    } block;
37
    php_parallel_events_state_t state;
38
} php_parallel_events_poll_t;
39

40
static zend_always_inline php_parallel_events_poll_t *php_parallel_events_poll_init(php_parallel_events_t *events)
270✔
41
{
42
    if (events->targets.nNumUsed == 0) {
270✔
43
        return NULL;
44
    }
45

46
    php_parallel_events_poll_t *poll = (php_parallel_events_poll_t *)pecalloc(1, sizeof(php_parallel_events_poll_t), 1);
234✔
47

48
    if (events->timeout > -1) {
234✔
49
        if (gettimeofday(&poll->stop, NULL) == SUCCESS) {
18✔
50
            poll->stop.tv_sec += (events->timeout / 1000000L);
18✔
51
            poll->stop.tv_sec += (poll->stop.tv_usec + (events->timeout % 1000000L)) / 1000000L;
18✔
52
            poll->stop.tv_usec = (poll->stop.tv_usec + (events->timeout % 1000000L)) % 1000000L;
18✔
53
        }
54
        /* return 0 ? */
55
    }
56

57
    if (!Z_ISUNDEF(events->blocker)) {
234✔
58
        zend_fcall_info_init(&events->blocker, 0, &poll->block.fci, &poll->block.fcc, NULL, NULL);
18✔
59
        poll->block.fci.retval = &poll->block.ival;
18✔
60
    } else {
61
        memset(&poll->block, 0, sizeof(poll->block));
216✔
62
    }
63

64
    return poll;
65
}
66

67
static zend_always_inline void php_parallel_events_poll_free(php_parallel_events_poll_t *poll) { pefree(poll, 1); }
234✔
68

69
static zend_always_inline void php_parallel_events_poll_end(php_parallel_events_poll_t *poll)
180✔
70
{
71
    if (poll->state.type == PHP_PARALLEL_EVENTS_LINK) {
180✔
72
        php_parallel_channel_t *channel = php_parallel_channel_fetch(poll->state.object);
108✔
73

74
        php_parallel_link_unlock(channel->link);
108✔
75
    } else {
76
        php_parallel_future_t *future = php_parallel_future_fetch(poll->state.object);
72✔
77

78
        php_parallel_future_unlock(future);
72✔
79
    }
80

81
    php_parallel_events_poll_free(poll);
180✔
82
}
180✔
83

84
static zend_always_inline bool php_parallel_events_poll_timeout(php_parallel_events_poll_t *poll,
290,003✔
85
                                                                php_parallel_events_t *events)
86
{
87
    struct timeval now;
290,003✔
88

89
    if (events->timeout > -1 && gettimeofday(&now, NULL) == SUCCESS) {
289,937✔
90
        if (now.tv_sec >= poll->stop.tv_sec && now.tv_usec >= poll->stop.tv_usec) {
289,937✔
91
            php_parallel_exception_ex(php_parallel_events_error_timeout_ce, "timeout occured");
18✔
92
            return 1;
18✔
93
        }
94
    }
95

96
    return 0;
97
}
98

99
static zend_always_inline bool php_parallel_events_poll_random(php_parallel_events_t *events, zend_string **name,
290,219✔
100
                                                               zend_object **object)
101
{
102
    uint32_t size = events->targets.nNumUsed;
290,219✔
103
    zend_long random = php_mt_rand_range(0, (zend_long)size - 1);
290,219✔
104

105
    do {
290,245✔
106
        Bucket *bucket = &events->targets.arData[random];
290,232✔
107

108
        if (!Z_ISUNDEF(bucket->val)) {
290,232✔
109
            *name = bucket->key;
290,219✔
110
            *object = Z_OBJ(bucket->val);
290,219✔
111

112
            return 1;
290,219✔
113
        }
114

115
        random = php_mt_rand_range(0, (zend_long)size - 1);
13✔
116
    } while (1);
117

118
    return 0;
119
}
120

121
static zend_always_inline bool php_parallel_events_poll_begin_link(php_parallel_events_t *events,
290,099✔
122
                                                                   php_parallel_events_state_t *state,
123
                                                                   zend_string *name, zend_object *object)
124
{
125
    php_parallel_channel_t *channel = php_parallel_channel_fetch(object);
290,099✔
126

127
    php_parallel_link_lock(channel->link);
290,099✔
128

129
    if (php_parallel_link_closed(channel->link)) {
290,099✔
130
        state->closed = 1;
18✔
131
    } else {
132
        if (php_parallel_events_input_exists(&events->input, name)) {
290,081✔
133
            state->writable = php_parallel_link_writable(channel->link);
36✔
134
        } else {
135
            state->readable = php_parallel_link_readable(channel->link);
290,045✔
136
        }
137
    }
138

139
    if (state->readable || state->writable || state->closed) {
290,099✔
140
        state->type = PHP_PARALLEL_EVENTS_LINK;
108✔
141
        state->name = name;
108✔
142
        state->object = object;
108✔
143

144
        return 1;
108✔
145
    }
146

147
    php_parallel_link_unlock(channel->link);
289,991✔
148
    return 0;
289,991✔
149
}
150

151
static zend_always_inline bool php_parallel_events_poll_begin_future(php_parallel_events_t *events,
120✔
152
                                                                     php_parallel_events_state_t *state,
153
                                                                     zend_string *name, zend_object *object)
154
{
155
    php_parallel_future_t *future = php_parallel_future_fetch(object);
120✔
156

157
    php_parallel_future_lock(future);
120✔
158

159
    state->readable = php_parallel_future_readable(future);
120✔
160

161
    if (state->readable) {
120✔
162
        state->type = PHP_PARALLEL_EVENTS_FUTURE;
72✔
163
        state->name = name;
72✔
164
        state->object = object;
72✔
165

166
        return 1;
72✔
167
    }
168

169
    php_parallel_future_unlock(future);
48✔
170
    return 0;
48✔
171
}
172

173
static zend_always_inline bool php_parallel_events_poll_begin(php_parallel_events_t *events,
290,219✔
174
                                                              php_parallel_events_state_t *state)
175
{
176
    zend_string *name;
290,219✔
177
    zend_object *object;
290,219✔
178

179
    if (!php_parallel_events_poll_random(events, &name, &object)) {
580,438✔
180
        return 0;
181
    }
182

183
    memset(state, 0, sizeof(php_parallel_events_state_t));
290,219✔
184

185
    if (instanceof_function(object->ce, php_parallel_channel_ce)) {
290,339✔
186
        return php_parallel_events_poll_begin_link(events, state, name, object);
290,099✔
187
    } else {
188
        return php_parallel_events_poll_begin_future(events, state, name, object);
120✔
189
    }
190

191
    return 0;
192
}
193

194
static zend_always_inline bool php_parallel_events_poll_link(php_parallel_events_t *events,
108✔
195
                                                             php_parallel_events_state_t *state, zval *retval)
196
{
197
    php_parallel_channel_t *channel = php_parallel_channel_fetch(state->object);
108✔
198
    zval *input;
108✔
199

200
    if (state->closed) {
108✔
201
        php_parallel_events_event_construct(events, PHP_PARALLEL_EVENTS_EVENT_CLOSE, state->name, state->object, NULL,
18✔
202
                                            retval);
203

204
        return 1;
18✔
205
    } else {
206
        if ((input = php_parallel_events_input_find(&events->input, state->name))) {
90✔
207

208
            if (state->writable) {
36✔
209

210
                if (php_parallel_link_send(channel->link, input)) {
36✔
211

212
                    php_parallel_events_event_construct(events, PHP_PARALLEL_EVENTS_EVENT_WRITE, state->name,
36✔
213
                                                        state->object, NULL, retval);
214

215
                    return 1;
36✔
216
                }
217
            }
218
        } else {
219
            if (state->readable) {
54✔
220
                zval read;
54✔
221

222
                if (php_parallel_link_recv(channel->link, &read)) {
54✔
223

224
                    php_parallel_events_event_construct(events, PHP_PARALLEL_EVENTS_EVENT_READ, state->name,
54✔
225
                                                        state->object, &read, retval);
226

227
                    return 1;
54✔
228
                }
229
            }
230
        }
231
    }
232

233
    return 0;
234
}
235

236
static zend_always_inline bool php_parallel_events_poll_future(php_parallel_events_t *events,
72✔
237
                                                               php_parallel_events_state_t *state, zval *retval)
238
{
239
    if (state->readable) {
72✔
240
        zval read;
72✔
241
        php_parallel_events_event_type_t type = PHP_PARALLEL_EVENTS_EVENT_READ;
72✔
242
        php_parallel_future_t *future = php_parallel_future_fetch(state->object);
72✔
243

244
        ZVAL_NULL(&read);
72✔
245

246
        if (php_parallel_monitor_check(future->monitor, PHP_PARALLEL_KILLED)) {
72✔
247
            type = PHP_PARALLEL_EVENTS_EVENT_KILL;
248
        } else if (php_parallel_monitor_check(future->monitor, PHP_PARALLEL_CANCELLED)) {
54✔
249
            type = PHP_PARALLEL_EVENTS_EVENT_CANCEL;
250
        } else {
251
            if (php_parallel_monitor_check(future->monitor, PHP_PARALLEL_ERROR)) {
36✔
252
                type = PHP_PARALLEL_EVENTS_EVENT_ERROR;
18✔
253
            }
254

255
            php_parallel_future_value(future, &read);
36✔
256
        }
257

258
        php_parallel_events_event_construct(events, type, state->name, state->object, &read, retval);
72✔
259

260
        return 1;
72✔
261
    }
262

263
    return 0;
264
}
265

266
void php_parallel_events_poll(php_parallel_events_t *events, zval *retval)
270✔
267
{
268
    php_parallel_events_poll_t *poll;
270✔
269

270
    if (!(poll = php_parallel_events_poll_init(events))) {
504✔
271
        ZVAL_NULL(retval);
36✔
272
        return;
36✔
273
    }
274

275
    do {
290,219✔
276
        if (!php_parallel_events_poll_begin(events, &poll->state)) {
290,219✔
277
            if (!events->blocking) {
290,039✔
278
                php_parallel_events_poll_free(poll);
18✔
279
                return;
18✔
280
            }
281

282
            if (poll->block.fci.size) {
290,021✔
283
                zend_call_function(&poll->block.fci, &poll->block.fcc);
18✔
284

285
                if (zend_is_true(&poll->block.ival)) {
18✔
286
                    zval_ptr_dtor(&poll->block.ival);
18✔
287
                    php_parallel_events_poll_free(poll);
18✔
288
                    return;
18✔
289
                }
290

NEW
291
                zval_ptr_dtor(&poll->block.ival);
×
292
            } else {
293
                if ((poll->try++ % 10) == 0) {
290,003✔
294
                    usleep(1);
29,057✔
295
                }
296
            }
297

298
            if (php_parallel_events_poll_timeout(poll, events)) {
579,922✔
299
                php_parallel_events_poll_free(poll);
18✔
300
                return;
18✔
301
            }
302

303
            continue;
289,985✔
304
        }
305

306
        if (poll->state.type == PHP_PARALLEL_EVENTS_LINK) {
180✔
307
            if (php_parallel_events_poll_link(events, &poll->state, retval)) {
198✔
308
                break;
309
            }
310
        } else {
311
            if (php_parallel_events_poll_future(events, &poll->state, retval)) {
144✔
312
                break;
313
            }
314
        }
315

316
        php_parallel_events_poll_end(poll);
290,219✔
317
    } while (1);
318

319
    php_parallel_events_poll_end(poll);
180✔
320
}
321
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc