• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Automattic / stream-builder / 15542748463

09 Jun 2025 07:22PM UTC coverage: 80.223% (+0.7%) from 79.573%
15542748463

push

github

web-flow
Propagate inner stream is_exhaustive through recursive calls (#37)

Co-authored-by: Lesian Lengare <lesian.lengare@automattic.com>
Co-authored-by: Lucila Stancato <lucila@users.noreply.github.com>

17 of 18 new or added lines in 2 files covered. (94.44%)

1 existing line in 1 file now uncovered.

3018 of 3762 relevant lines covered (80.22%)

6.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.74
/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php
1
<?php
2
/**
3
 * The StreamBuilder framework.
4
 * Copyright 2023 Automattic, Inc.
5
 *
6
 * This program is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along
17
 * with this program; if not, write to the Free Software Foundation, Inc.,
18
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
 */
20

21
namespace Tumblr\StreamBuilder\Streams;
22

23
use Tumblr\StreamBuilder\EnumerationOptions\EnumerationOptions;
24
use Tumblr\StreamBuilder\Helpers;
25
use Tumblr\StreamBuilder\StreamBuilder;
26
use Tumblr\StreamBuilder\StreamContext;
27
use Tumblr\StreamBuilder\StreamCursors\StreamCursor;
28
use Tumblr\StreamBuilder\StreamElements\DerivedStreamElement;
29
use Tumblr\StreamBuilder\StreamElements\StreamElement;
30
use Tumblr\StreamBuilder\StreamFilters\StreamFilter;
31
use Tumblr\StreamBuilder\StreamResult;
32
use Tumblr\StreamBuilder\StreamTracers\StreamTracer;
33

34
/**
 * A FilteredStream without the cursor stuff. Useful for debugging!
 * It is itself a stream, which only enumerates items which pass the filter.
 * Like FilteredStream, it supports configurable retries (backfill) and over-fetching.
 */
final class CursorlessFilteredStream extends WrapStream
{
    /**
     * @var StreamFilter The filter applied to every batch fetched from the inner stream.
     */
    private StreamFilter $filter;

    /**
     * @var int How many additional fetches may be attempted when a filtered batch comes up short.
     */
    private int $retry_count;

    /**
     * @var float Extra fraction of elements requested from the inner stream on each fetch.
     */
    private float $overfetch_ratio;

    /**
     * @param Stream $inner The stream to filter.
     * @param StreamFilter $filter The filter to apply to the stream.
     * @param string $identity The string identifies the stream.
     * @param int|null $retry_count If fetching does not yield the requested number of elements
     * (after filtering), retry up to this many times to fetch more. A value of 2, for example,
     * will try a total of three times (two retries). Null falls back to
     * FilteredStream::DEFAULT_RETRY_COUNT.
     * @param float|null $overfetch_ratio If you expect the filter has high selectivity, and the stream
     * is relatively cheap to over-enumerate, you can crank this up to preemptively over-fetch.
     * A value of zero means to not over-fetch, whereas a value of 1.0 means to fetch double the
     * number of results requested. No more than $count results will be returned. Null falls back to
     * FilteredStream::DEFAULT_OVERFETCH_RATIO.
     */
    public function __construct(
        Stream $inner,
        StreamFilter $filter,
        string $identity,
        ?int $retry_count = null,
        ?float $overfetch_ratio = null
    ) {
        parent::__construct($inner, $identity);
        $this->filter = $filter;
        $this->retry_count = $retry_count ?? FilteredStream::DEFAULT_RETRY_COUNT;
        $this->overfetch_ratio = $overfetch_ratio ?? FilteredStream::DEFAULT_OVERFETCH_RATIO;
    }

    /**
     * @inheritDoc
     */
    #[\Override]
    public function to_template(): array
    {
        return [
            '_type' => get_class($this),
            'stream' => $this->getInner()->to_template(),
            'stream_filter' => $this->filter->to_template(),
            'retry_count' => $this->retry_count,
            'overfetch_ratio' => $this->overfetch_ratio,
        ];
    }

    /**
     * @inheritDoc
     */
    #[\Override]
    public static function from_template(StreamContext $context): self
    {
        $stream = $context->deserialize_required_property('stream');
        $filter = $context->deserialize_required_property('stream_filter');

        return new self(
            $stream,
            $filter,
            $context->get_current_identity(),
            $context->get_optional_property('retry_count'),
            $context->get_optional_property('overfetch_ratio')
        );
    }

    /**
     * @inheritDoc
     */
    #[\Override]
    protected function _enumerate(
        int $count,
        ?StreamCursor $cursor = null,
        ?StreamTracer $tracer = null,
        ?EnumerationOptions $option = null
    ): StreamResult {
        return $this->_filter_rec($count, $cursor, $this->retry_count, $tracer, $option);
    }

    /**
     * Recursively executed code block to fetch and filter results.
     * @param int $want_count How many elements are desired
     * @param StreamCursor|null $inner_cursor The cursor from which to fetch the inner stream.
     * @param int $depth The number of retries remaining.
     * @param StreamTracer|null $tracer The tracer traces filter process.
     * @param EnumerationOptions|null $option The option for enumeration
     * @param bool|null &$propagated_is_exhaustive Populated by reference during recursion. Carries the
     *        is_exhaustive value decided by the deepest recursive call, allowing this method to report exhaustion
     *        only when the inner stream actually ran out, avoiding false positives if the deepest filtered batch
     *        is empty.
     * @return StreamResult
     */
    private function _filter_rec(
        int $want_count,
        ?StreamCursor $inner_cursor,
        int $depth,
        ?StreamTracer $tracer = null,
        ?EnumerationOptions $option = null,
        ?bool &$propagated_is_exhaustive = null
    ): StreamResult {
        // a negative overfetch ratio is clamped to zero, so we never ask for fewer than $want_count:
        $fetch_count = intval(ceil($want_count * (1.0 + max(0.0, $this->overfetch_ratio))));

        // get stuff from the inner stream:
        $inner_result = $this->getInner()->enumerate($fetch_count, $inner_cursor, $tracer, $option);
        $inner_combined_cursor = $inner_result->get_combined_cursor();

        // if we got nothing, abort. we can't recurse because the cursor would be unchanged:
        if ($inner_result->get_size() === 0) {
            $tracer && $tracer->filter_abort($this, $want_count, $inner_cursor, $depth);
            $propagated_is_exhaustive = true;
            return new StreamResult(true, []);
        }

        // we got some stuff, run the filter on it:
        $filter_result = $this->filter->filter($inner_result->get_elements(), null, $tracer);
        $tracer && $tracer->filter_apply($this, $inner_result->get_size(), $filter_result);

        // we only care about the retained elements, re-attributed to this stream:
        $retained = array_map(
            fn (StreamElement $e): DerivedStreamElement => new DerivedStreamElement(
                $e,
                $this->get_identity(),
                $e->get_cursor()
            ),
            $filter_result->get_retained()
        );
        $retained_count = count($retained);

        // decide whether to terminate or retry:
        $inner_exhausted = $inner_result->is_exhaustive() || is_null($inner_combined_cursor);
        if ($depth <= 0 || $retained_count >= $want_count || $inner_exhausted) {
            // because of over-fetching, even if the inner result was exhaustive, we might not use all of it, so we need to
            // trust that flag only if we will return everything (i.e. it provided us no more than what we wanted):
            $is_exhaustive = $inner_exhausted && ($retained_count <= $want_count);
            $tracer && $tracer->filter_terminate($this, $inner_cursor, $depth, $is_exhaustive);

            // we're done, return at most $want_count elements.
            $propagated_is_exhaustive = $is_exhaustive;
            return new StreamResult($is_exhaustive, array_slice($retained, 0, $want_count));
        }

        // not done yet. need to figure out the next cursor and filter state with which to recurse!
        // the retry cursor needs to be based on this iteration, because streams need to make progress:
        $retry_cursor = StreamCursor::combine_all([$inner_cursor, $inner_combined_cursor]);

        $tracer && $tracer->filter_retry(
            $this,
            $inner_cursor,
            $retry_cursor,
            $depth,
            $want_count,
            $inner_result->get_size(),
            $retained_count
        );

        // the recursive call writes the final exhaustion verdict into $propagated_is_exhaustive:
        $rec_result = $this->_filter_rec(
            $want_count - $retained_count,
            $retry_cursor,
            $depth - 1,
            $tracer,
            $option,
            $propagated_is_exhaustive
        );

        $result = array_merge($retained, $rec_result->get_elements());
        return new StreamResult($propagated_is_exhaustive, $result);
    }

    /**
     * @inheritDoc
     */
    #[\Override]
    protected function can_enumerate_with_time_range(): bool
    {
        return $this->getInner()->can_enumerate_with_time_range();
    }

    /**
     * Forwards the query string to the inner stream when it exposes a setQueryString method;
     * otherwise logs a warning and does nothing else.
     * @param string $query_string Query.
     * @return void
     */
    public function setQueryString(string $query_string): void
    {
        $inner_stream = $this->getInner();
        if (method_exists($inner_stream, 'setQueryString')) {
            $inner_stream->setQueryString($query_string);
        } else {
            StreamBuilder::getDependencyBag()
                ->getLog()
                ->warning('Trying to fetch posts from stream without setting the query string');
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc