• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sanmai / pipeline / 15340407640

30 May 2025 06:08AM UTC coverage: 98.558% (-1.4%) from 100.0%
15340407640

Pull #182

github

web-flow
Merge e418831b8 into 0700248fa
Pull Request #182: Add stream() to ensure lazy processing

0 of 6 new or added lines in 1 file covered. (0.0%)

2 existing lines in 1 file now uncovered.

410 of 416 relevant lines covered (98.56%)

403.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.33
/src/Standard.php
1
<?php
2

3
/**
4
 * Copyright 2017, 2018 Alexey Kopytko <alexey@kopytko.com>
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18

19
declare(strict_types=1);
20

21
namespace Pipeline;
22

23
use ArrayIterator;
24
use CallbackFilterIterator;
25
use Countable;
26
use EmptyIterator;
27
use Generator;
28
use Iterator;
29
use IteratorAggregate;
30
use Traversable;
31
use Override;
32

33
use function array_chunk;
34
use function array_filter;
35
use function array_flip;
36
use function array_map;
37
use function array_merge;
38
use function array_reduce;
39
use function array_shift;
40
use function array_slice;
41
use function array_values;
42
use function assert;
43
use function count;
44
use function is_array;
45
use function is_string;
46
use function iterator_count;
47
use function iterator_to_array;
48
use function max;
49
use function min;
50
use function mt_getrandmax;
51
use function mt_rand;
52
use function array_keys;
53

54
/**
55
 * Concrete pipeline with sensible default callbacks.
56
 *
57
 * @template-implements IteratorAggregate<mixed, mixed>
58
 */
59
class Standard implements IteratorAggregate, Countable
60
{
61
    /**
62
     * Pre-primed pipeline.
63
     *
64
     * This is not a full `iterable` per se because we exclude IteratorAggregate before assigning a value.
65
     */
66
    private iterable $pipeline;
67

68
    /**
     * Creates a pipeline, optionally seeding it with an initial source of data.
     *
     * @param ?iterable $input initial contents; with null the pipeline is left uninitialized
     */
    public function __construct(?iterable $input = null)
    {
        if (null === $input) {
            return;
        }

        $this->replace($input);
    }
77

78
    /**
     * Stores the input as the internal pipeline, unwrapping aggregates on the way.
     */
    private function replace(iterable $input): void
    {
        if (!is_array($input)) {
            // IteratorAggregate is a nuance best we avoid dealing with.
            // For example, CallbackFilterIterator needs a plain Iterator.
            while ($input instanceof IteratorAggregate) {
                $input = $input->getIterator();
            }
        }

        $this->pipeline = $input;
    }
94

95
    /**
     * Tells whether the pipeline is either uninitialized or holds an empty array.
     *
     * @psalm-suppress TypeDoesNotContainType
     */
    private function empty(): bool
    {
        if (!isset($this->pipeline)) {
            return true;
        }

        return [] === $this->pipeline;
    }
102

103
    /**
     * Drops the internal pipeline, bringing the property back to the uninitialized state.
     *
     * @phan-suppress PhanTypeObjectUnsetDeclaredProperty
     */
    private function discard(): void
    {
        unset($this->pipeline);
    }
110

111
    /**
     * Appends the contents of an iterable to the end of the pipeline.
     *
     * @param ?iterable $values values to add; null or an empty array leave the pipeline as is
     */
    public function append(?iterable $values = null): self
    {
        // willReplace() returns false only when the pipeline already has contents to join with.
        if (!$this->willReplace($values)) {
            // Static analyzer hints
            assert(null !== $values);

            return $this->join($this->pipeline, $values);
        }

        return $this;
    }
126

127
    /**
     * Appends the given arguments, as a list of values, to the end of the pipeline.
     *
     * @param mixed ...$vector values to append
     */
    public function push(...$vector): self
    {
        return $this->append($vector);
    }
136

137
    /**
     * Puts the contents of an iterable in front of the pipeline.
     *
     * @param ?iterable $values values to add; null or an empty array leave the pipeline as is
     */
    public function prepend(?iterable $values = null): self
    {
        // willReplace() returns false only when the pipeline already has contents to join with.
        if (!$this->willReplace($values)) {
            // Static analyzer hints
            assert(null !== $values);

            return $this->join($values, $this->pipeline);
        }

        return $this;
    }
152

153
    /**
     * Puts the given arguments, as a list of values, in front of the pipeline.
     *
     * @param mixed ...$vector values to prepend
     */
    public function unshift(...$vector): self
    {
        return $this->prepend($vector);
    }
162

163
    /**
     * Handles the shortcuts of appending/prepending, reporting whether the job is already done.
     *
     * Returns true when there is nothing to add, or when the pipeline was empty and
     * the values became the pipeline. Returns false when an actual join is required.
     */
    private function willReplace(?iterable $values = null): bool
    {
        // Nothing to add, nothing to do.
        /** @phan-suppress-next-line PhanTypeComparisonFromArray */
        if (null === $values || [] === $values) {
            return true;
        }

        // With no prior contents the new values simply become the pipeline.
        if ($this->empty()) {
            $this->replace($values);

            return true;
        }

        // The pipeline was initialized: no shortcuts are applicable.
        return false;
    }
186

187
    /**
     * Makes a combination of two non-empty iterables the new pipeline.
     *
     * Two arrays are merged with array_merge(); anything else is chained lazily.
     */
    private function join(iterable $left, iterable $right): self
    {
        $this->pipeline = is_array($left) && is_array($right)
            ? array_merge($left, $right)
            : self::joinYield($left, $right);

        return $this;
    }
206

207
    /**
     * Lazily yields everything from the left iterable, then everything from the right one.
     */
    private static function joinYield(iterable $left, iterable $right): iterable
    {
        foreach ([$left, $right] as $part) {
            yield from $part;
        }
    }
215

216
    /**
     * Flattens inputs: every iterable item is replaced by its elements.
     *
     * @return $this
     */
    public function flatten(): self
    {
        $expand = static function (iterable $args = []) {
            yield from $args;
        };

        return $this->map($expand);
    }
227

228
    /**
     * A variant of `map` that spreads each iterable item into the callback's arguments.
     * With no callback given it flattens the inputs instead.
     *
     * @param ?callable $func
     *
     * @return $this
     */
    public function unpack(?callable $func = null): self
    {
        if (null !== $func) {
            return $this->map(
                /** @psalm-suppress InvalidArgument */
                static fn (iterable $args = []) => $func(...$args)
            );
        }

        return $this->flatten();
    }
246

247
    /**
     * Splits the pipeline into arrays of up to $length elements each; the last chunk may be shorter.
     *
     * @param int<1, max> $length        the size of each chunk
     * @param bool        $preserve_keys When true the original keys are kept; otherwise each chunk is reindexed numerically.
     *
     * @return $this
     */
    public function chunk(int $length, bool $preserve_keys = false): self
    {
        // No-op: an empty array or null.
        if ($this->empty()) {
            return $this;
        }

        // Arrays take the array_chunk() shortcut, everything else is chunked lazily.
        $this->pipeline = is_array($this->pipeline)
            ? array_chunk($this->pipeline, $length, $preserve_keys)
            : self::toChunks(self::makeNonRewindable($this->pipeline), $length, $preserve_keys);

        return $this;
    }
277

278
    /**
     * Lazily yields arrays of up to $length elements taken from the generator until it is exhausted.
     *
     * @psalm-param positive-int $length
     */
    private static function toChunks(Generator $input, int $length, bool $preserve_keys): iterable
    {
        while ($input->valid()) {
            $chunk = self::take($input, $length);

            yield iterator_to_array($chunk, $preserve_keys);
        }
    }
287

288
    /**
     * Adds a stage where the callback, for each input value, may return one value or yield many.
     *
     * On an uninitialized pipeline the callback is called once without arguments to provide
     * the initial data: a returned generator becomes the pipeline, any other value becomes
     * its only element.
     *
     * With no callback is a no-op (can safely take a null).
     *
     * @param ?callable $func a callback must either return a value or yield values (return a generator)
     *
     * @return $this
     */
    public function map(?callable $func = null): self
    {
        if (null === $func) {
            return $this;
        }

        // That's the standard case for any next stages.
        if (isset($this->pipeline)) {
            $this->pipeline = self::apply($this->pipeline, $func);

            return $this;
        }

        // Let's check what we got for a start.
        $seed = $func();

        // A generator is used as is. It would be possible to detect a generator callback upfront with
        // (new \ReflectionFunction($func))->isGenerator(), yet this will restrict users from replacing
        // the pipeline and has unknown performance impact.
        //
        // Anything else is a simple value to be treated as an array. We do not cast to an array here
        // because casting a null to an array results in an empty array; that's surprising and not how
        // it works for other values.
        $this->pipeline = $seed instanceof Generator ? $seed : [$seed];

        return $this;
    }
333

334
    /**
     * Lazily applies the callback to every value, expanding the generators it returns.
     */
    private static function apply(iterable $previous, callable $func): iterable
    {
        foreach ($previous as $key => $value) {
            $result = $func($value);

            if (!$result instanceof Generator) {
                // In case of a plain old mapping function we use the original key
                yield $key => $result;

                continue;
            }

            // For generators we use keys they provide
            yield from $result;
        }
    }
350

351
    /**
     * Adds a stage where the callback returns exactly one value for each input value.
     * Unlike map(), generators returned from the callback get no special treatment.
     *
     * On an uninitialized pipeline the callback is called once without arguments and
     * its result becomes the only element.
     *
     * With no callback is a no-op (can safely take a null).
     *
     * @param ?callable $func a callback must return a value
     *
     * @psalm-suppress RedundantCondition
     *
     * @return $this
     */
    public function cast(?callable $func = null): self
    {
        if (null === $func) {
            return $this;
        }

        if (!isset($this->pipeline)) {
            // Get the seed value.
            // We do not cast to an array here because casting a null to an array results in
            // an empty array; that's surprising and not how it works for other values.
            $this->pipeline = [$func()];

            return $this;
        }

        // Arrays are mapped right away, other inputs lazily.
        $this->pipeline = is_array($this->pipeline)
            ? array_map($func, $this->pipeline)
            : self::applyOnce($this->pipeline, $func);

        return $this;
    }
390

391
    /**
     * Lazily maps every value with the callback, keeping the original keys.
     */
    private static function applyOnce(iterable $previous, callable $func): iterable
    {
        foreach ($previous as $key => $value) {
            $mapped = $func($value);

            yield $key => $mapped;
        }
    }
397

398
    /**
     * Removes elements unless a callback returns true.
     *
     * With no callback drops every value that evaluates to false (as array_filter does by default).
     *
     * @param ?callable $func
     *
     * @return $this
     */
    public function filter(?callable $func = null): self
    {
        // No-op: an empty array or null.
        if ($this->empty()) {
            return $this;
        }

        $func = self::resolvePredicate($func);

        if (!is_array($this->pipeline)) {
            assert($this->pipeline instanceof Iterator);

            /** @psalm-suppress ArgumentTypeCoercion */
            $this->pipeline = new CallbackFilterIterator($this->pipeline, $func);

            return $this;
        }

        // We got an array, that's what we need. Moving along.
        $this->pipeline = array_filter($this->pipeline, $func);

        return $this;
    }
430

431
    /**
     * Turns a nullable predicate into a callable that is safe to call with more than one argument.
     */
    private static function resolvePredicate(?callable $func): callable
    {
        return match (true) {
            // No predicate: pass the value through, a cast is unnecessary for non-strict filtering.
            null === $func => static fn ($value) => $value,
            // Strings usually are internal functions, which typically require exactly one parameter.
            is_string($func) => static fn ($value) => $func($value),
            default => $func,
        };
    }
452

453
    /**
     * Skips elements while the predicate returns true, and keeps everything after the predicate returns false just once.
     *
     * The predicate is no longer consulted once it has returned false for the first time.
     *
     * @param callable $predicate a callback returning boolean value
     */
    public function skipWhile(callable $predicate): self
    {
        // No-op: an empty array or null.
        if ($this->empty()) {
            return $this;
        }

        $predicate = self::resolvePredicate($predicate);

        // The flag is shared with the closure by reference and flips exactly once.
        $skipping = true;

        $this->filter(static function ($value) use ($predicate, &$skipping): bool {
            // Check the flag first so that the predicate is never called after skipping is over.
            if ($skipping && $predicate($value)) {
                return false;
            }

            $skipping = false;

            return true;
        });

        return $this;
    }
481

482
    /**
     * Reduces input values to a single value. Defaults to summation. This is a terminal operation.
     *
     * @template T
     *
     * @param ?callable $func    function (mixed $carry, mixed $item) { must return updated $carry }
     * @param T         $initial The initial value for a $carry; zero is used when null
     *
     * @return int|T
     */
    public function reduce(?callable $func = null, $initial = null)
    {
        $seed = $initial ?? 0;

        return $this->fold($seed, $func);
    }
496

497
    /**
     * Reduces input values to a single value. Defaults to summation. Requires an initial value. This is a terminal operation.
     *
     * An empty pipeline gives back the initial value untouched.
     *
     * @template T
     *
     * @param T         $initial initial value for a $carry
     * @param ?callable $func    function (mixed $carry, mixed $item) { must return updated $carry }
     *
     * @return T
     */
    public function fold($initial, ?callable $func = null)
    {
        if ($this->empty()) {
            return $initial;
        }

        $reducer = self::resolveReducer($func);

        if (is_array($this->pipeline)) {
            return array_reduce($this->pipeline, $reducer, $initial);
        }

        $carry = $initial;

        foreach ($this->pipeline as $item) {
            $carry = $reducer($carry, $item);
        }

        return $carry;
    }
525

526
    /**
     * Returns the given reducer, or a summing reducer when none was provided.
     */
    private static function resolveReducer(?callable $func): callable
    {
        return $func ?? static fn ($carry, $item) => $carry + $item;
    }
541

542
    /**
     * Returns an iterator over the pipeline: an EmptyIterator when uninitialized,
     * the stored traversable as is, or an ArrayIterator around the stored array.
     */
    #[Override]
    public function getIterator(): Traversable
    {
        if (!isset($this->pipeline)) {
            return new EmptyIterator();
        }

        return $this->pipeline instanceof Traversable
            ? $this->pipeline
            : new ArrayIterator($this->pipeline);
    }
555

556
    /**
     * By default returns all values regardless of keys used, discarding all keys in the process. Has an option to keep the keys. This is a terminal operation.
     *
     * @param bool $preserve_keys whether to keep the original keys
     */
    public function toArray(bool $preserve_keys = false): array
    {
        // No-op: an empty array or null.
        if ($this->empty()) {
            return [];
        }

        if (!is_array($this->pipeline)) {
            // Because `yield from` does not reset keys we have to ignore them on export by default to return every item.
            // http://php.net/manual/en/language.generators.syntax.php#control-structures.yield.from
            return iterator_to_array($this, $preserve_keys);
        }

        // We got what we need, moving along.
        return $preserve_keys ? $this->pipeline : array_values($this->pipeline);
    }
579

580
    /**
     * Returns all values with their keys preserved. This is a terminal operation.
     *
     * @deprecated Use toAssoc() instead
     */
    public function toArrayPreservingKeys(): array
    {
        return $this->toArray(preserve_keys: true);
    }
588

589
    /**
     * Returns all values with their keys preserved. This is a terminal operation.
     */
    public function toAssoc(): array
    {
        return $this->toArray(true);
    }
596

597
    /**
     * Counts values as they pass through this stage, updating the referenced counter.
     *
     * @param ?int &$count the current count; initialized with zero unless provided
     *
     * @param-out int $count
     *
     * @return $this
     */
    public function runningCount(
        ?int &$count
    ): self {
        if (null === $count) {
            $count = 0;
        }

        $tally = static function ($input) use (&$count) {
            ++$count;

            return $input;
        };

        $this->cast($tally);

        return $this;
    }
619

620
    /**
     * {@inheritdoc}
     *
     * Arrays are counted directly; any other pipeline is iterated over to count its elements.
     *
     * This is a terminal operation.
     *
     * @see \Countable::count()
     */
    #[Override]
    public function count(): int
    {
        // With non-primed pipeline just return zero.
        if ($this->empty()) {
            return 0;
        }

        return is_array($this->pipeline)
            ? count($this->pipeline)
            : iterator_count($this->pipeline);
    }
641

642
    /**
     * Makes sure a non-empty pipeline is backed by a generator.
     *
     * An existing generator is kept as is; an array or any other iterator is wrapped
     * into a generator, so the following stages skip their array shortcuts and
     * process elements lazily. An empty or uninitialized pipeline is left untouched.
     *
     * @return $this
     */
    public function stream()
    {
        if ($this->empty()) {
            return $this;
        }

        // The shared helper returns generators unchanged and wraps everything else.
        $this->pipeline = self::makeNonRewindable($this->pipeline);

        return $this;
    }
660

661
    /**
     * Returns the input itself if it is a generator, else a generator yielding from the input.
     */
    private static function makeNonRewindable(iterable $input): Generator
    {
        return $input instanceof Generator
            ? $input
            : self::generatorFromIterable($input);
    }
669

670
    /**
     * Wraps any iterable into a generator yielding the same keys and values.
     */
    private static function generatorFromIterable(iterable $input): Generator
    {
        yield from $input;
    }
674

675
    /**
     * Extracts a slice from the inputs. Keys are not discarded intentionally.
     *
     * @see \array_slice()
     *
     * @param int  $offset If offset is non-negative, the sequence will start at that offset. If offset is negative, the sequence will start that far from the end.
     * @param ?int $length If length is given and is positive, then the sequence will have up to that many elements in it. If length is given and is negative then the sequence will stop that many elements from the end.
     *
     * @return $this
     */
    public function slice(int $offset, ?int $length = null)
    {
        // With non-primed pipeline just move along.
        if ($this->empty()) {
            return $this;
        }

        // We're not consuming anything assuming total laziness.
        if (0 === $length) {
            $this->discard();

            return $this;
        }

        // Shortcut to array_slice() for actual arrays.
        if (is_array($this->pipeline)) {
            $this->pipeline = array_slice($this->pipeline, $offset, $length, true);

            return $this;
        }

        $stream = self::makeNonRewindable($this->pipeline);

        if ($offset < 0) {
            // A negative offset starts the sequence that far from the end.
            $stream = self::tail($stream, -$offset);
        } elseif ($offset > 0) {
            // A positive offset starts the sequence at that offset.
            $stream = self::skip($stream, $offset);
        }

        // A zero length was handled above, hence a given length is either negative or positive.
        if (null !== $length) {
            $stream = $length < 0
                // A negative length stops the sequence that many elements from the end.
                ? self::head($stream, -$length)
                // A positive length limits the sequence to that many elements.
                : self::take($stream, $length);
        }

        $this->pipeline = $stream;

        return $this;
    }
730

731
    /**
     * Consumes $skip elements from the input, then yields whatever is left.
     *
     * @psalm-param positive-int $skip
     */
    private static function skip(Iterator $input, int $skip): Generator
    {
        // Consume until seen enough; the loop stops while positioned on the first element to keep.
        foreach ($input as $ignored) {
            /** @psalm-suppress DocblockTypeContradiction */
            if (0 === $skip--) {
                break;
            }
        }

        // Only yield when there is something left. Yielding from an exhausted generator gives error:
        // Generator passed to yield from was aborted without proper return and is unable to continue
        if ($input->valid()) {
            yield from $input;
        }
    }
752

753
    /**
     * Yields up to $take elements from the generator with their keys, advancing it past each of them.
     *
     * @psalm-param positive-int $take
     */
    private static function take(Generator $input, int $take): Generator
    {
        while ($input->valid()) {
            $key = $input->key();
            $value = $input->current();

            yield $key => $value;

            // Advance before checking the limit so that a taken element is never left as current.
            $input->next();

            // Stop once taken enough.
            if (0 === --$take) {
                return;
            }
        }
    }
768

769
    /**
     * Reads the whole input keeping a sliding window of the latest records, then yields the window.
     */
    private static function tail(iterable $input, int $length): Generator
    {
        $window = [];

        foreach ($input as $key => $value) {
            // Once the window is full, drop the oldest record to make room for a new one.
            if (count($window) >= $length) {
                array_shift($window);
            }

            $window[] = [$key, $value];
        }

        foreach ($window as [$key, $value]) {
            yield $key => $value;
        }
    }
790

791
    /**
     * Yields every record except for the last $length of them.
     *
     * Records are held back in a buffer: as soon as the buffer holds more than
     * $length records, the oldest one is released, first in, first out.
     */
    private static function head(iterable $input, int $length): Generator
    {
        $held = [];

        foreach ($input as $key => $value) {
            $held[] = [$key, $value];

            if (count($held) <= $length) {
                continue;
            }

            [$releasedKey, $releasedValue] = array_shift($held);

            yield $releasedKey => $releasedValue;
        }
    }
807

808
    /**
     * Performs a lazy zip operation on iterables, not unlike that of
     * array_map with first argument set to null. Also known as transposition.
     *
     * Every element of the pipeline becomes an array made of that element followed by
     * the values found at the same position in each of the inputs; an input that
     * runs out of values early contributes nulls. With an uninitialized pipeline
     * the first input becomes the pipeline.
     *
     * @return $this
     */
    public function zip(iterable ...$inputs)
    {
        if ([] === $inputs) {
            return $this;
        }

        if (!isset($this->pipeline)) {
            /** @var iterable $input */
            $input = array_shift($inputs);

            // Go through replace() to unwrap any IteratorAggregate (such as another pipeline):
            // later stages, for example filter(), expect either an array or a plain Iterator.
            $this->replace($input);
        }

        if ([] === $inputs) {
            return $this;
        }

        $this->map(static function ($item): array {
            return [$item];
        });

        foreach (self::toIterators(...$inputs) as $iterator) {
            // MultipleIterator won't work here because it'll stop at first invalid iterator.
            $this->map(static function (array $current) use ($iterator) {
                if (!$iterator->valid()) {
                    $current[] = null;

                    return $current;
                }

                $current[] = $iterator->current();
                $iterator->next();

                return $current;
            });
        }

        return $this;
    }
852

853
    /**
     * Converts every input into a plain Iterator: aggregates are unwrapped, arrays are wrapped.
     *
     * @return Iterator[]
     */
    private static function toIterators(iterable ...$inputs): array
    {
        $toIterator = static function (iterable $input): Iterator {
            while ($input instanceof IteratorAggregate) {
                $input = $input->getIterator();
            }

            // IteratorAggregate and Iterator are out of picture, which leaves... an array.

            /** @var array|Iterator $input */
            return $input instanceof Iterator ? $input : new ArrayIterator($input);
        };

        return array_map($toIterator, $inputs);
    }
873

874
    /**
     * Reservoir sampling method with an optional weighting function.
     *
     * Without a weighting function every element is treated equally (see reservoirRandom());
     * otherwise the weighted variant is used (see reservoirWeighted()).
     *
     * @see https://en.wikipedia.org/wiki/Reservoir_sampling
     *
     * @param int       $size       The desired sample size
     * @param ?callable $weightFunc The optional weighting function
     */
    public function reservoir(int $size, ?callable $weightFunc = null): array
    {
        if ($this->empty()) {
            return [];
        }

        if ($size <= 0) {
            // Discard the state to emulate full consumption
            $this->discard();

            return [];
        }

        // Algorithms below assume inputs are non-rewindable
        $this->pipeline = self::makeNonRewindable($this->pipeline);

        if (null === $weightFunc) {
            $sample = self::reservoirRandom($this->pipeline, $size);
        } else {
            $sample = self::reservoirWeighted($this->pipeline, $size, $weightFunc);
        }

        // Keys are reservoir slots: a replacement overwrites the earlier value in the same slot.
        return iterator_to_array($sample, true);
    }
904

905
    /**
     * Yields the remaining values of a generator, dropping its keys, from wherever it currently stands.
     */
    private static function drainValues(Generator $input): Generator
    {
        // @infection-ignore-all
        for (; $input->valid(); $input->next()) {
            yield $input->current();
        }
    }
913

914
    /**
     * Simple and slow algorithm, commonly known as Algorithm R.
     *
     * Yields slot => value pairs: first the initial sample, then replacements for random slots.
     *
     * @see https://en.wikipedia.org/wiki/Reservoir_sampling#Simple_algorithm
     *
     * @psalm-param positive-int $size
     */
    private static function reservoirRandom(Generator $input, int $size): Generator
    {
        // Take an initial sample (AKA fill the reservoir array); keys here are sequential slot numbers.
        foreach (self::take($input, $size) as $output) {
            yield $output;
        }

        // Proceed only if there's more to fetch
        if ($input->valid()) {
            $seen = $size;

            // Produce replacement elements with gradually decreasing probability
            foreach (self::drainValues($input) as $value) {
                $slot = mt_rand(0, $seen++);

                if ($slot < $size) {
                    yield $slot => $value;
                }
            }
        }
    }
946

947
    /**
     * Weighted random sampling.
     *
     * Yields slot => value pairs: first the initial sample, then replacements for random
     * slots, where an element replaces another with the probability of its weight
     * divided by the sum of all weights seen so far.
     *
     * @see https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao
     *
     * @psalm-param positive-int $size
     *
     * @param callable $weightFunc returns a numeric weight for a value
     */
    private static function reservoirWeighted(Generator $input, int $size, callable $weightFunc): Generator
    {
        $sum = 0.0;

        // Take an initial sample (AKA fill the reservoir array)
        foreach (self::take($input, $size) as $output) {
            yield $output;
            $sum += $weightFunc($output);
        }

        // Return if there's nothing more to fetch
        if (!$input->valid()) {
            return;
        }

        foreach (self::drainValues($input) as $value) {
            $weight = $weightFunc($value);
            $sum += $weight;

            // With no weight accumulated so far there is no probability to speak of:
            // dividing by zero throws, hence such an element is never picked.
            if (0.0 === (float) $sum) {
                continue;
            }

            // probability for this item
            $probability = $weight / $sum;

            // @infection-ignore-all
            if (self::random() <= $probability) {
                yield mt_rand(0, $size - 1) => $value;
            }
        }
    }
982

983
    /**
984
     * Returns a pseudorandom value between zero (inclusive) and one (exclusive).
985
     *
986
     * @infection-ignore-all
987
     */
988
    private static function random(): float
    {
        // Dividing a draw from 0..(max - 1) by max keeps the result below one.
        $ceiling = mt_getrandmax();

        return mt_rand(0, $ceiling - 1) / $ceiling;
    }
992

993
    /**
994
     * Find lowest value using the standard comparison rules. Returns null for empty sequences.
995
     *
996
     * @return null|mixed
997
     */
998
    public function min()
    {
        if ($this->empty()) {
            return null;
        }

        // Arrays are delegated to the built-in.
        if (is_array($this->pipeline)) {
            /** @psalm-suppress ArgumentTypeCoercion */
            return min($this->pipeline);
        }

        $this->pipeline = self::makeNonRewindable($this->pipeline);

        // Stays null when the iterator turns out to yield nothing.
        $lowest = null;
        $seeded = false;

        foreach ($this->pipeline as $candidate) {
            // The first item becomes the baseline, whatever its value.
            if (!$seeded) {
                $lowest = $candidate;
                $seeded = true;

                continue;
            }

            if ($candidate < $lowest) {
                $lowest = $candidate;
            }
        }

        return $lowest;
    }
1030

1031
    /**
1032
     * Find highest value using the standard comparison rules. Returns null for empty sequences.
1033
     *
1034
     * @return null|mixed
1035
     */
1036
    public function max()
    {
        if ($this->empty()) {
            return null;
        }

        // Arrays are delegated to the built-in.
        if (is_array($this->pipeline)) {
            /** @psalm-suppress ArgumentTypeCoercion */
            return max($this->pipeline);
        }

        $this->pipeline = self::makeNonRewindable($this->pipeline);

        // Seed from the first item rather than comparing against null:
        // a comparison with null converts both sides to bool, so falsy
        // values such as 0, 0.0 or '' are NOT greater than null and would be
        // skipped (e.g. [0, -5] would yield -5, unlike the built-in max()).
        $max = null;
        $seeded = false;

        foreach ($this->pipeline as $value) {
            if (!$seeded || $value > $max) {
                $max = $value;
                $seeded = true;
            }
        }

        // Still null when the iterator yielded nothing.
        return $max;
    }
1060

1061
    /**
1062
     * @return $this
1063
     */
1064
    public function values()
    {
        // Nothing to re-index in an empty pipeline.
        if ($this->empty()) {
            return $this;
        }

        // Arrays are re-indexed eagerly; anything else goes through a generator.
        $this->pipeline = is_array($this->pipeline)
            ? array_values($this->pipeline)
            : self::valuesOnly($this->pipeline);

        return $this;
    }
1081

1082
    /**
     * Re-yields every value of the input without its key, so the
     * resulting generator numbers the items sequentially.
     */
    private static function valuesOnly(iterable $previous): iterable
    {
        foreach ($previous as $item) {
            yield $item;
        }
    }
1088

1089
    /**
1090
     * @return $this
1091
     */
1092
    public function keys()
    {
        // Nothing to extract from an empty pipeline.
        if ($this->empty()) {
            return $this;
        }

        // Arrays are handled eagerly; anything else goes through a generator.
        $this->pipeline = is_array($this->pipeline)
            ? array_keys($this->pipeline)
            : self::keysOnly($this->pipeline);

        return $this;
    }
1109

1110
    /**
     * Yields the key of every item of the input as a value;
     * the original values are ignored.
     */
    private static function keysOnly(iterable $previous): iterable
    {
        foreach ($previous as $index => $ignored) {
            yield $index;
        }
    }
1116

1117
    /**
1118
     * @return $this
1119
     */
1120
    public function flip()
    {
        // Nothing to swap in an empty pipeline.
        if ($this->empty()) {
            return $this;
        }

        // Arrays are flipped eagerly; anything else goes through a generator.
        $this->pipeline = is_array($this->pipeline)
            ? array_flip($this->pipeline)
            : self::flipKeysAndValues($this->pipeline);

        return $this;
    }
1137

1138
    /**
     * Yields every item of the input with its key and value swapped.
     */
    private static function flipKeysAndValues(iterable $previous): iterable
    {
        foreach ($previous as $index => $item) {
            yield $item => $index;
        }
    }
1144

1145
    /**
1146
     * @return $this
1147
     */
1148
    public function tuples()
    {
        // Nothing to pair up in an empty pipeline.
        if ($this->empty()) {
            return $this;
        }

        if (!is_array($this->pipeline)) {
            $this->pipeline = self::toTuples($this->pipeline);

            return $this;
        }

        // With a null callback array_map() zips its inputs, producing
        // a list of [key, value] pairs.
        $this->pipeline = array_map(null, array_keys($this->pipeline), $this->pipeline);

        return $this;
    }
1170

1171
    /**
     * Yields a [key, value] pair for every item of the input.
     */
    private static function toTuples(iterable $previous): iterable
    {
        foreach ($previous as $index => $item) {
            yield [$index, $item];
        }
    }
1177

1178
    /**
     * Adds a pass-through step that reports every item to the given
     * RunningVariance instance.
     *
     * @param Helper\RunningVariance $variance the accumulator to feed
     * @param ?callable              $castFunc converts an item before it is observed; defaults to floatval,
     *                                         null results are not observed
     */
    private function feedRunningVariance(Helper\RunningVariance $variance, ?callable $castFunc): self
    {
        $castFunc ??= 'floatval';

        $observer = static function ($value) use ($variance, $castFunc) {
            $number = $castFunc($value);

            if (null !== $number) {
                $variance->observe($number);
            }

            // The item itself passes through untouched.
            return $value;
        };

        return $this->cast($observer);
    }
1195

1196
    /**
1197
     * Feeds in an instance of RunningVariance.
1198
     *
1199
     * @param ?Helper\RunningVariance &$variance the instance of RunningVariance; initialized unless provided
1200
     * @param ?callable               $castFunc  the cast callback, returning ?float; null values are not counted
1201
     *
1202
     * @param-out Helper\RunningVariance $variance
1203
     *
1204
     * @return $this
1205
     */
1206
    public function runningVariance(
        ?Helper\RunningVariance &$variance,
        ?callable $castFunc = null
    ): self {
        // Create the accumulator for the caller when none was supplied;
        // it is handed back through the reference.
        if (null === $variance) {
            $variance = new Helper\RunningVariance();
        }

        $this->feedRunningVariance($variance, $castFunc);

        return $this;
    }
1216

1217
    /**
1218
     * Computes final statistics for the sequence.
1219
     *
1220
     * @param ?callable               $castFunc the cast callback, returning ?float; null values are not counted
1221
     * @param ?Helper\RunningVariance $variance the optional instance of RunningVariance
1222
     */
1223
    public function finalVariance(
        ?callable $castFunc = null,
        ?Helper\RunningVariance $variance = null
    ): Helper\RunningVariance {
        $variance ??= new Helper\RunningVariance();

        // An empty pipeline has nothing to observe.
        if ($this->empty()) {
            return $variance;
        }

        $this->feedRunningVariance($variance, $castFunc);

        // An array needs no further work; any other pipeline has to be
        // iterated to the end so that every item gets observed.
        if (!is_array($this->pipeline)) {
            iterator_count($this->pipeline);
        }

        return $variance;
    }
1246

1247
    /**
1248
     * Eagerly iterates over the sequence using the provided callback. Discards the sequence after iteration.
1249
     *
1250
     * @param callable $func
1251
     * @param bool $discard Whether to discard the pipeline's iterator.
1252
     */
1253
    public function each(callable $func, bool $discard = true): void
    {
        if (!$this->empty()) {
            try {
                // The callback receives the value first and the key second.
                foreach ($this->pipeline as $index => $item) {
                    $func($item, $index);
                }
            } finally {
                // Runs even when the callback throws.
                if ($discard) {
                    $this->discard();
                }
            }
        }
    }
1269
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc