• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 7006568925

27 Nov 2023 02:07PM UTC coverage: 89.651% (+0.2%) from 89.498%
7006568925

push

github

web-flow
Merge pull request #888 from geo-engine/raster_stacks

raster stacking

4032 of 4274 new or added lines in 107 files covered. (94.34%)

12 existing lines in 8 files now uncovered.

113020 of 126066 relevant lines covered (89.65%)

59901.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.65
/operators/src/processing/vector_join/equi_data_join.rs
1
use std::collections::HashMap;
2
use std::sync::Arc;
3

4
use float_cmp::approx_eq;
5
use futures::stream::{self, BoxStream};
6
use futures::StreamExt;
7

8
use geoengine_datatypes::collections::{
9
    BuilderProvider, DataCollection, FeatureCollection, FeatureCollectionBuilder,
10
    FeatureCollectionInfos, FeatureCollectionRowBuilder, GeoFeatureCollectionRowBuilder,
11
    GeometryRandomAccess,
12
};
13
use geoengine_datatypes::primitives::{
14
    BoundingBox2D, ColumnSelection, FeatureDataRef, Geometry, TimeInterval, VectorQueryRectangle,
15
};
16
use geoengine_datatypes::util::arrow::ArrowTyped;
17

18
use crate::adapters::FeatureCollectionChunkMerger;
19
use crate::engine::QueryProcessor;
20
use crate::engine::{QueryContext, VectorQueryProcessor};
21
use crate::error::Error;
22
use crate::util::Result;
23
use async_trait::async_trait;
24
use futures::TryStreamExt;
25

26
/// Implements an inner equi-join between a `GeoFeatureCollection` stream and a `DataCollection` stream.
///
/// Every chunk of the left (geo) stream is joined with every chunk of the right
/// (data) stream; the right processor is queried once per left chunk.
27
pub struct EquiGeoToDataJoinProcessor<G> {
28
    /// Produces the geo features (left side of the join).
    left_processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
29
    /// Produces the geometry-less data rows (right side of the join).
    right_processor: Box<dyn VectorQueryProcessor<VectorType = DataCollection>>,
30
    /// Name of the join column in the left collections.
    left_column: Arc<String>,
31
    /// Name of the join column in the right collections.
    right_column: Arc<String>,
32
    /// Maps each right column name to the name it gets in the joined output.
    right_translation_table: Arc<HashMap<String, String>>,
33
}
34

35
impl<G> EquiGeoToDataJoinProcessor<G>
36
where
37
    G: Geometry + ArrowTyped + Sync + Send + 'static,
38
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
39
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
40
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
41
{
42
    pub fn new(
5✔
43
        left_processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
5✔
44
        right_processor: Box<dyn VectorQueryProcessor<VectorType = DataCollection>>,
5✔
45
        left_column: String,
5✔
46
        right_column: String,
5✔
47
        right_translation_table: HashMap<String, String>,
5✔
48
    ) -> Self {
5✔
49
        Self {
5✔
50
            left_processor,
5✔
51
            right_processor,
5✔
52
            left_column: Arc::new(left_column),
5✔
53
            right_column: Arc::new(right_column),
5✔
54
            right_translation_table: Arc::new(right_translation_table),
5✔
55
        }
5✔
56
    }
5✔
57

58
    fn join(
5✔
59
        &self,
5✔
60
        left: Arc<FeatureCollection<G>>,
5✔
61
        right: DataCollection,
5✔
62
        chunk_byte_size: usize,
5✔
63
    ) -> Result<BatchBuilderIterator<G>> {
5✔
64
        BatchBuilderIterator::new(
5✔
65
            left,
5✔
66
            right,
5✔
67
            self.left_column.clone(),
5✔
68
            self.right_column.clone(),
5✔
69
            self.right_translation_table.clone(),
5✔
70
            chunk_byte_size,
5✔
71
        )
5✔
72
    }
5✔
73
}
74

75
/// Iterator that joins one left chunk with one right chunk and yields the
/// result in batches.
///
/// A batch is closed as soon as its estimated memory size exceeds
/// `chunk_byte_size`; the next call continues with the next left feature.
struct BatchBuilderIterator<G>
76
where
77
    G: Geometry + ArrowTyped + Sync + Send + 'static,
78
{
79
    /// Left (geo) chunk; shared because it is joined with several right chunks.
    left: Arc<FeatureCollection<G>>,
80
    /// Right (data) chunk.
    right: DataCollection,
81
    /// Name of the join column in `left`.
    left_column: Arc<String>,
82
    /// Name of the join column in `right`.
    right_column: Arc<String>,
83
    /// Maps each right column name to its name in the output.
    right_translation_table: Arc<HashMap<String, String>>,
84
    /// Builder that already holds the output columns; cloned for every batch.
    builder: FeatureCollectionBuilder<G>,
85
    /// Size threshold after which a batch is closed.
    chunk_byte_size: usize,
86
    /// Index of the next left feature that has not been joined yet.
    left_idx: usize,
87
    /// `true` until the first batch was returned; the first batch is emitted even if it is empty.
    first_iteration: bool,
88
    /// Set once all left features were processed or an error occurred.
    has_ended: bool,
89
}
90

91
impl<G> BatchBuilderIterator<G>
92
where
93
    G: Geometry + ArrowTyped + Sync + Send + 'static,
94
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
95
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
96
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
97
{
98
    pub fn new(
5✔
99
        left: Arc<FeatureCollection<G>>,
5✔
100
        right: DataCollection,
5✔
101
        left_column: Arc<String>,
5✔
102
        right_column: Arc<String>,
5✔
103
        right_translation_table: Arc<HashMap<String, String>>,
5✔
104
        chunk_byte_size: usize,
5✔
105
    ) -> Result<Self> {
5✔
106
        let mut builder = FeatureCollection::<G>::builder();
5✔
107

108
        // create header by combining values from both collections
109
        for (column_name, column_type) in left.column_types() {
5✔
110
            builder.add_column(column_name, column_type)?;
5✔
111
        }
112
        for (column_name, column_type) in right.column_types() {
6✔
113
            builder.add_column(right_translation_table[&column_name].clone(), column_type)?;
6✔
114
        }
115

116
        Ok(Self {
5✔
117
            left,
5✔
118
            right,
5✔
119
            left_column,
5✔
120
            right_column,
5✔
121
            right_translation_table,
5✔
122
            builder,
5✔
123
            chunk_byte_size,
5✔
124
            left_idx: 0,
5✔
125
            first_iteration: true,
5✔
126
            has_ended: false,
5✔
127
        })
5✔
128
    }
5✔
129

130
    fn join_inner_batch_matches(
10✔
131
        left: &FeatureDataRef,
10✔
132
        right: &FeatureDataRef,
10✔
133
        left_time_interval: TimeInterval,
10✔
134
        right_time_intervals: &[TimeInterval],
10✔
135
        left_idx: usize,
10✔
136
    ) -> Result<Vec<(usize, TimeInterval)>> {
10✔
137
        fn matches<T, F>(
10✔
138
            right_values: &[T],
10✔
139
            equals_left_value: F,
10✔
140
            left_time_interval: TimeInterval,
10✔
141
            right_time_intervals: &[TimeInterval],
10✔
142
        ) -> Vec<(usize, TimeInterval)>
10✔
143
        where
10✔
144
            T: PartialEq + Copy,
10✔
145
            F: Fn(T) -> bool,
10✔
146
        {
10✔
147
            right_values
10✔
148
                .iter()
10✔
149
                .enumerate()
10✔
150
                .filter_map(move |(right_idx, &right_value)| {
26✔
151
                    if !equals_left_value(right_value) {
26✔
152
                        return None;
15✔
153
                    }
11✔
154

11✔
155
                    Some(right_idx)
11✔
156
                        .zip(left_time_interval.intersect(&right_time_intervals[right_idx]))
11✔
157
                })
26✔
158
                .collect()
10✔
159
        }
10✔
160

161
        let right_indices = match (left, right) {
10✔
162
            (FeatureDataRef::Float(left), FeatureDataRef::Float(right)) => {
×
163
                let left_value = left.as_ref()[left_idx];
×
164
                matches(
×
165
                    right.as_ref(),
×
166
                    |right_value| approx_eq!(f64, left_value, right_value),
×
167
                    left_time_interval,
×
168
                    right_time_intervals,
×
169
                )
×
170
            }
171
            (FeatureDataRef::Category(left), FeatureDataRef::Category(right)) => {
×
172
                let left_value = left.as_ref()[left_idx];
×
173
                matches(
×
174
                    right.as_ref(),
×
175
                    |right_value| left_value == right_value,
×
176
                    left_time_interval,
×
177
                    right_time_intervals,
×
178
                )
×
179
            }
180
            (FeatureDataRef::Int(left), FeatureDataRef::Int(right)) => {
10✔
181
                let left_value = left.as_ref()[left_idx];
10✔
182
                matches(
10✔
183
                    right.as_ref(),
10✔
184
                    |right_value| left_value == right_value,
26✔
185
                    left_time_interval,
10✔
186
                    right_time_intervals,
10✔
187
                )
10✔
188
            }
189
            (FeatureDataRef::Text(left), FeatureDataRef::Text(right)) => {
×
190
                let left_value = left.as_ref()[left_idx];
×
191
                matches(
×
192
                    right.as_ref(),
×
193
                    |right_value| left_value == right_value,
×
194
                    left_time_interval,
×
195
                    right_time_intervals,
×
196
                )
×
197
            }
198
            (left, right) => {
×
199
                return Err(Error::ColumnTypeMismatch {
×
200
                    left: left.into(),
×
201
                    right: right.into(),
×
202
                });
×
203
            }
204
        };
205

206
        Ok(right_indices)
10✔
207
    }
10✔
208

209
    fn compute_batch(&mut self) -> Result<FeatureCollection<G>> {
5✔
210
        let mut builder = self.builder.clone().finish_header();
5✔
211

5✔
212
        let left_join_column = self.left.data(&self.left_column).expect("should exist");
5✔
213
        let right_join_column = self.right.data(&self.right_column).expect("should exist");
5✔
214
        let left_time_intervals = self.left.time_intervals();
5✔
215
        let right_time_intervals = self.right.time_intervals();
5✔
216

5✔
217
        // copy such that `self` is not borrowed
5✔
218
        let mut left_idx = self.left_idx;
5✔
219

5✔
220
        let left_data_lookup: HashMap<String, FeatureDataRef> = self
5✔
221
            .left
5✔
222
            .column_names()
5✔
223
            .map(|column_name| {
5✔
224
                (
5✔
225
                    column_name.clone(),
5✔
226
                    self.left.data(column_name).expect(
5✔
227
                        "column should exist because it was checked during operator initialization",
5✔
228
                    ),
5✔
229
                )
5✔
230
            })
5✔
231
            .collect();
5✔
232
        let right_data_lookup: HashMap<String, FeatureDataRef> = self
5✔
233
            .right_translation_table
5✔
234
            .iter()
5✔
235
            .map(|(old_column_name, new_column_name)| {
6✔
236
                (
6✔
237
                    new_column_name.clone(),
6✔
238
                    self.right.data(old_column_name).expect(
6✔
239
                        "column should exist because it was checked during operator initialization",
6✔
240
                    ),
6✔
241
                )
6✔
242
            })
6✔
243
            .collect();
5✔
244

245
        while left_idx < self.left.len() {
15✔
246
            let geometry: G = self
10✔
247
                .left
10✔
248
                .geometry_at(left_idx)
10✔
249
                .expect("geometry should exist because `left_idx` < `len`")
10✔
250
                .into();
10✔
251

252
            let join_inner_batch_matches = Self::join_inner_batch_matches(
10✔
253
                &left_join_column,
10✔
254
                &right_join_column,
10✔
255
                left_time_intervals[left_idx],
10✔
256
                right_time_intervals,
10✔
257
                left_idx,
10✔
258
            )?;
10✔
259

260
            // add left value
261
            for (column_name, feature_data) in &left_data_lookup {
20✔
262
                let data = feature_data.get_unchecked(left_idx);
10✔
263

10✔
264
                for _ in 0..join_inner_batch_matches.len() {
10✔
265
                    builder.push_data(column_name, data.clone())?;
10✔
266
                }
267
            }
268

269
            // add right value
270
            for (column_name, feature_data) in &right_data_lookup {
22✔
271
                for &(right_idx, _) in &join_inner_batch_matches {
27✔
272
                    let data = feature_data.get_unchecked(right_idx);
15✔
273

15✔
274
                    builder.push_data(column_name, data)?;
15✔
275
                }
276
            }
277

278
            // add time and geo
279
            for (_, time_interval) in join_inner_batch_matches {
20✔
280
                builder.push_geometry(geometry.clone());
10✔
281
                builder.push_time_interval(time_interval);
10✔
282
                builder.finish_row();
10✔
283
            }
10✔
284

285
            left_idx += 1;
10✔
286

10✔
287
            // there could be the degenerated case that one left feature matches with
10✔
288
            // many features of the right side and is twice as large as the chunk byte size
10✔
289
            if !builder.is_empty() && builder.estimate_memory_size() > self.chunk_byte_size {
10✔
290
                break;
×
291
            }
10✔
292
        }
293

294
        self.left_idx = left_idx;
5✔
295

5✔
296
        // if iterator ran through, set flag `has_ended` so that the next call will not
5✔
297
        // produce an empty collection
5✔
298
        if self.left_idx >= self.left.len() {
5✔
299
            self.has_ended = true;
5✔
300
        }
5✔
301

302
        builder.cache_hint(self.left.cache_hint.merged(&self.right.cache_hint));
5✔
303

5✔
304
        builder.build().map_err(Into::into)
5✔
305
    }
5✔
306
}
307

308
impl<G> Iterator for BatchBuilderIterator<G>
309
where
310
    G: Geometry + ArrowTyped + Sync + Send + 'static,
311
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
312
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
313
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
314
{
315
    type Item = Result<FeatureCollection<G>>;
316

317
    fn next(&mut self) -> Option<Self::Item> {
10✔
318
        if self.has_ended {
10✔
319
            return None;
5✔
320
        }
5✔
321

5✔
322
        match self.compute_batch() {
5✔
323
            Ok(collection) => {
5✔
324
                if self.first_iteration {
5✔
325
                    self.first_iteration = false;
5✔
326
                    return Some(Ok(collection));
5✔
327
                }
×
328

×
329
                if collection.is_empty() {
×
330
                    self.has_ended = true;
×
331
                    None
×
332
                } else {
333
                    Some(Ok(collection))
×
334
                }
335
            }
336
            Err(error) => {
×
337
                self.first_iteration = false;
×
338
                self.has_ended = true;
×
339

×
340
                Some(Err(error))
×
341
            }
342
        }
343
    }
10✔
344
}
345

346
#[async_trait]
347
impl<G> QueryProcessor for EquiGeoToDataJoinProcessor<G>
348
where
349
    G: Geometry + ArrowTyped + Sync + Send + 'static,
350
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
351
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
352
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
353
{
354
    type Output = FeatureCollection<G>;
355
    type SpatialBounds = BoundingBox2D;
356
    type Selection = ColumnSelection;
357

358
    async fn _query<'a>(
5✔
359
        &'a self,
5✔
360
        query: VectorQueryRectangle,
5✔
361
        ctx: &'a dyn QueryContext,
5✔
362
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
5✔
363
        let result_stream = self
5✔
364
            .left_processor
5✔
365
            .query(query.clone(), ctx)
5✔
366
            .await?
×
367
            .and_then(move |left_collection| {
5✔
368
                let query = query.clone();
5✔
369
                async move {
5✔
370
                    // This implementation is a nested-loop join
5✔
371
                    let left_collection = Arc::new(left_collection);
5✔
372

373
                    let data_query = self.right_processor.query(query, ctx).await?;
5✔
374

375
                    let out = data_query
5✔
376
                        .flat_map(move |right_collection| {
5✔
377
                            match right_collection.and_then(|right_collection| {
5✔
378
                                self.join(
5✔
379
                                    left_collection.clone(),
5✔
380
                                    right_collection,
5✔
381
                                    ctx.chunk_byte_size().into(),
5✔
382
                                )
5✔
383
                            }) {
5✔
384
                                Ok(batch_iter) => stream::iter(batch_iter).boxed(),
5✔
NEW
385
                                Err(e) => stream::once(async { Err(e) }).boxed(),
×
386
                            }
387
                        })
5✔
388
                        .boxed();
5✔
389
                    Ok(out)
5✔
390
                }
5✔
391
            })
5✔
392
            .try_flatten();
5✔
393

5✔
394
        Ok(
5✔
395
            FeatureCollectionChunkMerger::new(result_stream.fuse(), ctx.chunk_byte_size().into())
5✔
396
                .boxed(),
5✔
397
        )
5✔
398
    }
10✔
399
}
400

401
#[cfg(test)]
mod tests {
    use futures::executor::block_on_stream;

    use geoengine_datatypes::collections::{ChunksEqualIgnoringCacheHint, MultiPointCollection};
    use geoengine_datatypes::primitives::CacheHint;
    use geoengine_datatypes::primitives::{
        BoundingBox2D, FeatureData, MultiPoint, SpatialResolution, TimeInterval,
    };
    use geoengine_datatypes::util::test::TestDefault;

    use crate::engine::{
        ChunkByteSize, MockExecutionContext, MockQueryContext, VectorOperator, WorkflowOperatorPath,
    };
    use crate::mock::MockFeatureCollectionSource;

    use super::*;
    use crate::processing::vector_join::util::translation_table;

    /// Runs the join over two mock sources and collects all output chunks.
    async fn join_mock_collections(
        left: MultiPointCollection,
        right: DataCollection,
        left_join_column: &str,
        right_join_column: &str,
        right_suffix: &str,
    ) -> Vec<MultiPointCollection> {
        let execution_context = MockExecutionContext::test_default();

        let left_operator = MockFeatureCollectionSource::single(left)
            .boxed()
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
            .await
            .unwrap();
        let right_operator = MockFeatureCollectionSource::single(right)
            .boxed()
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
            .await
            .unwrap();

        let left_processor = left_operator
            .query_processor()
            .unwrap()
            .multi_point()
            .unwrap();
        let right_processor = right_operator.query_processor().unwrap().data().unwrap();

        // query everything: the whole space and the default time interval
        let query_rectangle = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new(
                (f64::MIN, f64::MIN).into(),
                (f64::MAX, f64::MAX).into(),
            )
            .unwrap(),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
            attributes: ColumnSelection::all(),
        };

        let ctx = MockQueryContext::new(ChunkByteSize::MAX);

        let right_translation_table = translation_table(
            left_operator.result_descriptor().columns.keys(),
            right_operator.result_descriptor().columns.keys(),
            right_suffix,
        );

        let processor = EquiGeoToDataJoinProcessor::new(
            left_processor,
            right_processor,
            left_join_column.to_string(),
            right_join_column.to_string(),
            right_translation_table,
        );

        let stream = processor.query(query_rectangle, &ctx).await.unwrap();

        block_on_stream(stream).collect::<Result<_>>().unwrap()
    }

    /// Left input of all tests: the points `(0.0, 0.1)` and `(1.0, 1.1)` with
    /// the int column `foo = [1, 2]` and the given time intervals.
    fn left_collection(time_intervals: Vec<TimeInterval>) -> MultiPointCollection {
        MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
            time_intervals,
            std::iter::once(("foo".to_string(), FeatureData::Int(vec![1, 2]))).collect(),
            CacheHint::default(),
        )
        .unwrap()
    }

    /// Right input with exactly one column.
    fn single_column_data(
        column_name: &str,
        data: FeatureData,
        time_intervals: Vec<TimeInterval>,
    ) -> DataCollection {
        DataCollection::from_data(
            vec![],
            time_intervals,
            std::iter::once((column_name.to_string(), data)).collect(),
            CacheHint::default(),
        )
        .unwrap()
    }

    /// Asserts that `result` consists of one chunk that equals `expected`.
    fn assert_single_chunk(result: &[MultiPointCollection], expected: &MultiPointCollection) {
        assert_eq!(result.len(), 1);
        assert!(result[0].chunks_equal_ignoring_cache_hint(expected));
    }

    #[tokio::test]
    async fn join() {
        let left = left_collection(vec![TimeInterval::default(); 2]);

        let right = single_column_data(
            "bar",
            FeatureData::Int(vec![2, 2]),
            vec![TimeInterval::default(); 2],
        );

        // only the second left feature has partners, and it has two of them
        let expected_result = MultiPointCollection::from_data(
            MultiPoint::many(vec![(1.0, 1.1), (1.0, 1.1)]).unwrap(),
            vec![TimeInterval::default(); 2],
            vec![
                ("foo".to_string(), FeatureData::Int(vec![2, 2])),
                ("bar".to_string(), FeatureData::Int(vec![2, 2])),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "bar", "").await;

        assert_single_chunk(&result, &expected_result);
    }

    #[tokio::test]
    async fn time_intervals() {
        let left = left_collection(vec![
            TimeInterval::new_unchecked(0, 2),
            TimeInterval::new_unchecked(4, 5),
        ]);

        let right = single_column_data(
            "bar",
            FeatureData::Int(vec![1, 2]),
            vec![
                TimeInterval::new_unchecked(1, 3),
                TimeInterval::new_unchecked(5, 6),
            ],
        );

        // only the first pair is expected, restricted to the shared time span
        let expected_result = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1)]).unwrap(),
            vec![TimeInterval::new_unchecked(1, 2)],
            vec![
                ("foo".to_string(), FeatureData::Int(vec![1])),
                ("bar".to_string(), FeatureData::Int(vec![1])),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "bar", "").await;

        assert_single_chunk(&result, &expected_result);
    }

    #[tokio::test]
    async fn name_collision() {
        let left = left_collection(vec![TimeInterval::default(); 2]);

        let right = single_column_data(
            "foo",
            FeatureData::Int(vec![1, 2]),
            vec![TimeInterval::default(); 2],
        );

        // the right `foo` column is expected under the suffixed name `foo2`
        let expected_result = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
            vec![TimeInterval::default(); 2],
            vec![
                ("foo".to_string(), FeatureData::Int(vec![1, 2])),
                ("foo2".to_string(), FeatureData::Int(vec![1, 2])),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "foo", "2").await;

        assert_single_chunk(&result, &expected_result);
    }

    #[tokio::test]
    async fn multi_match_geo() {
        let left = left_collection(vec![TimeInterval::default(); 2]);

        let right = DataCollection::from_data(
            vec![],
            vec![TimeInterval::default(); 5],
            vec![
                ("bar".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
                (
                    "baz".to_string(),
                    FeatureData::Text(vec![
                        "this".to_string(),
                        "is".to_string(),
                        "the".to_string(),
                        "way".to_string(),
                        "!".to_string(),
                    ]),
                ),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        // the first left feature has three partners, the second one has two
        let expected_result = MultiPointCollection::from_data(
            MultiPoint::many(vec![
                (0.0, 0.1),
                (0.0, 0.1),
                (0.0, 0.1),
                (1.0, 1.1),
                (1.0, 1.1),
            ])
            .unwrap(),
            vec![TimeInterval::default(); 5],
            vec![
                ("foo".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
                ("bar".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
                (
                    "baz".to_string(),
                    FeatureData::Text(vec![
                        "this".to_string(),
                        "is".to_string(),
                        "the".to_string(),
                        "way".to_string(),
                        "!".to_string(),
                    ]),
                ),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "bar", "").await;

        assert_single_chunk(&result, &expected_result);
    }

    #[tokio::test]
    async fn no_matches() {
        let left = left_collection(vec![TimeInterval::default(); 2]);

        let right = single_column_data(
            "bar",
            FeatureData::Int(vec![3, 4]),
            vec![TimeInterval::default(); 2],
        );

        let result = join_mock_collections(left, right, "foo", "bar", "").await;

        // a single, empty chunk is still emitted
        assert_eq!(result.len(), 1);
        assert!(result[0].is_empty());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc