• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Automattic / stream-builder / 24049140942

06 Apr 2026 08:14PM UTC coverage: 80.255% (+0.05%) from 80.208%
24049140942

Pull #43

github

web-flow
Merge 59f2531de into e3fc00812
Pull Request #43: fix an issue when a full page is filtered out

3 of 6 new or added lines in 1 file covered. (50.0%)

2 existing lines in 1 file now uncovered.

3020 of 3763 relevant lines covered (80.26%)

6.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.21
/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php
1
<?php
2
/**
3
 * The StreamBuilder framework.
4
 * Copyright 2023 Automattic, Inc.
5
 *
6
 * This program is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along
17
 * with this program; if not, write to the Free Software Foundation, Inc.,
18
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
 */
20

21
namespace Tumblr\StreamBuilder\Streams;
22

23
use Tumblr\StreamBuilder\EnumerationOptions\EnumerationOptions;
24
use Tumblr\StreamBuilder\Helpers;
25
use Tumblr\StreamBuilder\StreamBuilder;
26
use Tumblr\StreamBuilder\StreamContext;
27
use Tumblr\StreamBuilder\StreamCursors\StreamCursor;
28
use Tumblr\StreamBuilder\StreamElements\DerivedStreamElement;
29
use Tumblr\StreamBuilder\StreamElements\StreamElement;
30
use Tumblr\StreamBuilder\StreamFilters\StreamFilter;
31
use Tumblr\StreamBuilder\StreamResult;
32
use Tumblr\StreamBuilder\StreamTracers\StreamTracer;
33

34
/**
35
 * A FilteredStream without the cursor stuff. Useful for debugging!
36
 * It is itself a stream, which only enumerates items which pass the filter.
37
 * FilteredStreams support configurable backfill and over-fetching.
38
 */
39
final class CursorlessFilteredStream extends WrapStream
{
    /**
     * @var StreamFilter The filter applied to every page fetched from the inner stream.
     */
    private StreamFilter $filter;

    /**
     * @var int How many retries are available when a fetch does not yield enough retained elements.
     */
    private int $retry_count;

    /**
     * @var float Extra fraction of elements requested from the inner stream on each fetch.
     * Negative values are treated as zero when the fetch size is computed.
     */
    private float $overfetch_ratio;

    /** @var bool When true, a page with zero retained elements does not consume a retry. */
    private bool $skip_empty_pages;

    /**
     * @param Stream $inner The stream to filter.
     * @param StreamFilter $filter The filter to apply to the stream.
     * @param string $identity The string identifies the stream.
     * @param int|null $retry_count If fetching does not yield the requested number of elements
     * (after filtering), retry up to this many times to fetch more. A value of 2 will therefore
     * try a total of three times (two retries). Null falls back to FilteredStream::DEFAULT_RETRY_COUNT.
     * @param float|null $overfetch_ratio If you expect the filter has high selectivity, and the stream
     * is relatively cheap to over-enumerate, you can crank this up to preemptively over-fetch.
     * A value of zero means to not over-fetch, whereas a value of 1.0 means to fetch double the
     * number of results requested. No more than $count results will be returned.
     * Null falls back to FilteredStream::DEFAULT_OVERFETCH_RATIO.
     * @param bool|null $skip_empty_pages If true (default, also used when null is passed), fully-filtered
     * pages do not count against the retry budget, allowing the stream to scan past long runs of
     * filtered content. Set to `false` to count every inner enumeration as a retry.
     */
    public function __construct(
        Stream $inner,
        StreamFilter $filter,
        string $identity,
        ?int $retry_count = null,
        ?float $overfetch_ratio = null,
        ?bool $skip_empty_pages = true
    ) {
        parent::__construct($inner, $identity);
        $this->filter = $filter;
        $this->retry_count = $retry_count ?? FilteredStream::DEFAULT_RETRY_COUNT;
        $this->overfetch_ratio = $overfetch_ratio ?? FilteredStream::DEFAULT_OVERFETCH_RATIO;
        // Templates that omit the property hand us null; treat that the same as the default.
        $this->skip_empty_pages = $skip_empty_pages ?? true;
    }

    /**
     * @inheritDoc
     */
    #[\Override]
    public function to_template(): array
    {
        return [
            // The class is final, so self::class is always the runtime class name.
            '_type' => self::class,
            'stream' => $this->getInner()->to_template(),
            'stream_filter' => $this->filter->to_template(),
            'retry_count' => $this->retry_count,
            'overfetch_ratio' => $this->overfetch_ratio,
            'skip_empty_pages' => $this->skip_empty_pages,
        ];
    }

    /**
     * @inheritDoc
     */
    #[\Override]
    public static function from_template(StreamContext $context): self
    {
        $stream = $context->deserialize_required_property('stream');
        $filter = $context->deserialize_required_property('stream_filter');

        // Optional properties are forwarded as-is; the constructor substitutes defaults for null.
        return new self(
            $stream,
            $filter,
            $context->get_current_identity(),
            $context->get_optional_property('retry_count'),
            $context->get_optional_property('overfetch_ratio'),
            $context->get_optional_property('skip_empty_pages')
        );
    }

    /**
     * @inheritDoc
     */
    #[\Override]
    protected function _enumerate(
        int $count,
        ?StreamCursor $cursor = null,
        ?StreamTracer $tracer = null,
        ?EnumerationOptions $option = null
    ): StreamResult {
        return $this->_filter_rec($count, $cursor, $this->retry_count, $tracer, $option);
    }

    /**
     * Fetch pages from the inner stream and filter them until enough elements are retained,
     * the retry budget is spent, or the inner stream runs out.
     *
     * This is implemented as a loop rather than by recursion: when empty pages are skipped the
     * number of rounds is not bounded by the retry budget, and a recursive implementation would
     * grow the call stack by one frame per fully-filtered page. The method name is kept for
     * compatibility with the previous recursive implementation.
     *
     * NOTE(review): with skip_empty_pages enabled, an inner stream that never reports exhaustion
     * and whose content is always filtered out would keep this loop running; confirm that callers
     * only wrap streams that eventually exhaust.
     *
     * @param int $want_count How many elements are desired
     * @param StreamCursor|null $inner_cursor The cursor from which to fetch the inner stream.
     * @param int $depth The number of retries remaining.
     * @param StreamTracer|null $tracer The tracer traces filter process.
     * @param EnumerationOptions|null $option The option for enumeration
     * @param bool|null &$propagated_is_exhaustive Populated by reference with the is_exhaustive value
     *        of the returned result. It reflects the last page fetched, so exhaustion is only
     *        reported when the inner stream actually ran out (or returned nothing).
     * @return StreamResult
     */
    private function _filter_rec(
        int $want_count,
        ?StreamCursor $inner_cursor,
        int $depth,
        ?StreamTracer $tracer = null,
        ?EnumerationOptions $option = null,
        ?bool &$propagated_is_exhaustive = null
    ): StreamResult {
        // Elements retained by earlier rounds, in enumeration order.
        $collected = [];

        while (true) {
            $fetch_count = intval(ceil($want_count * (1.0 + max(0.0, $this->overfetch_ratio))));

            // get stuff from the inner stream:
            $inner_result = $this->getInner()->enumerate($fetch_count, $inner_cursor, $tracer, $option);
            $inner_combined_cursor = $inner_result->get_combined_cursor();

            // if we got nothing, stop. we can't continue because the cursor would be unchanged:
            if ($inner_result->get_size() == 0) {
                if ($tracer !== null) {
                    $tracer->filter_abort($this, $want_count, $inner_cursor, $depth);
                }
                $propagated_is_exhaustive = true;
                return new StreamResult(true, $collected);
            }

            // we got some stuff, run the filter on it:
            $filter_result = $this->filter->filter($inner_result->get_elements(), null, $tracer);
            if ($tracer !== null) {
                $tracer->filter_apply($this, $inner_result->get_size(), $filter_result);
            }

            // we only care about the retained elements, re-attributed to this stream:
            $retained = array_map(function (StreamElement $e) {
                return new DerivedStreamElement($e, $this->get_identity(), $e->get_cursor());
            }, $filter_result->get_retained());
            $retained_count = count($retained);

            // decide whether to terminate or retry:
            $inner_exhausted = $inner_result->is_exhaustive() || is_null($inner_combined_cursor);
            if ($depth <= 0 || $retained_count >= $want_count || $inner_exhausted) {
                // because of over-fetching, even if the inner result was exhaustive, we might not use all of it,
                // so we trust that flag only if we return everything (i.e. no more than what we wanted):
                $is_exhaustive = $inner_exhausted && ($retained_count <= $want_count);
                if ($tracer !== null) {
                    $tracer->filter_terminate($this, $inner_cursor, $depth, $is_exhaustive);
                }

                // we're done: cap this page at what is still wanted and append it to earlier rounds.
                $propagated_is_exhaustive = $is_exhaustive;
                return new StreamResult(
                    $is_exhaustive,
                    array_merge($collected, array_slice($retained, 0, $want_count))
                );
            }

            // not done yet. the retry cursor needs to be based on this round, because streams need to
            // make progress:
            $retry_cursor = StreamCursor::combine_all([$inner_cursor, $inner_combined_cursor]);
            if ($tracer !== null) {
                $tracer->filter_retry(
                    $this,
                    $inner_cursor,
                    $retry_cursor,
                    $depth,
                    $want_count,
                    $inner_result->get_size(),
                    $retained_count
                );
            }

            // Don't count fully-filtered pages against the retry budget when skipping is enabled.
            $page_is_free = ($retained_count === 0) && $this->skip_empty_pages;
            if (!$page_is_free) {
                $depth--;
            }

            $collected = array_merge($collected, $retained);
            $want_count -= $retained_count;
            $inner_cursor = $retry_cursor;
        }
    }

    /**
     * @inheritDoc
     */
    #[\Override]
    protected function can_enumerate_with_time_range(): bool
    {
        // Filtering adds no time-range constraint of its own; defer to the wrapped stream.
        return $this->getInner()->can_enumerate_with_time_range();
    }

    /**
     * Forward a query string to the inner stream when it supports one; otherwise log a warning.
     *
     * @param string $query_string Query.
     * @return void
     */
    public function setQueryString(string $query_string): void
    {
        $inner_stream = $this->getInner();
        if (!method_exists($inner_stream, 'setQueryString')) {
            StreamBuilder::getDependencyBag()
                ->getLog()
                ->warning('Trying to fetch posts from stream without setting the query string');
            return;
        }

        $inner_stream->setQueryString($query_string);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc