• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 17432557268

03 Sep 2025 11:52AM UTC coverage: 98.446% (-0.07%) from 98.513%
17432557268

push

github

web-flow
chore: migrate from `psalm` to `mago` (#527)

232 of 241 new or added lines in 81 files covered. (96.27%)

14 existing lines in 12 files now uncovered.

5510 of 5597 relevant lines covered (98.45%)

52.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.43
/src/Psl/Async/Semaphore.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\Async;
6

7
use Closure;
8
use Exception;
9
use Revolt\EventLoop;
10
use Revolt\EventLoop\Suspension;
11

12
use function array_shift;
13
use function count;
14

15
/**
 * Run an operation with a limit on number of ongoing asynchronous jobs.
 *
 * All operations must have the same input type (Tin) and output type (Tout), and be processed by the same function;
 * `Tin` may be a callable invoked by the `$operation` for maximum flexibility,
 * however this pattern is best avoided in favor of creating semaphores with a more narrow process.
 *
 * Slots are handed over directly: when an operation finishes while others are queued, its slot is transferred
 * to the oldest queued operation instead of being released, so the number of ingoing operations can never
 * exceed the concurrency limit.
 *
 * @template Tin
 * @template Tout
 *
 * @mago-expect lint:no-else-clause
 */
final class Semaphore
{
    /**
     * Number of occupied slots: operations currently executing, plus operations that were handed a slot
     * and are about to start executing.
     *
     * @var int<0, max>
     */
    private int $ingoing = 0;

    /**
     * Suspensions of `waitFor()` callers queued for a free slot, oldest first.
     *
     * @var list<Suspension>
     */
    private array $pending = [];

    /**
     * Suspensions of `waitForPending()` callers.
     *
     * @var list<Suspension>
     */
    private array $waits = [];

    /**
     * @param positive-int $concurrencyLimit Maximum number of operations allowed to run at the same time.
     * @param (Closure(Tin): Tout) $operation The function invoked with each input given to `waitFor()`.
     */
    public function __construct(
        private readonly int $concurrencyLimit,
        private readonly Closure $operation,
    ) {}

    /**
     * Run the operation using the given `$input`.
     *
     * If the concurrency limit has been reached, this method will wait until one of the ingoing operations has completed.
     *
     * Anything thrown by the operation propagates to the caller after the slot has been released or handed over.
     * If the semaphore is cancelled while this call is still queued, the exception given to `cancel()` is
     * thrown from here instead and the operation is never invoked.
     *
     * @param Tin $input
     *
     * @return Tout
     *
     * @see Semaphore::cancel()
     */
    public function waitFor(mixed $input): mixed
    {
        if ($this->ingoing >= $this->concurrencyLimit) {
            $suspension = EventLoop::getSuspension();
            $this->pending[] = $suspension;

            // When this returns normally, a finishing operation has transferred its slot to us, so the counter
            // already accounts for this operation and must not be incremented again.
            // When `cancel()` throws into the suspension, no slot was transferred and the counter is untouched.
            $suspension->suspend();
        } else {
            $this->ingoing++;
        }

        try {
            return ($this->operation)($input);
        } finally {
            $next = array_shift($this->pending);
            if ($next !== null) {
                // Hand the slot straight to the oldest queued operation. The resumption is scheduled by the
                // event loop rather than executed immediately; keeping the slot counted in the meantime
                // prevents a new `waitFor()` call from taking it first and pushing the semaphore over its limit.
                $next->resume();
            } else {
                // Nothing is queued: wake everyone blocked in `waitForPending()` and free the slot.
                foreach ($this->waits as $wait) {
                    $wait->resume();
                }

                $this->waits = [];
                $this->ingoing--;
            }
        }
    }

    /**
     * Cancel all pending operations.
     *
     * Any pending operation will fail with the given exception.
     *
     * Operations that already hold a slot are not affected, and future operations will continue
     * execution as usual.
     */
    public function cancel(Exception $exception): void
    {
        // Detach the queue before throwing, so the semaphore is already in its final state
        // by the time any cancelled caller observes it.
        $cancelled = $this->pending;
        $this->pending = [];

        foreach ($cancelled as $suspension) {
            $suspension->throw($exception);
        }
    }

    /**
     * Get the concurrency limit of the semaphore.
     *
     * @return positive-int
     */
    public function getConcurrencyLimit(): int
    {
        return $this->concurrencyLimit;
    }

    /**
     * Get the number of operations pending execution.
     *
     * @return int<0, max>
     */
    public function getPendingOperations(): int
    {
        /** @var int<0, max> */
        return count($this->pending);
    }

    /**
     * Check if there's any operations pending execution.
     *
     * If this method returns `true`, it means the semaphore is full, future calls to `waitFor` will wait.
     */
    public function hasPendingOperations(): bool
    {
        return $this->pending !== [];
    }

    /**
     * Get the number of ingoing operations.
     *
     * The returned number will always be lower, or equal to the concurrency limit.
     * An operation that has just been handed a slot is counted even if it has not started executing yet.
     *
     * @return int<0, max>
     */
    public function getIngoingOperations(): int
    {
        return $this->ingoing;
    }

    /**
     * Check if the semaphore has any ingoing operations.
     *
     * If this method returns `true`, it does not mean future calls to `waitFor` will wait, since a semaphore can have multiple ingoing operations
     * at the same time.
     */
    public function hasIngoingOperations(): bool
    {
        return $this->ingoing > 0;
    }

    /**
     * Wait for all pending operations to finish execution.
     *
     * Returns immediately when the semaphore is not full. Otherwise the caller is suspended until
     * an operation completes while no other operation is queued behind it.
     */
    public function waitForPending(): void
    {
        if ($this->ingoing < $this->concurrencyLimit) {
            return;
        }

        $suspension = EventLoop::getSuspension();
        $this->waits[] = $suspension;
        $suspension->suspend();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc