• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10182401222

31 Jul 2024 02:42PM CUT coverage: 91.14% (+0.07%) from 91.068%
10182401222

Pull #970

github

web-flow
Merge f0fcf6203 into c97f87c56
Pull Request #970: FAIR dataset deletion

1798 of 1863 new or added lines in 13 files covered. (96.51%)

16 existing lines in 9 files now uncovered.

132740 of 145644 relevant lines covered (91.14%)

52956.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.22
/operators/src/util/stream_zip/vec_zip.rs
1
use std::pin::Pin;
2

3
use futures::stream::{Fuse, FusedStream};
4
use futures::task::{Context, Poll};
5
use futures::{Stream, StreamExt};
6
use pin_project::pin_project;
7

8
/// An adapter for merging the outputs of multiple streams.
9
///
10
/// The merged stream produces items from one to all of the underlying
11
/// streams as they become available. Errors, however, are not merged: you
12
/// get at most one error at a time.
13
#[derive(Debug)]
14
#[pin_project(project = StreamArrayZipProjection)]
36✔
15
pub struct StreamArrayZip<St: Stream, const N: usize> {
16
    #[pin]
17
    streams: [Fuse<St>; N],
18
    values: [Option<St::Item>; N],
19
}
20

21
/// An adapter for merging the outputs of multiple streams.
22
///
23
/// The merged stream produces items from one to all of the underlying
24
/// streams as they become available. Errors, however, are not merged: you
25
/// get at most one error at a time.
26
#[derive(Debug)]
27
#[pin_project(project = StreamVectorZipProjection)]
4✔
28
pub struct StreamVectorZip<St: Stream> {
29
    #[pin]
30
    streams: Vec<Fuse<St>>,
31
    values: Vec<Option<St::Item>>,
32
}
33

34
impl<St: Stream, const N: usize> StreamArrayZip<St, N> {
35
    /// Creates a new stream zip.
36
    ///
37
    /// # Panics
38
    /// Panics if `streams` is empty.
39
    pub fn new(streams: [St; N]) -> Self {
12✔
40
        assert!(!streams.is_empty());
12✔
41

42
        Self {
12✔
43
            streams: streams.map(StreamExt::fuse),
12✔
44
            values: Self::array_of_none(),
12✔
45
        }
12✔
46
    }
12✔
47

48
    /// Since `St::Item` is not copy, we cannot use `[None; N]`
49
    #[inline]
50
    fn array_of_none() -> [Option<St::Item>; N] {
39✔
51
        [(); N].map(|()| None)
80✔
52
    }
39✔
53
}
54

55
impl<St: Stream> StreamVectorZip<St> {
56
    /// Creates a new stream zip.
57
    ///
58
    /// # Panics
59
    /// Panics if `streams` is empty.
60
    pub fn new(streams: Vec<St>) -> Self {
1✔
61
        assert!(!streams.is_empty());
1✔
62

63
        Self {
1✔
64
            values: Self::vec_of_none(streams.len()),
1✔
65
            streams: streams.into_iter().map(StreamExt::fuse).collect(),
1✔
66
        }
1✔
67
    }
1✔
68

69
    /// Since `St::Item` is not copy, we cannot use `vec![None; N]`
70
    #[inline]
71
    fn vec_of_none(len: usize) -> Vec<Option<St::Item>> {
1✔
72
        (0..len).map(|_| None).collect()
2✔
73
    }
1✔
74
}
75

76
impl<St, const N: usize> Stream for StreamArrayZip<St, N>
77
where
78
    St: Stream + Unpin,
79
{
80
    type Item = [St::Item; N];
81

82
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36✔
83
        let mut this = self.project();
36✔
84

85
        for (stream, value) in this.streams.as_mut().iter_mut().zip(this.values.as_mut()) {
75✔
86
            if value.is_none() {
75✔
87
                match Pin::new(stream).poll_next(cx) {
75✔
88
                    Poll::Ready(Some(item)) => *value = Some(item),
56✔
89
                    Poll::Ready(None) | Poll::Pending => {}
19✔
90
                }
UNCOV
91
            }
×
92
        }
93

94
        if this.values.iter().all(Option::is_some) {
36✔
95
            let values: [Option<St::Item>; N] =
27✔
96
                std::mem::replace(this.values, Self::array_of_none());
27✔
97
            let tuple: [St::Item; N] = values.map(Option::unwrap);
27✔
98

27✔
99
            Poll::Ready(Some(tuple))
27✔
100
        } else if this.streams.iter().any(FusedStream::is_terminated) {
9✔
101
            Poll::Ready(None)
5✔
102
        } else {
103
            Poll::Pending
4✔
104
        }
105
    }
36✔
106

107
    fn size_hint(&self) -> (usize, Option<usize>) {
1✔
108
        let mut streams_lower = usize::MAX;
1✔
109
        let mut streams_upper = None::<usize>;
1✔
110

111
        for (stream, value) in self.streams.iter().zip(&self.values) {
2✔
112
            let value_len = usize::from(value.is_some());
2✔
113

2✔
114
            let (lower, upper) = stream.size_hint();
2✔
115

2✔
116
            streams_lower = streams_lower.min(lower.saturating_add(value_len));
2✔
117

2✔
118
            streams_upper = match (streams_upper, upper) {
2✔
119
                (Some(x), Some(y)) => Some(x.min(y.saturating_add(value_len))),
1✔
120
                (Some(x), None) => Some(x),
×
121
                (None, Some(y)) => y.checked_add(value_len),
1✔
122
                (None, None) => None,
×
123
            };
124
        }
125

126
        (streams_lower, streams_upper)
1✔
127
    }
1✔
128
}
129

130
impl<St> Stream for StreamVectorZip<St>
131
where
132
    St: Stream + Unpin,
133
{
134
    type Item = Vec<St::Item>;
135

136
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4✔
137
        let mut this = self.project();
4✔
138

139
        for (stream, value) in this.streams.as_mut().iter_mut().zip(this.values.iter_mut()) {
8✔
140
            if value.is_none() {
8✔
141
                match Pin::new(stream).poll_next(cx) {
8✔
142
                    Poll::Ready(Some(item)) => *value = Some(item),
7✔
143
                    Poll::Ready(None) | Poll::Pending => {}
1✔
144
                }
145
            }
×
146
        }
147

148
        if this.values.iter().all(Option::is_some) {
4✔
149
            let tuple: Vec<St::Item> = this
3✔
150
                .values
3✔
151
                .iter_mut()
3✔
152
                .map(|o| o.take().expect("checked in if-condition"))
6✔
153
                .collect();
3✔
154

3✔
155
            Poll::Ready(Some(tuple))
3✔
156
        } else if this.streams.iter().any(FusedStream::is_terminated) {
1✔
157
            Poll::Ready(None)
1✔
158
        } else {
159
            Poll::Pending
×
160
        }
161
    }
4✔
162

163
    fn size_hint(&self) -> (usize, Option<usize>) {
1✔
164
        let mut streams_lower = usize::MAX;
1✔
165
        let mut streams_upper = None::<usize>;
1✔
166

167
        for (stream, value) in self.streams.iter().zip(&self.values) {
2✔
168
            let value_len = usize::from(value.is_some());
2✔
169

2✔
170
            let (lower, upper) = stream.size_hint();
2✔
171

2✔
172
            streams_lower = streams_lower.min(lower.saturating_add(value_len));
2✔
173

2✔
174
            streams_upper = match (streams_upper, upper) {
2✔
175
                (Some(x), Some(y)) => Some(x.min(y.saturating_add(value_len))),
1✔
176
                (Some(x), None) => Some(x),
×
177
                (None, Some(y)) => y.checked_add(value_len),
1✔
178
                (None, None) => None,
×
179
            };
180
        }
181

182
        (streams_lower, streams_upper)
1✔
183
    }
1✔
184
}
185

186
impl<St, const N: usize> FusedStream for StreamArrayZip<St, N>
187
where
188
    St: Stream + Unpin,
189
{
190
    fn is_terminated(&self) -> bool {
×
191
        self.streams.iter().all(FusedStream::is_terminated)
×
192
    }
×
193
}
194

195
impl<St> FusedStream for StreamVectorZip<St>
196
where
197
    St: Stream + Unpin,
198
{
199
    fn is_terminated(&self) -> bool {
×
200
        self.streams.iter().all(FusedStream::is_terminated)
×
201
    }
×
202
}
203

204
#[cfg(test)]
205
mod tests {
206
    use super::*;
207

208
    use async_stream::stream;
209
    use futures::stream::BoxStream;
210
    use futures::StreamExt;
211

212
    #[tokio::test]
213
    async fn concurrent_stream() {
1✔
214
        let st1 = stream! {
1✔
215
            for i in 1..=3 {
1✔
216
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1✔
217
                yield i;
1✔
218
            }
1✔
219
        };
1✔
220

1✔
221
        let st2 = stream! {
1✔
222
            for i in 1..=3 {
1✔
223
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1✔
224
                yield i * 10;
1✔
225
            }
1✔
226
        };
1✔
227

1✔
228
        let st1: BoxStream<'static, u32> = Box::pin(st1);
1✔
229
        let st2: BoxStream<'static, u32> = Box::pin(st2);
1✔
230

1✔
231
        let st_all = StreamArrayZip::new([st1, st2]);
1✔
232

1✔
233
        let start = std::time::Instant::now();
1✔
234

1✔
235
        let values: Vec<[u32; 2]> = st_all.collect().await;
3✔
236

1✔
237
        assert!(start.elapsed() < std::time::Duration::from_millis(500));
1✔
238

1✔
239
        assert_eq!(values, [[1, 10], [2, 20], [3, 30]]);
1✔
240
    }
1✔
241

242
    #[tokio::test]
243
    async fn compare_with_zip() {
1✔
244
        let st1 = futures::stream::iter(vec![1, 2, 3]);
1✔
245
        let st2 = futures::stream::iter(vec![1, 2, 3, 4]);
1✔
246

1✔
247
        // init
1✔
248
        let st_array_zip = StreamArrayZip::new([st1.clone().fuse(), st2.clone().fuse()]);
1✔
249
        let st_vector_zip = StreamVectorZip::new(vec![st1.clone().fuse(), st2.clone().fuse()]);
1✔
250
        let st_zip = st1.zip(st2);
1✔
251

1✔
252
        // size hints
1✔
253
        assert_eq!(st_array_zip.size_hint(), st_zip.size_hint());
1✔
254
        assert_eq!(st_vector_zip.size_hint(), st_zip.size_hint());
1✔
255

1✔
256
        // output
1✔
257
        let o1: Vec<[i32; 2]> = st_array_zip.collect().await;
1✔
258
        let o2: Vec<[i32; 2]> = st_zip.map(|(v1, v2)| [v1, v2]).collect().await;
3✔
259
        let o3: Vec<[i32; 2]> = st_vector_zip.map(|vs| [vs[0], vs[1]]).collect().await;
3✔
260
        assert_eq!(o1, o2);
1✔
261
        assert_eq!(o1, o3);
1✔
262
    }
1✔
263
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc