• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 17432557268

03 Sep 2025 11:52AM UTC coverage: 98.446% (-0.07%) from 98.513%
17432557268

push

github

web-flow
chore: migrate from `psalm` to `mago` (#527)

232 of 241 new or added lines in 81 files covered. (96.27%)

14 existing lines in 12 files now uncovered.

5510 of 5597 relevant lines covered (98.45%)

52.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.5
/src/Psl/Async/KeyedSequence.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\Async;
6

7
use Closure;
8
use Exception;
9
use Revolt\EventLoop;
10
use Revolt\EventLoop\Suspension;
11

12
use function array_key_exists;
13
use function array_shift;
14
use function count;
15

16
/**
17
 * Run an operation with a limit on number of ongoing asynchronous jobs of 1.
18
 *
19
 * Just like {@see KeyedSemaphore}, all operations must have the same input type (Tin) and output type (Tout), and be processed by the same function;
20
 *
21
 * @template Tk of array-key
22
 * @template Tin
23
 * @template Tout
24
 *
25
 * @see KeyedSemaphore
26
 *
27
 * @mago-expect lint:no-else-clause
28
 */
29
final class KeyedSequence
30
{
31
    /**
32
     * @var array<Tk, bool>
33
     */
34
    private array $ingoing = [];
35

36
    /**
37
     * @var array<Tk, list<Suspension>>
38
     */
39
    private array $pending = [];
40

41
    /**
42
     * @var array<Tk, list<Suspension>>
43
     */
44
    private array $waits = [];
45

46
    /**
47
     * @param (Closure(Tk, Tin): Tout) $operation
48
     */
49
    public function __construct(
50
        private readonly Closure $operation,
51
    ) {}
10✔
52

53
    /**
54
     * Run the operation using the given `$input`, after all previous operations have completed.
55
     *
56
     * @param Tk $key
57
     * @param Tin $input
58
     *
59
     * @return Tout
60
     *
61
     * @see Sequence::cancel()
62
     */
63
    public function waitFor(string|int $key, mixed $input): mixed
64
    {
65
        if (array_key_exists($key, $this->ingoing)) {
10✔
66
            $suspension = EventLoop::getSuspension();
6✔
67
            $this->pending[$key][] = $suspension;
6✔
68
            $suspension->suspend();
6✔
69
        }
70

71
        $this->ingoing[$key] = true;
10✔
72

73
        try {
74
            return ($this->operation)($key, $input);
10✔
75
        } finally {
76
            $this->pending[$key] ??= [];
10✔
77
            $suspension = array_shift($this->pending[$key]);
10✔
78
            if ($this->pending[$key] === []) {
10✔
79
                unset($this->pending[$key]);
10✔
80
            }
81

82
            if ($suspension !== null) {
10✔
83
                $suspension->resume();
4✔
84
            } else {
85
                foreach ($this->waits[$key] ?? [] as $suspension) {
10✔
86
                    $suspension->resume();
1✔
87
                }
88

89
                unset($this->waits[$key], $this->ingoing[$key]);
10✔
90
            }
91
        }
92
    }
93

94
    /**
95
     * Cancel pending operations for the given key.
96
     *
97
     * Any pending operation will fail with the given exception.
98
     *
99
     * Future operations will continue execution as usual.
100
     *
101
     * @param Tk $key
102
     */
103
    public function cancel(string|int $key, Exception $exception): void
104
    {
105
        $suspensions = $this->pending[$key] ?? [];
1✔
106
        unset($this->pending[$key]);
1✔
107
        foreach ($suspensions as $suspension) {
1✔
108
            $suspension->throw($exception);
1✔
109
        }
110
    }
111

112
    /**
113
     * Cancel all pending operations.
114
     *
115
     * Pending operation will fail with the given exception.
116
     *
117
     * Future operations will continue execution as usual.
118
     */
119
    public function cancelAll(Exception $exception): void
120
    {
121
        $pending = $this->pending;
2✔
122
        $this->pending = [];
2✔
123
        foreach ($pending as $suspensions) {
2✔
124
            foreach ($suspensions as $suspension) {
1✔
125
                $suspension->throw($exception);
1✔
126
            }
127
        }
128
    }
129

130
    /**
131
     * Get the number of operations pending execution for the given key.
132
     *
133
     * @param Tk $key
134
     *
135
     * @return int<0, max>
136
     */
137
    public function getPendingOperations(string|int $key): int
138
    {
139
        /** @var int<0, max> */
140
        return count($this->pending[$key] ?? []);
1✔
141
    }
142

143
    /**
144
     * Get the number of total operations pending execution.
145
     *
146
     * @return int<0, max>
147
     */
148
    public function getTotalPendingOperations(): int
149
    {
150
        $count = 0;
1✔
151
        foreach ($this->pending as $suspensions) {
1✔
152
            $count += count($suspensions);
1✔
153
        }
154

155
        /** @var int<0, max> */
156
        return $count;
1✔
157
    }
158

159
    /**
160
     * Check if there's any operations pending execution for the given key.
161
     *
162
     * If this method returns `true`, it means the sequence is busy, future calls to `waitFor` will wait.
163
     *
164
     * @param Tk $key
165
     */
166
    public function hasPendingOperations(string|int $key): bool
167
    {
168
        return array_key_exists($key, $this->pending);
2✔
169
    }
170

171
    /**
172
     * Check if there's any operations pending execution.
173
     */
174
    public function hasAnyPendingOperations(): bool
175
    {
176
        return $this->pending !== [];
1✔
177
    }
178

179
    /**
180
     * Check if there's an ingoing operation for the given key.
181
     *
182
     * If this method returns `true`, it means the sequence is busy, future calls to `waitFor` will wait.
183
     * If this method returns `false`, it means the sequence is not busy, future calls to `waitFor` will execute immediately.
184
     *
185
     * @param Tk $key
186
     */
187
    public function hasIngoingOperations(string|int $key): bool
188
    {
189
        return array_key_exists($key, $this->ingoing);
2✔
190
    }
191

192
    /**
193
     * Check if the sequence has any ingoing operations.
194
     */
195
    public function hasAnyIngoingOperations(): bool
196
    {
197
        return $this->ingoing !== [];
1✔
198
    }
199

200
    /**
201
     * Get the number of total ingoing operations.
202
     *
203
     * @return int<0, max>
204
     */
205
    public function getTotalIngoingOperations(): int
206
    {
207
        /** @var int<0, max> */
208
        return count($this->ingoing);
1✔
209
    }
210

211
    /**
212
     * Wait for all pending operations associated with the given key to finish execution.
213
     *
214
     * If the sequence does not have any ingoing operations for the given key, this method will return immediately.
215
     *
216
     * @param Tk $key
217
     */
218
    public function waitForPending(string|int $key): void
219
    {
220
        if (!array_key_exists($key, $this->ingoing)) {
1✔
UNCOV
221
            return;
×
222
        }
223

224
        $suspension = EventLoop::getSuspension();
1✔
225
        $this->waits[$key][] = $suspension;
1✔
226
        $suspension->suspend();
1✔
227
    }
228
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc