• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 12713643030

10 Jan 2025 04:47PM UTC coverage: 98.483% (-0.2%) from 98.691%
12713643030

push

github

web-flow
chore: switch to mago (#501)

* chore: switch from php-cs-fixer and phpcs to mago

Signed-off-by: azjezz <azjezz@protonmail.com>

682 of 699 new or added lines in 176 files covered. (97.57%)

3 existing lines in 2 files now uncovered.

5322 of 5404 relevant lines covered (98.48%)

50.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.05
/src/Psl/IO/streaming.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\IO;
6

7
use Generator;
8
use Psl;
9
use Psl\Channel;
10
use Psl\DateTime\Duration;
11
use Psl\Result;
12
use Psl\Str;
13
use Revolt\EventLoop;
14

15
use function max;
16

17
/**
 * Stream the output of the given read stream handles using a generator.
 *
 * A readable watcher is registered on the event loop for every handle; each
 * chunk read is yielded with the key of the handle it came from. A handle is
 * considered finished once a read fails or returns an empty string. The
 * generator completes when every handle has finished.
 *
 * If `$handles` is empty, the generator completes immediately without
 * yielding anything.
 *
 * Example:
 *
 *  $handles = [
 *    'foo' => get_read_stream('foo'),
 *    'bar' => get_read_stream('bar'),
 *  ];
 *
 *  foreach(IO\streaming($handles) as $type => $chunk) {
 *    IO\write_line('received chunk "%s" from "%s" stream', $chunk, $type);
 *  }
 *
 * @template T of array-key
 *
 * @param iterable<T, ReadStreamHandleInterface> $handles
 * @param null|Duration $timeout Maximum time to wait for all handles to be read until the end;
 *                               negative durations are treated as zero. `null` disables the timeout.
 *
 * @throws Exception\AlreadyClosedException If one of the handles has been already closed.
 * @throws Exception\RuntimeException If an error occurred during the operation.
 * @throws Exception\TimeoutException If $timeout is reached before being able to read all the handles until the end.
 *
 * @return Generator<T, string, mixed, null>
 */
function streaming(iterable $handles, null|Duration $timeout = null): Generator
{
    /**
     * @psalm-suppress UnnecessaryVarAnnotation
     *
     * @var Channel\ReceiverInterface<array{0: T|null, 1: Result\ResultInterface<string>}> $receiver
     * @var Channel\SenderInterface<array{0: T|null, 1: Result\ResultInterface<string>}> $sender
     */
    [$receiver, $sender] = Channel\unbounded();

    /** @var Psl\Ref<array<T, string>> $watchers */
    $watchers = new Psl\Ref([]);
    $timeout_watcher = null;

    // Registration happens inside the `try` so that the `finally` block below
    // cancels watchers that were already registered when a later handle turns
    // out to be closed.
    try {
        foreach ($handles as $index => $handle) {
            $stream = $handle->getStream();
            if ($stream === null) {
                throw new Exception\AlreadyClosedException(Str\format('Handle "%s" is already closed.', (string) $index));
            }

            $watchers->value[$index] = EventLoop::onReadable($stream, static function (string $watcher) use (
                $index,
                $handle,
                $sender,
                $watchers,
            ): void {
                try {
                    $result = Result\wrap($handle->tryRead(...));
                    // A failed read or an empty chunk means this handle is done.
                    if ($result->isFailed() || $result->isSucceeded() && $result->getResult() === '') {
                        EventLoop::cancel($watcher);
                        unset($watchers->value[$index]);
                    }

                    $sender->send([$index, $result]);
                } finally {
                    // The last handle to finish closes the channel, which ends the receive loop.
                    if ($watchers->value === []) {
                        $sender->close();
                    }
                }
            });
        }

        if ($watchers->value === []) {
            // No handles were given: nothing would ever send on or close the
            // channel, so receiving from it could never complete normally.
            return;
        }

        if ($timeout !== null) {
            $delay = max($timeout->getTotalSeconds(), 0.0);

            $timeout_watcher = EventLoop::delay($delay, static function () use ($sender): void {
                /** @var Result\ResultInterface<string> $failure */
                $failure = new Result\Failure(
                    new Exception\TimeoutException(
                        'Reached timeout before being able to read all the handles until the end.',
                    ),
                );

                // A `null` index marks the message as coming from the timeout rather than a handle.
                $sender->send([null, $failure]);
            });
        }

        while (true) {
            [$index, $result] = $receiver->receive();
            if (null === $index || $result->isFailed()) {
                throw $result->getThrowable();
            }

            yield $index => $result->getResult();
        }
    } catch (Channel\Exception\ClosedChannelException) {
        // completed.
        return;
    } finally {
        if ($timeout_watcher !== null) {
            EventLoop::cancel($timeout_watcher);
        }

        foreach ($watchers->value as $watcher) {
            EventLoop::cancel($watcher);
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc