• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.31
/operators/src/processing/vector_join/equi_data_join.rs
1
use std::collections::HashMap;
2
use std::sync::Arc;
3

4
use float_cmp::approx_eq;
5
use futures::stream::{self, BoxStream};
6
use futures::StreamExt;
7

8
use geoengine_datatypes::collections::{
9
    BuilderProvider, DataCollection, FeatureCollection, FeatureCollectionBuilder,
10
    FeatureCollectionInfos, FeatureCollectionRowBuilder, GeoFeatureCollectionRowBuilder,
11
    GeometryRandomAccess,
12
};
13
use geoengine_datatypes::primitives::{
14
    BoundingBox2D, FeatureDataRef, Geometry, TimeInterval, VectorQueryRectangle,
15
};
16
use geoengine_datatypes::util::arrow::ArrowTyped;
17

18
use crate::adapters::FeatureCollectionChunkMerger;
19
use crate::engine::QueryProcessor;
20
use crate::engine::{QueryContext, VectorQueryProcessor};
21
use crate::error::Error;
22
use crate::util::Result;
23
use async_trait::async_trait;
24
use futures::TryStreamExt;
25

26
/// Implements an inner equi-join between a `GeoFeatureCollection` stream and a `DataCollection` stream.
27
pub struct EquiGeoToDataJoinProcessor<G> {
28
    left_processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
29
    right_processor: Box<dyn VectorQueryProcessor<VectorType = DataCollection>>,
30
    left_column: Arc<String>,
31
    right_column: Arc<String>,
32
    right_translation_table: Arc<HashMap<String, String>>,
33
}
34

35
impl<G> EquiGeoToDataJoinProcessor<G>
36
where
37
    G: Geometry + ArrowTyped + Sync + Send + 'static,
38
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
39
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
40
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
41
{
42
    pub fn new(
5✔
43
        left_processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
5✔
44
        right_processor: Box<dyn VectorQueryProcessor<VectorType = DataCollection>>,
5✔
45
        left_column: String,
5✔
46
        right_column: String,
5✔
47
        right_translation_table: HashMap<String, String>,
5✔
48
    ) -> Self {
5✔
49
        Self {
5✔
50
            left_processor,
5✔
51
            right_processor,
5✔
52
            left_column: Arc::new(left_column),
5✔
53
            right_column: Arc::new(right_column),
5✔
54
            right_translation_table: Arc::new(right_translation_table),
5✔
55
        }
5✔
56
    }
5✔
57

58
    fn join(
5✔
59
        &self,
5✔
60
        left: Arc<FeatureCollection<G>>,
5✔
61
        right: DataCollection,
5✔
62
        chunk_byte_size: usize,
5✔
63
    ) -> Result<BatchBuilderIterator<G>> {
5✔
64
        BatchBuilderIterator::new(
5✔
65
            left,
5✔
66
            right,
5✔
67
            self.left_column.clone(),
5✔
68
            self.right_column.clone(),
5✔
69
            self.right_translation_table.clone(),
5✔
70
            chunk_byte_size,
5✔
71
        )
5✔
72
    }
5✔
73
}
74

75
struct BatchBuilderIterator<G>
76
where
77
    G: Geometry + ArrowTyped + Sync + Send + 'static,
78
{
79
    left: Arc<FeatureCollection<G>>,
80
    right: DataCollection,
81
    left_column: Arc<String>,
82
    right_column: Arc<String>,
83
    right_translation_table: Arc<HashMap<String, String>>,
84
    builder: FeatureCollectionBuilder<G>,
85
    chunk_byte_size: usize,
86
    left_idx: usize,
87
    first_iteration: bool,
88
    has_ended: bool,
89
}
90

91
impl<G> BatchBuilderIterator<G>
92
where
93
    G: Geometry + ArrowTyped + Sync + Send + 'static,
94
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
95
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
96
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
97
{
98
    pub fn new(
5✔
99
        left: Arc<FeatureCollection<G>>,
5✔
100
        right: DataCollection,
5✔
101
        left_column: Arc<String>,
5✔
102
        right_column: Arc<String>,
5✔
103
        right_translation_table: Arc<HashMap<String, String>>,
5✔
104
        chunk_byte_size: usize,
5✔
105
    ) -> Result<Self> {
5✔
106
        let mut builder = FeatureCollection::<G>::builder();
5✔
107

108
        // create header by combining values from both collections
109
        for (column_name, column_type) in left.column_types() {
5✔
110
            builder.add_column(column_name, column_type)?;
5✔
111
        }
112
        for (column_name, column_type) in right.column_types() {
6✔
113
            builder.add_column(right_translation_table[&column_name].clone(), column_type)?;
6✔
114
        }
115

116
        Ok(Self {
5✔
117
            left,
5✔
118
            right,
5✔
119
            left_column,
5✔
120
            right_column,
5✔
121
            right_translation_table,
5✔
122
            builder,
5✔
123
            chunk_byte_size,
5✔
124
            left_idx: 0,
5✔
125
            first_iteration: true,
5✔
126
            has_ended: false,
5✔
127
        })
5✔
128
    }
5✔
129

130
    fn join_inner_batch_matches(
10✔
131
        left: &FeatureDataRef,
10✔
132
        right: &FeatureDataRef,
10✔
133
        left_time_interval: TimeInterval,
10✔
134
        right_time_intervals: &[TimeInterval],
10✔
135
        left_idx: usize,
10✔
136
    ) -> Result<Vec<(usize, TimeInterval)>> {
10✔
137
        fn matches<T, F>(
10✔
138
            right_values: &[T],
10✔
139
            equals_left_value: F,
10✔
140
            left_time_interval: TimeInterval,
10✔
141
            right_time_intervals: &[TimeInterval],
10✔
142
        ) -> Vec<(usize, TimeInterval)>
10✔
143
        where
10✔
144
            T: PartialEq + Copy,
10✔
145
            F: Fn(T) -> bool,
10✔
146
        {
10✔
147
            right_values
10✔
148
                .iter()
10✔
149
                .enumerate()
10✔
150
                .filter_map(move |(right_idx, &right_value)| {
26✔
151
                    if !equals_left_value(right_value) {
26✔
152
                        return None;
15✔
153
                    }
11✔
154

11✔
155
                    Some(right_idx)
11✔
156
                        .zip(left_time_interval.intersect(&right_time_intervals[right_idx]))
11✔
157
                })
26✔
158
                .collect()
10✔
159
        }
10✔
160

161
        let right_indices = match (left, right) {
10✔
162
            (FeatureDataRef::Float(left), FeatureDataRef::Float(right)) => {
×
163
                let left_value = left.as_ref()[left_idx];
×
164
                matches(
×
165
                    right.as_ref(),
×
166
                    |right_value| approx_eq!(f64, left_value, right_value),
×
167
                    left_time_interval,
×
168
                    right_time_intervals,
×
169
                )
×
170
            }
171
            (FeatureDataRef::Category(left), FeatureDataRef::Category(right)) => {
×
172
                let left_value = left.as_ref()[left_idx];
×
173
                matches(
×
174
                    right.as_ref(),
×
175
                    |right_value| left_value == right_value,
×
176
                    left_time_interval,
×
177
                    right_time_intervals,
×
178
                )
×
179
            }
180
            (FeatureDataRef::Int(left), FeatureDataRef::Int(right)) => {
10✔
181
                let left_value = left.as_ref()[left_idx];
10✔
182
                matches(
10✔
183
                    right.as_ref(),
10✔
184
                    |right_value| left_value == right_value,
26✔
185
                    left_time_interval,
10✔
186
                    right_time_intervals,
10✔
187
                )
10✔
188
            }
189
            (FeatureDataRef::Text(left), FeatureDataRef::Text(right)) => {
×
190
                let left_value = left.as_ref()[left_idx];
×
191
                matches(
×
192
                    right.as_ref(),
×
193
                    |right_value| left_value == right_value,
×
194
                    left_time_interval,
×
195
                    right_time_intervals,
×
196
                )
×
197
            }
198
            (left, right) => {
×
199
                return Err(Error::ColumnTypeMismatch {
×
200
                    left: left.into(),
×
201
                    right: right.into(),
×
202
                });
×
203
            }
204
        };
205

206
        Ok(right_indices)
10✔
207
    }
10✔
208

209
    fn compute_batch(&mut self) -> Result<FeatureCollection<G>> {
5✔
210
        let mut builder = self.builder.clone().finish_header();
5✔
211

5✔
212
        let left_join_column = self.left.data(&self.left_column).expect("should exist");
5✔
213
        let right_join_column = self.right.data(&self.right_column).expect("should exist");
5✔
214
        let left_time_intervals = self.left.time_intervals();
5✔
215
        let right_time_intervals = self.right.time_intervals();
5✔
216

5✔
217
        // copy such that `self` is not borrowed
5✔
218
        let mut left_idx = self.left_idx;
5✔
219

5✔
220
        let left_data_lookup: HashMap<String, FeatureDataRef> = self
5✔
221
            .left
5✔
222
            .column_names()
5✔
223
            .map(|column_name| {
5✔
224
                (
5✔
225
                    column_name.clone(),
5✔
226
                    self.left.data(column_name).expect(
5✔
227
                        "column should exist because it was checked during operator initialization",
5✔
228
                    ),
5✔
229
                )
5✔
230
            })
5✔
231
            .collect();
5✔
232
        let right_data_lookup: HashMap<String, FeatureDataRef> = self
5✔
233
            .right_translation_table
5✔
234
            .iter()
5✔
235
            .map(|(old_column_name, new_column_name)| {
6✔
236
                (
6✔
237
                    new_column_name.clone(),
6✔
238
                    self.right.data(old_column_name).expect(
6✔
239
                        "column should exist because it was checked during operator initialization",
6✔
240
                    ),
6✔
241
                )
6✔
242
            })
6✔
243
            .collect();
5✔
244

245
        while left_idx < self.left.len() {
15✔
246
            let geometry: G = self
10✔
247
                .left
10✔
248
                .geometry_at(left_idx)
10✔
249
                .expect("geometry should exist because `left_idx` < `len`")
10✔
250
                .into();
10✔
251

252
            let join_inner_batch_matches = Self::join_inner_batch_matches(
10✔
253
                &left_join_column,
10✔
254
                &right_join_column,
10✔
255
                left_time_intervals[left_idx],
10✔
256
                right_time_intervals,
10✔
257
                left_idx,
10✔
258
            )?;
10✔
259

260
            // add left value
261
            for (column_name, feature_data) in &left_data_lookup {
20✔
262
                let data = feature_data.get_unchecked(left_idx);
10✔
263

10✔
264
                for _ in 0..join_inner_batch_matches.len() {
10✔
265
                    builder.push_data(column_name, data.clone())?;
10✔
266
                }
267
            }
268

269
            // add right value
270
            for (column_name, feature_data) in &right_data_lookup {
22✔
271
                for &(right_idx, _) in &join_inner_batch_matches {
27✔
272
                    let data = feature_data.get_unchecked(right_idx);
15✔
273

15✔
274
                    builder.push_data(column_name, data)?;
15✔
275
                }
276
            }
277

278
            // add time and geo
279
            for (_, time_interval) in join_inner_batch_matches {
20✔
280
                builder.push_geometry(geometry.clone());
10✔
281
                builder.push_time_interval(time_interval);
10✔
282
                builder.finish_row();
10✔
283
            }
10✔
284

285
            left_idx += 1;
10✔
286

10✔
287
            // there could be the degenerated case that one left feature matches with
10✔
288
            // many features of the right side and is twice as large as the chunk byte size
10✔
289
            if !builder.is_empty() && builder.byte_size() > self.chunk_byte_size {
10✔
290
                break;
×
291
            }
10✔
292
        }
293

294
        self.left_idx = left_idx;
5✔
295

5✔
296
        // if iterator ran through, set flag `has_ended` so that the next call will not
5✔
297
        // produce an empty collection
5✔
298
        if self.left_idx >= self.left.len() {
5✔
299
            self.has_ended = true;
5✔
300
        }
5✔
301

302
        builder.build().map_err(Into::into)
5✔
303
    }
5✔
304
}
305

306
impl<G> Iterator for BatchBuilderIterator<G>
307
where
308
    G: Geometry + ArrowTyped + Sync + Send + 'static,
309
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
310
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
311
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
312
{
313
    type Item = Result<FeatureCollection<G>>;
314

315
    fn next(&mut self) -> Option<Self::Item> {
10✔
316
        if self.has_ended {
10✔
317
            return None;
5✔
318
        }
5✔
319

5✔
320
        match self.compute_batch() {
5✔
321
            Ok(collection) => {
5✔
322
                if self.first_iteration {
5✔
323
                    self.first_iteration = false;
5✔
324
                    return Some(Ok(collection));
5✔
325
                }
×
326

×
327
                if collection.is_empty() {
×
328
                    self.has_ended = true;
×
329
                    None
×
330
                } else {
331
                    Some(Ok(collection))
×
332
                }
333
            }
334
            Err(error) => {
×
335
                self.first_iteration = false;
×
336
                self.has_ended = true;
×
337

×
338
                Some(Err(error))
×
339
            }
340
        }
341
    }
10✔
342
}
343

344
#[async_trait]
345
impl<G> QueryProcessor for EquiGeoToDataJoinProcessor<G>
346
where
347
    G: Geometry + ArrowTyped + Sync + Send + 'static,
348
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
349
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
350
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
351
{
352
    type Output = FeatureCollection<G>;
353
    type SpatialBounds = BoundingBox2D;
354

355
    async fn _query<'a>(
5✔
356
        &'a self,
5✔
357
        query: VectorQueryRectangle,
5✔
358
        ctx: &'a dyn QueryContext,
5✔
359
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
5✔
360
        let result_stream = self
5✔
361
            .left_processor
5✔
362
            .query(query, ctx)
5✔
363
            .await?
×
364
            .and_then(move |left_collection| async move {
5✔
365
                // This implementation is a nested-loop join
5✔
366
                let left_collection = Arc::new(left_collection);
5✔
367

368
                let data_query = self.right_processor.query(query, ctx).await?;
5✔
369

370
                let out = data_query
5✔
371
                    .flat_map(move |right_collection| {
5✔
372
                        match right_collection.and_then(|right_collection| {
5✔
373
                            self.join(
5✔
374
                                left_collection.clone(),
5✔
375
                                right_collection,
5✔
376
                                ctx.chunk_byte_size().into(),
5✔
377
                            )
5✔
378
                        }) {
5✔
379
                            Ok(batch_iter) => stream::iter(batch_iter).boxed(),
5✔
380
                            Err(e) => stream::once(async { Err(e) }).boxed(),
×
381
                        }
382
                    })
5✔
383
                    .boxed();
5✔
384
                Ok(out)
5✔
385
            })
5✔
386
            .try_flatten();
5✔
387

5✔
388
        Ok(
5✔
389
            FeatureCollectionChunkMerger::new(result_stream.fuse(), ctx.chunk_byte_size().into())
5✔
390
                .boxed(),
5✔
391
        )
5✔
392
    }
10✔
393
}
394

395
#[cfg(test)]
396
mod tests {
397
    use futures::executor::block_on_stream;
398

399
    use geoengine_datatypes::collections::MultiPointCollection;
400
    use geoengine_datatypes::primitives::{
401
        BoundingBox2D, FeatureData, MultiPoint, SpatialResolution, TimeInterval,
402
    };
403
    use geoengine_datatypes::util::test::TestDefault;
404

405
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext, VectorOperator};
406
    use crate::mock::MockFeatureCollectionSource;
407

408
    use super::*;
409
    use crate::processing::vector_join::util::translation_table;
410

411
    async fn join_mock_collections(
5✔
412
        left: MultiPointCollection,
5✔
413
        right: DataCollection,
5✔
414
        left_join_column: &str,
5✔
415
        right_join_column: &str,
5✔
416
        right_suffix: &str,
5✔
417
    ) -> Vec<MultiPointCollection> {
5✔
418
        let execution_context = MockExecutionContext::test_default();
5✔
419

420
        let left = MockFeatureCollectionSource::single(left)
5✔
421
            .boxed()
5✔
422
            .initialize(&execution_context)
5✔
423
            .await
×
424
            .unwrap();
5✔
425
        let right = MockFeatureCollectionSource::single(right)
5✔
426
            .boxed()
5✔
427
            .initialize(&execution_context)
5✔
428
            .await
×
429
            .unwrap();
5✔
430

5✔
431
        let left_processor = left.query_processor().unwrap().multi_point().unwrap();
5✔
432
        let right_processor = right.query_processor().unwrap().data().unwrap();
5✔
433

5✔
434
        let query_rectangle = VectorQueryRectangle {
5✔
435
            spatial_bounds: BoundingBox2D::new(
5✔
436
                (f64::MIN, f64::MIN).into(),
5✔
437
                (f64::MAX, f64::MAX).into(),
5✔
438
            )
5✔
439
            .unwrap(),
5✔
440
            time_interval: TimeInterval::default(),
5✔
441
            spatial_resolution: SpatialResolution::zero_point_one(),
5✔
442
        };
5✔
443

5✔
444
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
5✔
445

5✔
446
        let processor = EquiGeoToDataJoinProcessor::new(
5✔
447
            left_processor,
5✔
448
            right_processor,
5✔
449
            left_join_column.to_string(),
5✔
450
            right_join_column.to_string(),
5✔
451
            translation_table(
5✔
452
                left.result_descriptor().columns.keys(),
5✔
453
                right.result_descriptor().columns.keys(),
5✔
454
                right_suffix,
5✔
455
            ),
5✔
456
        );
5✔
457

5✔
458
        block_on_stream(processor.query(query_rectangle, &ctx).await.unwrap())
5✔
459
            .collect::<Result<_>>()
5✔
460
            .unwrap()
5✔
461
    }
5✔
462

463
    #[tokio::test]
1✔
464
    async fn join() {
1✔
465
        let left = MultiPointCollection::from_data(
1✔
466
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
1✔
467
            vec![TimeInterval::default(); 2],
1✔
468
            [("foo".to_string(), FeatureData::Int(vec![1, 2]))]
1✔
469
                .iter()
1✔
470
                .cloned()
1✔
471
                .collect(),
1✔
472
        )
1✔
473
        .unwrap();
1✔
474

1✔
475
        let right = DataCollection::from_data(
1✔
476
            vec![],
1✔
477
            vec![TimeInterval::default(); 2],
1✔
478
            [("bar".to_string(), FeatureData::Int(vec![2, 2]))]
1✔
479
                .iter()
1✔
480
                .cloned()
1✔
481
                .collect(),
1✔
482
        )
1✔
483
        .unwrap();
1✔
484

1✔
485
        let expected_result = MultiPointCollection::from_data(
1✔
486
            MultiPoint::many(vec![(1.0, 1.1), (1.0, 1.1)]).unwrap(),
1✔
487
            vec![TimeInterval::default(); 2],
1✔
488
            [
1✔
489
                ("foo".to_string(), FeatureData::Int(vec![2, 2])),
1✔
490
                ("bar".to_string(), FeatureData::Int(vec![2, 2])),
1✔
491
            ]
1✔
492
            .iter()
1✔
493
            .cloned()
1✔
494
            .collect(),
1✔
495
        )
1✔
496
        .unwrap();
1✔
497

498
        let result = join_mock_collections(left, right, "foo", "bar", "").await;
1✔
499

500
        assert_eq!(result.len(), 1);
1✔
501
        assert_eq!(result[0], expected_result);
1✔
502
    }
503

504
    #[tokio::test]
1✔
505
    async fn time_intervals() {
1✔
506
        let left = MultiPointCollection::from_data(
1✔
507
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
1✔
508
            vec![
1✔
509
                TimeInterval::new_unchecked(0, 2),
1✔
510
                TimeInterval::new_unchecked(4, 5),
1✔
511
            ],
1✔
512
            [("foo".to_string(), FeatureData::Int(vec![1, 2]))]
1✔
513
                .iter()
1✔
514
                .cloned()
1✔
515
                .collect(),
1✔
516
        )
1✔
517
        .unwrap();
1✔
518

1✔
519
        let right = DataCollection::from_data(
1✔
520
            vec![],
1✔
521
            vec![
1✔
522
                TimeInterval::new_unchecked(1, 3),
1✔
523
                TimeInterval::new_unchecked(5, 6),
1✔
524
            ],
1✔
525
            [("bar".to_string(), FeatureData::Int(vec![1, 2]))]
1✔
526
                .iter()
1✔
527
                .cloned()
1✔
528
                .collect(),
1✔
529
        )
1✔
530
        .unwrap();
1✔
531

1✔
532
        let expected_result = MultiPointCollection::from_data(
1✔
533
            MultiPoint::many(vec![(0.0, 0.1)]).unwrap(),
1✔
534
            vec![TimeInterval::new_unchecked(1, 2)],
1✔
535
            [
1✔
536
                ("foo".to_string(), FeatureData::Int(vec![1])),
1✔
537
                ("bar".to_string(), FeatureData::Int(vec![1])),
1✔
538
            ]
1✔
539
            .iter()
1✔
540
            .cloned()
1✔
541
            .collect(),
1✔
542
        )
1✔
543
        .unwrap();
1✔
544

545
        let result = join_mock_collections(left, right, "foo", "bar", "").await;
1✔
546

547
        assert_eq!(result.len(), 1);
1✔
548
        assert_eq!(result[0], expected_result);
1✔
549
    }
550

551
    #[tokio::test]
1✔
552
    async fn name_collision() {
1✔
553
        let left = MultiPointCollection::from_data(
1✔
554
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
1✔
555
            vec![TimeInterval::default(); 2],
1✔
556
            [("foo".to_string(), FeatureData::Int(vec![1, 2]))]
1✔
557
                .iter()
1✔
558
                .cloned()
1✔
559
                .collect(),
1✔
560
        )
1✔
561
        .unwrap();
1✔
562

1✔
563
        let right = DataCollection::from_data(
1✔
564
            vec![],
1✔
565
            vec![TimeInterval::default(); 2],
1✔
566
            [("foo".to_string(), FeatureData::Int(vec![1, 2]))]
1✔
567
                .iter()
1✔
568
                .cloned()
1✔
569
                .collect(),
1✔
570
        )
1✔
571
        .unwrap();
1✔
572

1✔
573
        let expected_result = MultiPointCollection::from_data(
1✔
574
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
1✔
575
            vec![TimeInterval::default(); 2],
1✔
576
            [
1✔
577
                ("foo".to_string(), FeatureData::Int(vec![1, 2])),
1✔
578
                ("foo2".to_string(), FeatureData::Int(vec![1, 2])),
1✔
579
            ]
1✔
580
            .iter()
1✔
581
            .cloned()
1✔
582
            .collect(),
1✔
583
        )
1✔
584
        .unwrap();
1✔
585

586
        let result = join_mock_collections(left, right, "foo", "foo", "2").await;
1✔
587

588
        assert_eq!(result.len(), 1);
1✔
589
        assert_eq!(result[0], expected_result);
1✔
590
    }
591

592
    #[tokio::test]
1✔
593
    async fn multi_match_geo() {
1✔
594
        let left = MultiPointCollection::from_data(
1✔
595
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
1✔
596
            vec![TimeInterval::default(); 2],
1✔
597
            [("foo".to_string(), FeatureData::Int(vec![1, 2]))]
1✔
598
                .iter()
1✔
599
                .cloned()
1✔
600
                .collect(),
1✔
601
        )
1✔
602
        .unwrap();
1✔
603

1✔
604
        let right = DataCollection::from_data(
1✔
605
            vec![],
1✔
606
            vec![TimeInterval::default(); 5],
1✔
607
            [
1✔
608
                ("bar".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
1✔
609
                (
1✔
610
                    "baz".to_string(),
1✔
611
                    FeatureData::Text(vec![
1✔
612
                        "this".to_string(),
1✔
613
                        "is".to_string(),
1✔
614
                        "the".to_string(),
1✔
615
                        "way".to_string(),
1✔
616
                        "!".to_string(),
1✔
617
                    ]),
1✔
618
                ),
1✔
619
            ]
1✔
620
            .iter()
1✔
621
            .cloned()
1✔
622
            .collect(),
1✔
623
        )
1✔
624
        .unwrap();
1✔
625

1✔
626
        let expected_result = MultiPointCollection::from_data(
1✔
627
            MultiPoint::many(vec![
1✔
628
                (0.0, 0.1),
1✔
629
                (0.0, 0.1),
1✔
630
                (0.0, 0.1),
1✔
631
                (1.0, 1.1),
1✔
632
                (1.0, 1.1),
1✔
633
            ])
1✔
634
            .unwrap(),
1✔
635
            vec![TimeInterval::default(); 5],
1✔
636
            [
1✔
637
                ("foo".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
1✔
638
                ("bar".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
1✔
639
                (
1✔
640
                    "baz".to_string(),
1✔
641
                    FeatureData::Text(vec![
1✔
642
                        "this".to_string(),
1✔
643
                        "is".to_string(),
1✔
644
                        "the".to_string(),
1✔
645
                        "way".to_string(),
1✔
646
                        "!".to_string(),
1✔
647
                    ]),
1✔
648
                ),
1✔
649
            ]
1✔
650
            .iter()
1✔
651
            .cloned()
1✔
652
            .collect(),
1✔
653
        )
1✔
654
        .unwrap();
1✔
655

656
        let result = join_mock_collections(left, right, "foo", "bar", "").await;
1✔
657

658
        assert_eq!(result.len(), 1);
1✔
659
        assert_eq!(result[0], expected_result);
1✔
660
    }
661

662
    #[tokio::test]
1✔
663
    async fn no_matches() {
1✔
664
        let left = MultiPointCollection::from_data(
1✔
665
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
1✔
666
            vec![TimeInterval::default(); 2],
1✔
667
            [("foo".to_string(), FeatureData::Int(vec![1, 2]))]
1✔
668
                .iter()
1✔
669
                .cloned()
1✔
670
                .collect(),
1✔
671
        )
1✔
672
        .unwrap();
1✔
673

1✔
674
        let right = DataCollection::from_data(
1✔
675
            vec![],
1✔
676
            vec![TimeInterval::default(); 2],
1✔
677
            [("bar".to_string(), FeatureData::Int(vec![3, 4]))]
1✔
678
                .iter()
1✔
679
                .cloned()
1✔
680
                .collect(),
1✔
681
        )
1✔
682
        .unwrap();
1✔
683

684
        let result = join_mock_collections(left, right, "foo", "bar", "").await;
1✔
685

686
        // TODO: do we need an empty collection here? (cf. `FeatureCollectionChunkMerger`)
687
        assert_eq!(result.len(), 0);
1✔
688
    }
689
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc