• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.01
/operators/src/processing/vector_join/equi_data_join.rs
1
use std::collections::HashMap;
2
use std::sync::Arc;
3

4
use float_cmp::approx_eq;
5
use futures::stream::{self, BoxStream};
6
use futures::StreamExt;
7

8
use geoengine_datatypes::collections::{
9
    BuilderProvider, DataCollection, FeatureCollection, FeatureCollectionBuilder,
10
    FeatureCollectionInfos, FeatureCollectionRowBuilder, GeoFeatureCollectionRowBuilder,
11
    GeometryRandomAccess,
12
};
13
use geoengine_datatypes::primitives::{
14
    BoundingBox2D, ColumnSelection, FeatureDataRef, Geometry, TimeInterval, VectorQueryRectangle,
15
};
16
use geoengine_datatypes::util::arrow::ArrowTyped;
17

18
use crate::adapters::FeatureCollectionChunkMerger;
19
use crate::engine::{QueryContext, VectorQueryProcessor};
20
use crate::engine::{QueryProcessor, VectorResultDescriptor};
21
use crate::error::Error;
22
use crate::util::Result;
23
use async_trait::async_trait;
24
use futures::TryStreamExt;
25

26
/// Implements an inner equi-join between a `GeoFeatureCollection` stream and a `DataCollection` stream.
27
pub struct EquiGeoToDataJoinProcessor<G> {
28
    result_descriptor: VectorResultDescriptor,
29
    left_processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
30
    right_processor: Box<dyn VectorQueryProcessor<VectorType = DataCollection>>,
31
    left_column: Arc<String>,
32
    right_column: Arc<String>,
33
    right_translation_table: Arc<HashMap<String, String>>,
34
}
35

36
impl<G> EquiGeoToDataJoinProcessor<G>
37
where
38
    G: Geometry + ArrowTyped + Sync + Send + 'static,
39
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
40
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
41
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
42
{
43
    pub fn new(
5✔
44
        result_descriptor: VectorResultDescriptor,
5✔
45
        left_processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
5✔
46
        right_processor: Box<dyn VectorQueryProcessor<VectorType = DataCollection>>,
5✔
47
        left_column: String,
5✔
48
        right_column: String,
5✔
49
        right_translation_table: HashMap<String, String>,
5✔
50
    ) -> Self {
5✔
51
        Self {
5✔
52
            result_descriptor,
5✔
53
            left_processor,
5✔
54
            right_processor,
5✔
55
            left_column: Arc::new(left_column),
5✔
56
            right_column: Arc::new(right_column),
5✔
57
            right_translation_table: Arc::new(right_translation_table),
5✔
58
        }
5✔
59
    }
5✔
60

61
    fn join(
5✔
62
        &self,
5✔
63
        left: Arc<FeatureCollection<G>>,
5✔
64
        right: DataCollection,
5✔
65
        chunk_byte_size: usize,
5✔
66
    ) -> Result<BatchBuilderIterator<G>> {
5✔
67
        BatchBuilderIterator::new(
5✔
68
            left,
5✔
69
            right,
5✔
70
            self.left_column.clone(),
5✔
71
            self.right_column.clone(),
5✔
72
            self.right_translation_table.clone(),
5✔
73
            chunk_byte_size,
5✔
74
        )
5✔
75
    }
5✔
76
}
77

78
struct BatchBuilderIterator<G>
79
where
80
    G: Geometry + ArrowTyped + Sync + Send + 'static,
81
{
82
    left: Arc<FeatureCollection<G>>,
83
    right: DataCollection,
84
    left_column: Arc<String>,
85
    right_column: Arc<String>,
86
    right_translation_table: Arc<HashMap<String, String>>,
87
    builder: FeatureCollectionBuilder<G>,
88
    chunk_byte_size: usize,
89
    left_idx: usize,
90
    first_iteration: bool,
91
    has_ended: bool,
92
}
93

94
impl<G> BatchBuilderIterator<G>
95
where
96
    G: Geometry + ArrowTyped + Sync + Send + 'static,
97
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
98
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
99
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
100
{
101
    pub fn new(
5✔
102
        left: Arc<FeatureCollection<G>>,
5✔
103
        right: DataCollection,
5✔
104
        left_column: Arc<String>,
5✔
105
        right_column: Arc<String>,
5✔
106
        right_translation_table: Arc<HashMap<String, String>>,
5✔
107
        chunk_byte_size: usize,
5✔
108
    ) -> Result<Self> {
5✔
109
        let mut builder = FeatureCollection::<G>::builder();
5✔
110

111
        // create header by combining values from both collections
112
        for (column_name, column_type) in left.column_types() {
5✔
113
            builder.add_column(column_name, column_type)?;
5✔
114
        }
115
        for (column_name, column_type) in right.column_types() {
6✔
116
            builder.add_column(right_translation_table[&column_name].clone(), column_type)?;
6✔
117
        }
118

119
        Ok(Self {
5✔
120
            left,
5✔
121
            right,
5✔
122
            left_column,
5✔
123
            right_column,
5✔
124
            right_translation_table,
5✔
125
            builder,
5✔
126
            chunk_byte_size,
5✔
127
            left_idx: 0,
5✔
128
            first_iteration: true,
5✔
129
            has_ended: false,
5✔
130
        })
5✔
131
    }
5✔
132

133
    fn join_inner_batch_matches(
10✔
134
        left: &FeatureDataRef,
10✔
135
        right: &FeatureDataRef,
10✔
136
        left_time_interval: TimeInterval,
10✔
137
        right_time_intervals: &[TimeInterval],
10✔
138
        left_idx: usize,
10✔
139
    ) -> Result<Vec<(usize, TimeInterval)>> {
10✔
140
        fn matches<T, F>(
10✔
141
            right_values: &[T],
10✔
142
            equals_left_value: F,
10✔
143
            left_time_interval: TimeInterval,
10✔
144
            right_time_intervals: &[TimeInterval],
10✔
145
        ) -> Vec<(usize, TimeInterval)>
10✔
146
        where
10✔
147
            T: PartialEq + Copy,
10✔
148
            F: Fn(T) -> bool,
10✔
149
        {
10✔
150
            right_values
10✔
151
                .iter()
10✔
152
                .enumerate()
10✔
153
                .filter_map(move |(right_idx, &right_value)| {
26✔
154
                    if !equals_left_value(right_value) {
26✔
155
                        return None;
15✔
156
                    }
11✔
157

11✔
158
                    Some(right_idx)
11✔
159
                        .zip(left_time_interval.intersect(&right_time_intervals[right_idx]))
11✔
160
                })
26✔
161
                .collect()
10✔
162
        }
10✔
163

164
        let right_indices = match (left, right) {
10✔
165
            (FeatureDataRef::Float(left), FeatureDataRef::Float(right)) => {
×
166
                let left_value = left.as_ref()[left_idx];
×
167
                matches(
×
168
                    right.as_ref(),
×
169
                    |right_value| approx_eq!(f64, left_value, right_value),
×
170
                    left_time_interval,
×
171
                    right_time_intervals,
×
172
                )
×
173
            }
174
            (FeatureDataRef::Category(left), FeatureDataRef::Category(right)) => {
×
175
                let left_value = left.as_ref()[left_idx];
×
176
                matches(
×
177
                    right.as_ref(),
×
178
                    |right_value| left_value == right_value,
×
179
                    left_time_interval,
×
180
                    right_time_intervals,
×
181
                )
×
182
            }
183
            (FeatureDataRef::Int(left), FeatureDataRef::Int(right)) => {
10✔
184
                let left_value = left.as_ref()[left_idx];
10✔
185
                matches(
10✔
186
                    right.as_ref(),
10✔
187
                    |right_value| left_value == right_value,
26✔
188
                    left_time_interval,
10✔
189
                    right_time_intervals,
10✔
190
                )
10✔
191
            }
192
            (FeatureDataRef::Text(left), FeatureDataRef::Text(right)) => {
×
193
                let left_value = left.as_ref()[left_idx];
×
194
                matches(
×
195
                    right.as_ref(),
×
196
                    |right_value| left_value == right_value,
×
197
                    left_time_interval,
×
198
                    right_time_intervals,
×
199
                )
×
200
            }
201
            (left, right) => {
×
202
                return Err(Error::ColumnTypeMismatch {
×
203
                    left: left.into(),
×
204
                    right: right.into(),
×
205
                });
×
206
            }
207
        };
208

209
        Ok(right_indices)
10✔
210
    }
10✔
211

212
    fn compute_batch(&mut self) -> Result<FeatureCollection<G>> {
5✔
213
        let mut builder = self.builder.clone().finish_header();
5✔
214

5✔
215
        let left_join_column = self.left.data(&self.left_column).expect("should exist");
5✔
216
        let right_join_column = self.right.data(&self.right_column).expect("should exist");
5✔
217
        let left_time_intervals = self.left.time_intervals();
5✔
218
        let right_time_intervals = self.right.time_intervals();
5✔
219

5✔
220
        // copy such that `self` is not borrowed
5✔
221
        let mut left_idx = self.left_idx;
5✔
222

5✔
223
        let left_data_lookup: HashMap<String, FeatureDataRef> = self
5✔
224
            .left
5✔
225
            .column_names()
5✔
226
            .map(|column_name| {
5✔
227
                (
5✔
228
                    column_name.clone(),
5✔
229
                    self.left.data(column_name).expect(
5✔
230
                        "column should exist because it was checked during operator initialization",
5✔
231
                    ),
5✔
232
                )
5✔
233
            })
5✔
234
            .collect();
5✔
235
        let right_data_lookup: HashMap<String, FeatureDataRef> = self
5✔
236
            .right_translation_table
5✔
237
            .iter()
5✔
238
            .map(|(old_column_name, new_column_name)| {
6✔
239
                (
6✔
240
                    new_column_name.clone(),
6✔
241
                    self.right.data(old_column_name).expect(
6✔
242
                        "column should exist because it was checked during operator initialization",
6✔
243
                    ),
6✔
244
                )
6✔
245
            })
6✔
246
            .collect();
5✔
247

248
        while left_idx < self.left.len() {
15✔
249
            let geometry: G = self
10✔
250
                .left
10✔
251
                .geometry_at(left_idx)
10✔
252
                .expect("geometry should exist because `left_idx` < `len`")
10✔
253
                .into();
10✔
254

255
            let join_inner_batch_matches = Self::join_inner_batch_matches(
10✔
256
                &left_join_column,
10✔
257
                &right_join_column,
10✔
258
                left_time_intervals[left_idx],
10✔
259
                right_time_intervals,
10✔
260
                left_idx,
10✔
261
            )?;
10✔
262

263
            // add left value
264
            for (column_name, feature_data) in &left_data_lookup {
20✔
265
                let data = feature_data.get_unchecked(left_idx);
10✔
266

10✔
267
                for _ in 0..join_inner_batch_matches.len() {
10✔
268
                    builder.push_data(column_name, data.clone())?;
10✔
269
                }
270
            }
271

272
            // add right value
273
            for (column_name, feature_data) in &right_data_lookup {
22✔
274
                for &(right_idx, _) in &join_inner_batch_matches {
27✔
275
                    let data = feature_data.get_unchecked(right_idx);
15✔
276

15✔
277
                    builder.push_data(column_name, data)?;
15✔
278
                }
279
            }
280

281
            // add time and geo
282
            for (_, time_interval) in join_inner_batch_matches {
20✔
283
                builder.push_geometry(geometry.clone());
10✔
284
                builder.push_time_interval(time_interval);
10✔
285
                builder.finish_row();
10✔
286
            }
10✔
287

288
            left_idx += 1;
10✔
289

10✔
290
            // there could be the degenerated case that one left feature matches with
10✔
291
            // many features of the right side and is twice as large as the chunk byte size
10✔
292
            if !builder.is_empty() && builder.estimate_memory_size() > self.chunk_byte_size {
10✔
293
                break;
×
294
            }
10✔
295
        }
296

297
        self.left_idx = left_idx;
5✔
298

5✔
299
        // if iterator ran through, set flag `has_ended` so that the next call will not
5✔
300
        // produce an empty collection
5✔
301
        if self.left_idx >= self.left.len() {
5✔
302
            self.has_ended = true;
5✔
303
        }
5✔
304

305
        builder.cache_hint(self.left.cache_hint.merged(&self.right.cache_hint));
5✔
306

5✔
307
        builder.build().map_err(Into::into)
5✔
308
    }
5✔
309
}
310

311
impl<G> Iterator for BatchBuilderIterator<G>
312
where
313
    G: Geometry + ArrowTyped + Sync + Send + 'static,
314
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
315
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
316
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
317
{
318
    type Item = Result<FeatureCollection<G>>;
319

320
    fn next(&mut self) -> Option<Self::Item> {
10✔
321
        if self.has_ended {
10✔
322
            return None;
5✔
323
        }
5✔
324

5✔
325
        match self.compute_batch() {
5✔
326
            Ok(collection) => {
5✔
327
                if self.first_iteration {
5✔
328
                    self.first_iteration = false;
5✔
329
                    return Some(Ok(collection));
5✔
330
                }
×
331

×
332
                if collection.is_empty() {
×
333
                    self.has_ended = true;
×
334
                    None
×
335
                } else {
336
                    Some(Ok(collection))
×
337
                }
338
            }
339
            Err(error) => {
×
340
                self.first_iteration = false;
×
341
                self.has_ended = true;
×
342

×
343
                Some(Err(error))
×
344
            }
345
        }
346
    }
10✔
347
}
348

349
#[async_trait]
350
impl<G> QueryProcessor for EquiGeoToDataJoinProcessor<G>
351
where
352
    G: Geometry + ArrowTyped + Sync + Send + 'static,
353
    for<'g> FeatureCollection<G>: GeometryRandomAccess<'g>,
354
    for<'g> <FeatureCollection<G> as GeometryRandomAccess<'g>>::GeometryType: Into<G>,
355
    FeatureCollectionRowBuilder<G>: GeoFeatureCollectionRowBuilder<G>,
356
{
357
    type Output = FeatureCollection<G>;
358
    type SpatialBounds = BoundingBox2D;
359
    type Selection = ColumnSelection;
360
    type ResultDescription = VectorResultDescriptor;
361

362
    async fn _query<'a>(
363
        &'a self,
364
        query: VectorQueryRectangle,
365
        ctx: &'a dyn QueryContext,
366
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
5✔
367
        let result_stream = self
5✔
368
            .left_processor
5✔
369
            .query(query.clone(), ctx)
5✔
UNCOV
370
            .await?
×
371
            .and_then(move |left_collection| {
5✔
372
                let query = query.clone();
5✔
373
                async move {
5✔
374
                    // This implementation is a nested-loop join
5✔
375
                    let left_collection = Arc::new(left_collection);
5✔
376

377
                    let data_query = self.right_processor.query(query, ctx).await?;
5✔
378

379
                    let out = data_query
5✔
380
                        .flat_map(move |right_collection| {
5✔
381
                            match right_collection.and_then(|right_collection| {
5✔
382
                                self.join(
5✔
383
                                    left_collection.clone(),
5✔
384
                                    right_collection,
5✔
385
                                    ctx.chunk_byte_size().into(),
5✔
386
                                )
5✔
387
                            }) {
5✔
388
                                Ok(batch_iter) => stream::iter(batch_iter).boxed(),
5✔
UNCOV
389
                                Err(e) => stream::once(async { Err(e) }).boxed(),
×
390
                            }
391
                        })
5✔
392
                        .boxed();
5✔
393
                    Ok(out)
5✔
394
                }
5✔
395
            })
5✔
396
            .try_flatten();
5✔
397

5✔
398
        Ok(
5✔
399
            FeatureCollectionChunkMerger::new(result_stream.fuse(), ctx.chunk_byte_size().into())
5✔
400
                .boxed(),
5✔
401
        )
5✔
402
    }
10✔
403

404
    fn result_descriptor(&self) -> &Self::ResultDescription {
5✔
405
        &self.result_descriptor
5✔
406
    }
5✔
407
}
408

409
#[cfg(test)]
mod tests {
    use futures::executor::block_on_stream;

    use geoengine_datatypes::collections::{
        ChunksEqualIgnoringCacheHint, MultiPointCollection, VectorDataType,
    };
    use geoengine_datatypes::primitives::CacheHint;
    use geoengine_datatypes::primitives::{
        BoundingBox2D, FeatureData, MultiPoint, SpatialResolution, TimeInterval,
    };
    use geoengine_datatypes::spatial_reference::SpatialReference;
    use geoengine_datatypes::util::test::TestDefault;

    use crate::engine::{
        ChunkByteSize, MockExecutionContext, MockQueryContext, VectorOperator, WorkflowOperatorPath,
    };
    use crate::mock::MockFeatureCollectionSource;

    use super::*;
    use crate::processing::vector_join::util::translation_table;

    /// Runs the join processor on two mock sources and collects all result chunks.
    ///
    /// The query covers the whole spatial and temporal domain and uses the maximum
    /// chunk byte size.
    async fn join_mock_collections(
        left: MultiPointCollection,
        right: DataCollection,
        left_join_column: &str,
        right_join_column: &str,
        right_suffix: &str,
    ) -> Vec<MultiPointCollection> {
        let execution_context = MockExecutionContext::test_default();

        let left = MockFeatureCollectionSource::single(left)
            .boxed()
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
            .await
            .unwrap();
        let right = MockFeatureCollectionSource::single(right)
            .boxed()
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
            .await
            .unwrap();

        let left_processor = left.query_processor().unwrap().multi_point().unwrap();
        let right_processor = right.query_processor().unwrap().data().unwrap();

        let query_rectangle = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new(
                (f64::MIN, f64::MIN).into(),
                (f64::MAX, f64::MAX).into(),
            )
            .unwrap(),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
            attributes: ColumnSelection::all(),
        };

        let ctx = MockQueryContext::new(ChunkByteSize::MAX);

        // the joined result carries the columns of both inputs
        let result_descriptor = VectorResultDescriptor {
            data_type: VectorDataType::MultiPoint,
            spatial_reference: SpatialReference::epsg_4326().into(),
            columns: left
                .result_descriptor()
                .columns
                .clone()
                .into_iter()
                .chain(right.result_descriptor().columns.clone())
                .collect(),
            time: None,
            bbox: None,
        };

        let right_translation_table = translation_table(
            left.result_descriptor().columns.keys(),
            right.result_descriptor().columns.keys(),
            right_suffix,
        );

        let processor = EquiGeoToDataJoinProcessor::new(
            result_descriptor,
            left_processor,
            right_processor,
            left_join_column.to_string(),
            right_join_column.to_string(),
            right_translation_table,
        );

        let result_stream = processor.query(query_rectangle, &ctx).await.unwrap();

        block_on_stream(result_stream)
            .collect::<Result<_>>()
            .unwrap()
    }

    /// Only the second left feature has partners; it joins with both right features.
    #[tokio::test]
    async fn join() {
        let left = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
            vec![TimeInterval::default(); 2],
            vec![("foo".to_string(), FeatureData::Int(vec![1, 2]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let right = DataCollection::from_data(
            vec![],
            vec![TimeInterval::default(); 2],
            vec![("bar".to_string(), FeatureData::Int(vec![2, 2]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let expected_result = MultiPointCollection::from_data(
            MultiPoint::many(vec![(1.0, 1.1), (1.0, 1.1)]).unwrap(),
            vec![TimeInterval::default(); 2],
            vec![
                ("foo".to_string(), FeatureData::Int(vec![2, 2])),
                ("bar".to_string(), FeatureData::Int(vec![2, 2])),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "bar", "").await;

        assert_eq!(result.len(), 1);
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected_result));
    }

    /// Equal values only join if their time intervals intersect; the result carries
    /// the intersection.
    #[tokio::test]
    async fn time_intervals() {
        let left = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
            vec![
                TimeInterval::new_unchecked(0, 2),
                TimeInterval::new_unchecked(4, 5),
            ],
            vec![("foo".to_string(), FeatureData::Int(vec![1, 2]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let right = DataCollection::from_data(
            vec![],
            vec![
                TimeInterval::new_unchecked(1, 3),
                TimeInterval::new_unchecked(5, 6),
            ],
            vec![("bar".to_string(), FeatureData::Int(vec![1, 2]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let expected_result = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1)]).unwrap(),
            vec![TimeInterval::new_unchecked(1, 2)],
            vec![
                ("foo".to_string(), FeatureData::Int(vec![1])),
                ("bar".to_string(), FeatureData::Int(vec![1])),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "bar", "").await;

        assert_eq!(result.len(), 1);
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected_result));
    }

    /// A right column named like a left column is emitted with the suffix appended.
    #[tokio::test]
    async fn name_collision() {
        let left = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
            vec![TimeInterval::default(); 2],
            vec![("foo".to_string(), FeatureData::Int(vec![1, 2]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let right = DataCollection::from_data(
            vec![],
            vec![TimeInterval::default(); 2],
            vec![("foo".to_string(), FeatureData::Int(vec![1, 2]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let expected_result = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
            vec![TimeInterval::default(); 2],
            vec![
                ("foo".to_string(), FeatureData::Int(vec![1, 2])),
                ("foo2".to_string(), FeatureData::Int(vec![1, 2])),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "foo", "2").await;

        assert_eq!(result.len(), 1);
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected_result));
    }

    /// Each left feature is repeated once for every matching right feature.
    #[tokio::test]
    async fn multi_match_geo() {
        let left = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
            vec![TimeInterval::default(); 2],
            vec![("foo".to_string(), FeatureData::Int(vec![1, 2]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let right = DataCollection::from_data(
            vec![],
            vec![TimeInterval::default(); 5],
            vec![
                ("bar".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
                (
                    "baz".to_string(),
                    FeatureData::Text(vec![
                        "this".to_string(),
                        "is".to_string(),
                        "the".to_string(),
                        "way".to_string(),
                        "!".to_string(),
                    ]),
                ),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let expected_result = MultiPointCollection::from_data(
            MultiPoint::many(vec![
                (0.0, 0.1),
                (0.0, 0.1),
                (0.0, 0.1),
                (1.0, 1.1),
                (1.0, 1.1),
            ])
            .unwrap(),
            vec![TimeInterval::default(); 5],
            vec![
                ("foo".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
                ("bar".to_string(), FeatureData::Int(vec![1, 1, 1, 2, 2])),
                (
                    "baz".to_string(),
                    FeatureData::Text(vec![
                        "this".to_string(),
                        "is".to_string(),
                        "the".to_string(),
                        "way".to_string(),
                        "!".to_string(),
                    ]),
                ),
            ]
            .into_iter()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "bar", "").await;

        assert_eq!(result.len(), 1);
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected_result));
    }

    /// Without any match, a single empty collection is returned.
    #[tokio::test]
    async fn no_matches() {
        let left = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1)]).unwrap(),
            vec![TimeInterval::default(); 2],
            vec![("foo".to_string(), FeatureData::Int(vec![1, 2]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let right = DataCollection::from_data(
            vec![],
            vec![TimeInterval::default(); 2],
            vec![("bar".to_string(), FeatureData::Int(vec![3, 4]))]
                .into_iter()
                .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let result = join_mock_collections(left, right, "foo", "bar", "").await;

        assert_eq!(result.len(), 1);
        assert!(result[0].is_empty());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc