• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 13230804660

09 Feb 2025 11:57PM UTC coverage: 98.466% (+0.001%) from 98.465%
13230804660

Pull #504

github

web-flow
Merge 497739740 into 96a3e7279
Pull Request #504: chore: benchmarks for `Collection` component

17 of 18 new or added lines in 9 files covered. (94.44%)

1 existing line in 1 file now uncovered.

5326 of 5409 relevant lines covered (98.47%)

50.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.83
/src/Psl/Async/KeyedSemaphore.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\Async;
6

7
use Closure;
8
use Exception;
9
use Revolt\EventLoop;
10
use Revolt\EventLoop\Suspension;
11

12
use function array_key_exists;
13
use function array_shift;
14
use function array_sum;
15
use function count;
16

17
/**
18
 * Run an operation with a limit on number of ongoing asynchronous jobs for a specific key.
19
 *
20
 * All operations must have the same input type (Tin) and output type (Tout), and be processed by the same function.
21
 *
22
 * `Tin` may be a callable invoked by the `$operation` for maximum flexibility,
23
 * however this pattern is best avoided in favor of creating semaphores with a more narrow process.
24
 *
25
 * @template Tk of array-key
26
 * @template Tin
27
 * @template Tout
28
 */
29
final class KeyedSemaphore
30
{
31
    /**
32
     * @var array<Tk, int<0, max>>
33
     */
34
    private array $ingoing = [];
35

36
    /**
37
     * @var array<Tk, list<Suspension>>
38
     */
39
    private array $pending = [];
40

41
    /**
42
     * @var array<Tk, list<Suspension>>
43
     */
44
    private array $waits = [];
45

46
    /**
47
     * @param positive-int $concurrencyLimit
48
     * @param (Closure(Tk, Tin): Tout) $operation
49
     */
50
    public function __construct(
51
        private readonly int $concurrencyLimit,
52
        private readonly Closure $operation,
53
    ) {
54
    }
11✔
55

56
    /**
57
     * Run the operation using the given `$input`.
58
     *
59
     * If the concurrency limit has been reached for the given `$key`, this method will wait until one of the ingoing operations has completed.
60
     *
61
     * @param Tk $key
62
     * @param Tin $input
63
     *
64
     * @return Tout
65
     *
66
     * @see Semaphore::cancel()
67
     */
68
    public function waitFor(string|int $key, mixed $input): mixed
69
    {
70
        $this->ingoing[$key] = $this->ingoing[$key] ?? 0;
11✔
71
        if ($this->ingoing[$key] === $this->concurrencyLimit) {
11✔
72
            $this->pending[$key][] = $suspension = EventLoop::getSuspension();
7✔
73

74
            $suspension->suspend();
7✔
75
        }
76

77
        $this->ingoing[$key]++;
11✔
78

79
        try {
80
            return ($this->operation)($key, $input);
11✔
81
        } finally {
82
            if (($this->pending[$key] ?? []) !== []) {
11✔
83
                $suspension = array_shift($this->pending[$key]);
5✔
84
                if ([] === $this->pending[$key]) {
5✔
85
                    unset($this->pending[$key]);
5✔
86
                }
87

88
                if ($suspension !== null) {
5✔
89
                    $suspension->resume();
5✔
90
                }
91

92
                $this->ingoing[$key]--;
5✔
93
            } else {
94
                foreach ($this->waits[$key] ?? [] as $suspension) {
11✔
95
                    $suspension->resume();
1✔
96
                }
97

98
                unset($this->waits[$key]);
11✔
99

100
                /** @psalm-suppress InvalidPropertyAssignmentValue */
101
                $this->ingoing[$key]--;
11✔
102
                if ($this->ingoing[$key] === 0) {
11✔
103
                    unset($this->ingoing[$key]);
11✔
104
                }
105
            }
106
        }
107
    }
108

109
    /**
110
     * Cancel pending operations for the given key.
111
     *
112
     * Pending operation will fail with the given exception.
113
     *
114
     * Future operations will continue execution as usual.
115
     *
116
     * @param Tk $key
117
     */
118
    public function cancel(string|int $key, Exception $exception): void
119
    {
120
        $suspensions = $this->pending[$key] ?? [];
1✔
121
        unset($this->pending[$key]);
1✔
122
        foreach ($suspensions as $suspension) {
1✔
123
            $suspension->throw($exception);
1✔
124
        }
125
    }
126

127
    /**
128
     * Cancel all pending operations.
129
     *
130
     * Pending operation will fail with the given exception.
131
     *
132
     * Future operations will continue execution as usual.
133
     */
134
    public function cancelAll(Exception $exception): void
135
    {
136
        $pending = $this->pending;
2✔
137
        $this->pending = [];
2✔
138
        foreach ($pending as $suspensions) {
2✔
139
            foreach ($suspensions as $suspension) {
1✔
140
                $suspension->throw($exception);
1✔
141
            }
142
        }
143
    }
144

145
    /**
146
     * Get the concurrency limit of the semaphore.
147
     *
148
     * @return positive-int
149
     */
150
    public function getConcurrencyLimit(): int
151
    {
152
        return $this->concurrencyLimit;
1✔
153
    }
154

155
    /**
156
     * Get the number of operations pending execution for the given key.
157
     *
158
     * @param Tk $key
159
     *
160
     * @return int<0, max>
161
     */
162
    public function getPendingOperations(string|int $key): int
163
    {
164
        /** @var int<0, max> */
165
        return count($this->pending[$key] ?? []);
1✔
166
    }
167

168
    /**
169
     * Get the number of total operations pending execution.
170
     *
171
     * @return int<0, max>
172
     */
173
    public function getTotalPendingOperations(): int
174
    {
175
        $count = 0;
1✔
176
        foreach ($this->pending as $suspensions) {
1✔
177
            $count += count($suspensions);
1✔
178
        }
179

180
        /** @var int<0, max> */
181
        return $count;
1✔
182
    }
183

184
    /**
185
     * Check if there's any operations pending execution for the given key.
186
     *
187
     * If this method returns `true`, it means the semaphore has reached it's limits, future calls to `waitFor` will wait.
188
     *
189
     * @param Tk $key
190
     */
191
    public function hasPendingOperations(string|int $key): bool
192
    {
193
        return array_key_exists($key, $this->pending);
2✔
194
    }
195

196
    /**
197
     * Check if there's any operations pending execution.
198
     */
199
    public function hasAnyPendingOperations(): bool
200
    {
201
        return $this->pending !== [];
1✔
202
    }
203

204
    /**
205
     * Get the number of ingoing operations for the given key.
206
     *
207
     * The returned number will always be lower, or equal to the concurrency limit.
208
     *
209
     * @param Tk $key
210
     *
211
     * @return int<0, max>
212
     */
213
    public function getIngoingOperations(string|int $key): int
214
    {
215
        return $this->ingoing[$key] ?? 0;
1✔
216
    }
217

218
    /**
219
     * Get the number of total ingoing operations.
220
     *
221
     * The returned number can be higher than the concurrency limit, as it is the sum of all ingoing operations using different keys.
222
     *
223
     * @return int<0, max>
224
     */
225
    public function getTotalIngoingOperations(): int
226
    {
227
        /** @var int<0, max> */
228
        return array_sum($this->ingoing);
1✔
229
    }
230

231
    /**
232
     * Check if the semaphore has any ingoing operations for the given key.
233
     *
234
     * If this method returns `true`, it does not mean future calls to `waitFor` will wait, since a semaphore can have multiple ingoing operations
235
     * at the same time for the same key.
236
     *
237
     * @param Tk $key
238
     */
239
    public function hasIngoingOperations(string|int $key): bool
240
    {
241
        return array_key_exists($key, $this->ingoing);
2✔
242
    }
243

244
    /**
245
     * Check if the semaphore has any ingoing operations.
246
     */
247
    public function hasAnyIngoingOperations(): bool
248
    {
249
        return $this->ingoing !== [];
1✔
250
    }
251

252
    /**
253
     * Wait for all pending operations associated with the given key to start execution.
254
     *
255
     * If the semaphore is has not reached the concurrency limit the given key, this method will return immediately.
256
     *
257
     * @param Tk $key
258
     */
259
    public function waitForPending(string|int $key): void
260
    {
261
        if (($this->ingoing[$key] ?? 0) !== $this->concurrencyLimit) {
1✔
UNCOV
262
            return;
×
263
        }
264

265
        $suspension = EventLoop::getSuspension();
1✔
266
        $this->waits[$key][] = $suspension;
1✔
267
        $suspension->suspend();
1✔
268
    }
269
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc