• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 17432557268

03 Sep 2025 11:52AM UTC coverage: 98.446% (-0.07%) from 98.513%
17432557268

push

github

web-flow
chore: migrate from `psalm` to `mago` (#527)

232 of 241 new or added lines in 81 files covered. (96.27%)

14 existing lines in 12 files now uncovered.

5510 of 5597 relevant lines covered (98.45%)

52.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.87
/src/Psl/Async/KeyedSemaphore.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\Async;
6

7
use Closure;
8
use Exception;
9
use Revolt\EventLoop;
10
use Revolt\EventLoop\Suspension;
11

12
use function array_key_exists;
13
use function array_shift;
14
use function array_sum;
15
use function count;
16

17
/**
18
 * Run an operation with a limit on number of ongoing asynchronous jobs for a specific key.
19
 *
20
 * All operations must have the same input type (Tin) and output type (Tout), and be processed by the same function.
21
 *
22
 * `Tin` may be a callable invoked by the `$operation` for maximum flexibility,
23
 * however this pattern is best avoided in favor of creating semaphores with a more narrow process.
24
 *
25
 * @template Tk of array-key
26
 * @template Tin
27
 * @template Tout
28
 *
29
 * @mago-expect lint:no-else-clause
30
 */
31
final class KeyedSemaphore
32
{
33
    /**
34
     * @var array<Tk, int<0, max>>
35
     */
36
    private array $ingoing = [];
37

38
    /**
39
     * @var array<Tk, list<Suspension>>
40
     */
41
    private array $pending = [];
42

43
    /**
44
     * @var array<Tk, list<Suspension>>
45
     */
46
    private array $waits = [];
47

48
    /**
49
     * @param positive-int $concurrencyLimit
50
     * @param (Closure(Tk, Tin): Tout) $operation
51
     */
52
    public function __construct(
53
        private readonly int $concurrencyLimit,
54
        private readonly Closure $operation,
55
    ) {}
11✔
56

57
    /**
58
     * Run the operation using the given `$input`.
59
     *
60
     * If the concurrency limit has been reached for the given `$key`, this method will wait until one of the ingoing operations has completed.
61
     *
62
     * @param Tk $key
63
     * @param Tin $input
64
     *
65
     * @return Tout
66
     *
67
     * @see Semaphore::cancel()
68
     */
69
    public function waitFor(string|int $key, mixed $input): mixed
70
    {
71
        $this->ingoing[$key] ??= 0;
11✔
72
        if ($this->ingoing[$key] === $this->concurrencyLimit) {
11✔
73
            $suspension = EventLoop::getSuspension();
7✔
74
            $this->pending[$key][] = $suspension;
7✔
75

76
            $suspension->suspend();
7✔
77
        }
78

79
        $this->ingoing[$key]++;
11✔
80

81
        try {
82
            return ($this->operation)($key, $input);
11✔
83
        } finally {
84
            if (($this->pending[$key] ?? []) !== []) {
11✔
85
                $suspension = array_shift($this->pending[$key]);
5✔
86
                if ([] === $this->pending[$key]) {
5✔
87
                    unset($this->pending[$key]);
5✔
88
                }
89

90
                if ($suspension !== null) {
5✔
91
                    $suspension->resume();
5✔
92
                }
93

94
                $this->ingoing[$key]--;
5✔
95
            } else {
96
                foreach ($this->waits[$key] ?? [] as $suspension) {
11✔
97
                    $suspension->resume();
1✔
98
                }
99

100
                unset($this->waits[$key]);
11✔
101

102
                $this->ingoing[$key]--;
11✔
103
                if ($this->ingoing[$key] === 0) {
11✔
104
                    unset($this->ingoing[$key]);
11✔
105
                }
106
            }
107
        }
108
    }
109

110
    /**
111
     * Cancel pending operations for the given key.
112
     *
113
     * Pending operation will fail with the given exception.
114
     *
115
     * Future operations will continue execution as usual.
116
     *
117
     * @param Tk $key
118
     */
119
    public function cancel(string|int $key, Exception $exception): void
120
    {
121
        $suspensions = $this->pending[$key] ?? [];
1✔
122
        unset($this->pending[$key]);
1✔
123
        foreach ($suspensions as $suspension) {
1✔
124
            $suspension->throw($exception);
1✔
125
        }
126
    }
127

128
    /**
129
     * Cancel all pending operations.
130
     *
131
     * Pending operation will fail with the given exception.
132
     *
133
     * Future operations will continue execution as usual.
134
     */
135
    public function cancelAll(Exception $exception): void
136
    {
137
        $pending = $this->pending;
2✔
138
        $this->pending = [];
2✔
139
        foreach ($pending as $suspensions) {
2✔
140
            foreach ($suspensions as $suspension) {
1✔
141
                $suspension->throw($exception);
1✔
142
            }
143
        }
144
    }
145

146
    /**
147
     * Get the concurrency limit of the semaphore.
148
     *
149
     * @return positive-int
150
     */
151
    public function getConcurrencyLimit(): int
152
    {
153
        return $this->concurrencyLimit;
1✔
154
    }
155

156
    /**
157
     * Get the number of operations pending execution for the given key.
158
     *
159
     * @param Tk $key
160
     *
161
     * @return int<0, max>
162
     */
163
    public function getPendingOperations(string|int $key): int
164
    {
165
        /** @var int<0, max> */
166
        return count($this->pending[$key] ?? []);
1✔
167
    }
168

169
    /**
170
     * Get the number of total operations pending execution.
171
     *
172
     * @return int<0, max>
173
     */
174
    public function getTotalPendingOperations(): int
175
    {
176
        $count = 0;
1✔
177
        foreach ($this->pending as $suspensions) {
1✔
178
            $count += count($suspensions);
1✔
179
        }
180

181
        /** @var int<0, max> */
182
        return $count;
1✔
183
    }
184

185
    /**
186
     * Check if there's any operations pending execution for the given key.
187
     *
188
     * If this method returns `true`, it means the semaphore has reached it's limits, future calls to `waitFor` will wait.
189
     *
190
     * @param Tk $key
191
     */
192
    public function hasPendingOperations(string|int $key): bool
193
    {
194
        return array_key_exists($key, $this->pending);
2✔
195
    }
196

197
    /**
198
     * Check if there's any operations pending execution.
199
     */
200
    public function hasAnyPendingOperations(): bool
201
    {
202
        return $this->pending !== [];
1✔
203
    }
204

205
    /**
206
     * Get the number of ingoing operations for the given key.
207
     *
208
     * The returned number will always be lower, or equal to the concurrency limit.
209
     *
210
     * @param Tk $key
211
     *
212
     * @return int<0, max>
213
     */
214
    public function getIngoingOperations(string|int $key): int
215
    {
216
        return $this->ingoing[$key] ?? 0;
1✔
217
    }
218

219
    /**
220
     * Get the number of total ingoing operations.
221
     *
222
     * The returned number can be higher than the concurrency limit, as it is the sum of all ingoing operations using different keys.
223
     *
224
     * @return int<0, max>
225
     */
226
    public function getTotalIngoingOperations(): int
227
    {
228
        /** @var int<0, max> */
229
        return array_sum($this->ingoing);
1✔
230
    }
231

232
    /**
233
     * Check if the semaphore has any ingoing operations for the given key.
234
     *
235
     * If this method returns `true`, it does not mean future calls to `waitFor` will wait, since a semaphore can have multiple ingoing operations
236
     * at the same time for the same key.
237
     *
238
     * @param Tk $key
239
     */
240
    public function hasIngoingOperations(string|int $key): bool
241
    {
242
        return array_key_exists($key, $this->ingoing);
2✔
243
    }
244

245
    /**
246
     * Check if the semaphore has any ingoing operations.
247
     */
248
    public function hasAnyIngoingOperations(): bool
249
    {
250
        return $this->ingoing !== [];
1✔
251
    }
252

253
    /**
254
     * Wait for all pending operations associated with the given key to start execution.
255
     *
256
     * If the semaphore is has not reached the concurrency limit the given key, this method will return immediately.
257
     *
258
     * @param Tk $key
259
     */
260
    public function waitForPending(string|int $key): void
261
    {
262
        if (($this->ingoing[$key] ?? 0) !== $this->concurrencyLimit) {
1✔
UNCOV
263
            return;
×
264
        }
265

266
        $suspension = EventLoop::getSuspension();
1✔
267
        $this->waits[$key][] = $suspension;
1✔
268
        $suspension->suspend();
1✔
269
    }
270
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc