• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 23168991161

16 Mar 2026 10:30PM UTC coverage: 99.3% (-0.05%) from 99.347%
23168991161

push

github

web-flow
chore: remove internal circular dependencies, and reduce inter-dependencies (#648)

380 of 385 new or added lines in 157 files covered. (98.7%)

1 existing line in 1 file now uncovered.

10775 of 10851 relevant lines covered (99.3%)

37.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.44
/src/Psl/IO/streaming.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\IO;
6

7
use Generator;
8
use Psl;
9
use Psl\Async\CancellationTokenInterface;
10
use Psl\Async\Exception\CancelledException;
11
use Psl\Async\NullCancellationToken;
12
use Psl\Channel;
13
use Psl\Result;
14
use Revolt\EventLoop;
15

16
use function sprintf;
17

18
/**
19
 * Streaming the output of the given read stream handles using a generator.
20
 *
21
 * Example:
22
 *
23
 *  $handles = [
24
 *    'foo' => get_read_stream('foo'),
25
 *    'bar' => get_read_stream('bar'),
26
 *  ];
27
 *
28
 *  foreach(IO\streaming($handles) as $type => $chunk) {
29
 *    IO\write_line('received chunk "%s" from "%s" stream', $chunk, $type);
30
 *  }
31
 *
32
 * @template T of array-key
33
 *
34
 * @param iterable<T, ReadHandleInterface&StreamHandleInterface> $handles
35
 *
36
 * @throws Exception\AlreadyClosedException If one of the handles has been already closed.
37
 * @throws Exception\RuntimeException If an error occurred during the operation.
38
 * @throws CancelledException If the operation is cancelled.
39
 *
40
 * @return Generator<T, string, mixed, null>
41
 */
42
function streaming(iterable $handles, CancellationTokenInterface $cancellation = new NullCancellationToken()): Generator
43
{
44
    /**
45
     * @var Channel\ReceiverInterface<array{0: T|null, 1: Result\ResultInterface<string>}> $receiver
46
     * @var Channel\SenderInterface<array{0: T|null, 1: Result\ResultInterface<string>}> $sender
47
     */
48
    [$receiver, $sender] = Channel\unbounded();
55✔
49

50
    /** @var Psl\Ref<array<T, string>> $watchers */
51
    $watchers = new Psl\Ref([]);
55✔
52
    foreach ($handles as $index => $handle) {
55✔
53
        $stream = $handle->getStream();
55✔
54
        if (null === $stream) {
55✔
NEW
55
            throw new Exception\AlreadyClosedException(sprintf('Handle "%s" is already closed.', (string) $index));
×
56
        }
57

58
        // @mago-expect analysis:possibly-invalid-argument
59
        $watchers->value[$index] = EventLoop::onReadable($stream, static function (string $watcher) use (
55✔
60
            $index,
55✔
61
            $handle,
55✔
62
            $sender,
55✔
63
            $watchers,
55✔
64
        ): void {
55✔
65
            try {
66
                $result = Result\wrap($handle->tryRead(...));
51✔
67
                if ($result->isFailed() || $result->isSucceeded() && $result->getResult() === '') {
51✔
68
                    EventLoop::cancel($watcher);
45✔
69
                    unset($watchers->value[$index]);
45✔
70
                }
71

72
                $sender->send([$index, $result]);
51✔
73
            } finally {
74
                if ([] === $watchers->value) {
51✔
75
                    $sender->close();
51✔
76
                }
77
            }
78
        });
55✔
79
    }
80

81
    $cancellationSubscription = null;
55✔
82
    if ($cancellation->cancellable) {
55✔
83
        $cancellationSubscription = $cancellation->subscribe(static function (CancelledException $exception) use (
12✔
84
            $sender,
12✔
85
        ): void {
12✔
86
            /** @var Result\ResultInterface<string> $failure */
87
            $failure = new Result\Failure($exception);
10✔
88

89
            $sender->send([null, $failure]);
10✔
90
        });
12✔
91
    }
92

93
    try {
94
        while (true) {
55✔
95
            [$index, $result] = $receiver->receive();
55✔
96
            if (null === $index || $result->isFailed()) {
55✔
97
                throw $result->getThrowable();
10✔
98
            }
99

100
            yield $index => $result->getResult();
51✔
101
        }
102
    } catch (Channel\Exception\ClosedChannelException) {
55✔
103
        // completed.
104
        return;
45✔
105
    } finally {
106
        if (null !== $cancellationSubscription) {
55✔
107
            $cancellation->unsubscribe($cancellationSubscription);
12✔
108
        }
109

110
        foreach ($watchers->value as $watcher) {
55✔
111
            EventLoop::cancel($watcher);
10✔
112
        }
113
    }
114
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc