• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 4594239788

pending completion
4594239788

Pull #772

github

GitHub
Merge 93719774d into 75538c8bc
Pull Request #772: bencher

134 of 134 new or added lines in 1 file covered. (100.0%)

96125 of 107670 relevant lines covered (89.28%)

72821.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.65
/operators/src/util/stream_zip/vec_zip.rs
1
use std::pin::Pin;
2

3
use futures::stream::{Fuse, FusedStream};
4
use futures::task::{Context, Poll};
5
use futures::{Stream, StreamExt};
6
use pin_project::pin_project;
7

8
/// An adapter for merging the outputs of multiple streams.
9
///
10
/// The merged stream produces items from one to all of the underlying
11
/// streams as they become available. Errors, however, are not merged: you
12
/// get at most one error at a time.
13
#[derive(Debug)]
×
14
#[pin_project(project = StreamArrayZipProjection)]
47✔
15
pub struct StreamArrayZip<St: Stream, const N: usize> {
16
    #[pin]
17
    streams: [Fuse<St>; N],
18
    values: [Option<St::Item>; N],
19
}
20

21
/// An adapter for merging the outputs of multiple streams.
22
///
23
/// The merged stream produces items from one to all of the underlying
24
/// streams as they become available. Errors, however, are not merged: you
25
/// get at most one error at a time.
26
#[derive(Debug)]
×
27
#[pin_project(project = StreamVectorZipProjection)]
12✔
28
pub struct StreamVectorZip<St: Stream> {
29
    #[pin]
30
    streams: Vec<Fuse<St>>,
31
    values: Vec<Option<St::Item>>,
32
}
33

34
impl<St: Stream, const N: usize> StreamArrayZip<St, N> {
35
    /// Creates a new stream zip.
36
    ///
37
    /// # Panics
38
    /// Panics if `streams` is empty.
39
    pub fn new(streams: [St; N]) -> Self {
13✔
40
        assert!(!streams.is_empty());
13✔
41

42
        Self {
13✔
43
            streams: streams.map(StreamExt::fuse),
13✔
44
            values: Self::array_of_none(),
13✔
45
        }
13✔
46
    }
13✔
47

48
    /// Since `St::Item` is not copy, we cannot use `[None; N]`
49
    #[inline]
50
    fn array_of_none() -> [Option<St::Item>; N] {
41✔
51
        [(); N].map(|_| None)
96✔
52
    }
41✔
53
}
54

55
impl<St: Stream> StreamVectorZip<St> {
56
    /// Creates a new stream zip.
57
    ///
58
    /// # Panics
59
    /// Panics if `streams` is empty.
60
    pub fn new(streams: Vec<St>) -> Self {
3✔
61
        assert!(!streams.is_empty());
3✔
62

63
        Self {
3✔
64
            values: Self::vec_of_none(streams.len()),
3✔
65
            streams: streams.into_iter().map(StreamExt::fuse).collect(),
3✔
66
        }
3✔
67
    }
3✔
68

69
    /// Since `St::Item` is not copy, we cannot use `vec![None; N]`
70
    #[inline]
71
    fn vec_of_none(len: usize) -> Vec<Option<St::Item>> {
3✔
72
        (0..len).map(|_| None).collect()
8✔
73
    }
3✔
74
}
75

76
impl<St, const N: usize> Stream for StreamArrayZip<St, N>
77
where
78
    St: Stream + Unpin,
79
{
80
    type Item = [St::Item; N];
81

82
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47✔
83
        let mut this = self.project();
47✔
84

85
        for (stream, value) in this.streams.as_mut().iter_mut().zip(this.values.as_mut()) {
158✔
86
            if value.is_none() {
158✔
87
                match Pin::new(stream).poll_next(cx) {
127✔
88
                    Poll::Ready(Some(item)) => *value = Some(item),
64✔
89
                    Poll::Ready(None) | Poll::Pending => {}
63✔
90
                }
91
            }
31✔
92
        }
93

94
        if this.values.iter().all(Option::is_some) {
47✔
95
            let values: [Option<St::Item>; N] =
28✔
96
                std::mem::replace(this.values, Self::array_of_none());
28✔
97
            let tuple: [St::Item; N] = values.map(Option::unwrap);
28✔
98

28✔
99
            Poll::Ready(Some(tuple))
28✔
100
        } else if this.streams.iter().any(FusedStream::is_terminated) {
19✔
101
            Poll::Ready(None)
6✔
102
        } else {
103
            Poll::Pending
13✔
104
        }
105
    }
47✔
106

107
    fn size_hint(&self) -> (usize, Option<usize>) {
1✔
108
        let mut streams_lower = usize::MAX;
1✔
109
        let mut streams_upper = None::<usize>;
1✔
110

111
        for (stream, value) in self.streams.iter().zip(&self.values) {
2✔
112
            let value_len = usize::from(value.is_some());
2✔
113

2✔
114
            let (lower, upper) = stream.size_hint();
2✔
115

2✔
116
            streams_lower = streams_lower.min(lower.saturating_add(value_len));
2✔
117

2✔
118
            streams_upper = match (streams_upper, upper) {
2✔
119
                (Some(x), Some(y)) => Some(x.min(y.saturating_add(value_len))),
1✔
120
                (Some(x), None) => Some(x),
×
121
                (None, Some(y)) => y.checked_add(value_len),
1✔
122
                (None, None) => None,
×
123
            };
124
        }
125

126
        (streams_lower, streams_upper)
1✔
127
    }
1✔
128
}
129

130
impl<St> Stream for StreamVectorZip<St>
131
where
132
    St: Stream + Unpin,
133
{
134
    type Item = Vec<St::Item>;
135

136
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
12✔
137
        let mut this = self.project();
12✔
138

139
        for (stream, value) in this.streams.as_mut().iter_mut().zip(this.values.iter_mut()) {
32✔
140
            if value.is_none() {
32✔
141
                match Pin::new(stream).poll_next(cx) {
32✔
142
                    Poll::Ready(Some(item)) => *value = Some(item),
16✔
143
                    Poll::Ready(None) | Poll::Pending => {}
16✔
144
                }
145
            }
×
146
        }
147

148
        if this.values.iter().all(Option::is_some) {
12✔
149
            let tuple: Vec<St::Item> = this
6✔
150
                .values
6✔
151
                .iter_mut()
6✔
152
                .map(|o| o.take().expect("checked in if-condition"))
15✔
153
                .collect();
6✔
154

6✔
155
            Poll::Ready(Some(tuple))
6✔
156
        } else if this.streams.iter().any(FusedStream::is_terminated) {
6✔
157
            Poll::Ready(None)
3✔
158
        } else {
159
            Poll::Pending
3✔
160
        }
161
    }
12✔
162

163
    fn size_hint(&self) -> (usize, Option<usize>) {
1✔
164
        let mut streams_lower = usize::MAX;
1✔
165
        let mut streams_upper = None::<usize>;
1✔
166

167
        for (stream, value) in self.streams.iter().zip(&self.values) {
2✔
168
            let value_len = usize::from(value.is_some());
2✔
169

2✔
170
            let (lower, upper) = stream.size_hint();
2✔
171

2✔
172
            streams_lower = streams_lower.min(lower.saturating_add(value_len));
2✔
173

2✔
174
            streams_upper = match (streams_upper, upper) {
2✔
175
                (Some(x), Some(y)) => Some(x.min(y.saturating_add(value_len))),
1✔
176
                (Some(x), None) => Some(x),
×
177
                (None, Some(y)) => y.checked_add(value_len),
1✔
178
                (None, None) => None,
×
179
            };
180
        }
181

182
        (streams_lower, streams_upper)
1✔
183
    }
1✔
184
}
185

186
impl<St, const N: usize> FusedStream for StreamArrayZip<St, N>
187
where
188
    St: Stream + Unpin,
189
{
190
    fn is_terminated(&self) -> bool {
×
191
        self.streams.iter().all(FusedStream::is_terminated)
×
192
    }
×
193
}
194

195
impl<St> FusedStream for StreamVectorZip<St>
196
where
197
    St: Stream + Unpin,
198
{
199
    fn is_terminated(&self) -> bool {
×
200
        self.streams.iter().all(FusedStream::is_terminated)
×
201
    }
×
202
}
203

204
#[cfg(test)]
205
mod tests {
206
    use super::*;
207

208
    use async_stream::stream;
209
    use futures::stream::BoxStream;
210
    use futures::StreamExt;
211

212
    #[tokio::test]
1✔
213
    async fn concurrent_stream() {
1✔
214
        let st1 = stream! {
1✔
215
            for i in 1..=3 {
4✔
216
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3✔
217
                yield i;
3✔
218
            }
219
        };
220

221
        let st2 = stream! {
1✔
222
            for i in 1..=3 {
4✔
223
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3✔
224
                yield i * 10;
3✔
225
            }
226
        };
227

228
        let st1: BoxStream<'static, u32> = Box::pin(st1);
1✔
229
        let st2: BoxStream<'static, u32> = Box::pin(st2);
1✔
230

1✔
231
        let st_all = StreamArrayZip::new([st1, st2]);
1✔
232

1✔
233
        let start = std::time::Instant::now();
1✔
234

235
        let values: Vec<[u32; 2]> = st_all.collect().await;
3✔
236

237
        assert!(start.elapsed() < std::time::Duration::from_millis(500));
1✔
238

239
        assert_eq!(values, [[1, 10], [2, 20], [3, 30]]);
1✔
240
    }
241

242
    #[tokio::test]
1✔
243
    async fn compare_with_zip() {
1✔
244
        let st1 = futures::stream::iter(vec![1, 2, 3]);
1✔
245
        let st2 = futures::stream::iter(vec![1, 2, 3, 4]);
1✔
246

1✔
247
        // init
1✔
248
        let st_array_zip = StreamArrayZip::new([st1.clone().fuse(), st2.clone().fuse()]);
1✔
249
        let st_vector_zip = StreamVectorZip::new(vec![st1.clone().fuse(), st2.clone().fuse()]);
1✔
250
        let st_zip = st1.zip(st2);
1✔
251

1✔
252
        // size hints
1✔
253
        assert_eq!(st_array_zip.size_hint(), st_zip.size_hint());
1✔
254
        assert_eq!(st_vector_zip.size_hint(), st_zip.size_hint());
1✔
255

256
        // output
257
        let o1: Vec<[i32; 2]> = st_array_zip.collect().await;
1✔
258
        let o2: Vec<[i32; 2]> = st_zip.map(|(v1, v2)| [v1, v2]).collect().await;
3✔
259
        let o3: Vec<[i32; 2]> = st_vector_zip.map(|vs| [vs[0], vs[1]]).collect().await;
3✔
260
        assert_eq!(o1, o2);
1✔
261
        assert_eq!(o1, o3);
1✔
262
    }
263
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc