• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10178074589

31 Jul 2024 09:34AM UTC coverage: 91.068% (+0.4%) from 90.682%
10178074589

push

github

web-flow
Merge pull request #973 from geo-engine/remove-XGB-update-toolchain

Remove-XGB-update-toolchain

81 of 88 new or added lines in 29 files covered. (92.05%)

456 existing lines in 119 files now uncovered.

131088 of 143945 relevant lines covered (91.07%)

53581.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.27
/operators/src/processing/vector_join/mod.rs
1
use geoengine_datatypes::dataset::NamedData;
2
use serde::{Deserialize, Serialize};
3
use snafu::ensure;
4

5
use geoengine_datatypes::collections::VectorDataType;
6

7
use crate::engine::{
8
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
9
    OperatorData, OperatorName, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
10
    VectorResultDescriptor, WorkflowOperatorPath,
11
};
12
use crate::error;
13
use crate::util::Result;
14

15
use self::equi_data_join::EquiGeoToDataJoinProcessor;
16
use crate::processing::vector_join::util::translation_table;
17
use async_trait::async_trait;
18
use std::collections::HashMap;
19

20
mod equi_data_join;
21
mod util;
22

23
/// The vector join operator requires two inputs and the join type.
24
pub type VectorJoin = Operator<VectorJoinParams, VectorJoinSources>;
25

26
impl OperatorName for VectorJoin {
27
    const TYPE_NAME: &'static str = "VectorJoin";
28
}
29

30
/// A set of parameters for the `VectorJoin`
31
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1✔
32
#[serde(rename_all = "camelCase")]
33
pub struct VectorJoinParams {
34
    #[serde(flatten)]
35
    join_type: VectorJoinType,
36
}
37

UNCOV
38
#[derive(Debug, Clone, Serialize, Deserialize)]
×
39
#[serde(rename_all = "camelCase")]
40
pub struct VectorJoinSources {
41
    left: Box<dyn VectorOperator>,
42
    right: Box<dyn VectorOperator>,
43
}
44

45
impl OperatorData for VectorJoinSources {
46
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>) {
×
47
        self.left.data_names_collect(data_names);
×
48
        self.right.data_names_collect(data_names);
×
49
    }
×
50
}
51

52
pub struct InitializedVectorJoinSources {
53
    left: Box<dyn InitializedVectorOperator>,
54
    right: Box<dyn InitializedVectorOperator>,
55
}
56

57
#[async_trait]
58
impl InitializedSources<InitializedVectorJoinSources> for VectorJoinSources {
59
    async fn initialize_sources(
60
        self,
61
        path: WorkflowOperatorPath,
62
        context: &dyn ExecutionContext,
63
    ) -> Result<InitializedVectorJoinSources> {
2✔
64
        Ok(InitializedVectorJoinSources {
2✔
65
            left: self
2✔
66
                .left
2✔
67
                .initialize(path.clone_and_append(0), context)
2✔
68
                .await?,
2✔
69
            right: self
2✔
70
                .right
2✔
71
                .initialize(path.clone_and_append(1), context)
2✔
72
                .await?,
2✔
73
        })
2✔
74
    }
2✔
75
}
76

77
/// Define the type of join
78
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
5✔
79
#[serde(tag = "type")]
80
pub enum VectorJoinType {
81
    /// An inner equi-join between a `GeoFeatureCollection` and a `DataCollection`
82
    EquiGeoToData {
83
        left_column: String,
84
        right_column: String,
85
        /// which suffix to use if columns have conflicting names?
86
        /// the default is "right"
87
        right_column_suffix: Option<String>,
88
    },
89
}
90

91
#[typetag::serde]
×
92
#[async_trait]
93
impl VectorOperator for VectorJoin {
94
    async fn _initialize(
95
        self: Box<Self>,
96
        path: WorkflowOperatorPath,
97
        context: &dyn ExecutionContext,
98
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
99
        let name = CanonicOperatorName::from(&self);
2✔
100

2✔
101
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
2✔
102

2✔
103
        match &self.params.join_type {
2✔
104
            VectorJoinType::EquiGeoToData {
2✔
105
                left_column,
2✔
106
                right_column,
2✔
107
                right_column_suffix: _,
2✔
108
            } => {
2✔
109
                let left_rd = initialized_sources.left.result_descriptor();
2✔
110
                let right_rd = initialized_sources.right.result_descriptor();
2✔
111

2✔
112
                ensure!(
2✔
113
                    left_rd.columns.contains_key(left_column),
2✔
114
                    error::ColumnDoesNotExist {
2✔
115
                        column: left_column.clone()
1✔
116
                    }
1✔
117
                );
2✔
118
                ensure!(
2✔
119
                    right_rd.columns.contains_key(right_column),
1✔
120
                    error::ColumnDoesNotExist {
2✔
121
                        column: right_column.clone()
×
122
                    }
×
123
                );
2✔
124

2✔
125
                ensure!(
2✔
126
                    left_rd.data_type != VectorDataType::Data,
1✔
127
                    error::InvalidType {
2✔
128
                        expected: "a geo data collection".to_string(),
×
129
                        found: initialized_sources
×
130
                            .left
×
131
                            .result_descriptor()
×
132
                            .data_type
×
133
                            .to_string(),
×
134
                    }
×
135
                );
2✔
136
                ensure!(
2✔
137
                    right_rd.data_type == VectorDataType::Data,
1✔
138
                    error::InvalidType {
2✔
139
                        expected: VectorDataType::Data.to_string(),
×
140
                        found: initialized_sources
×
141
                            .right
×
142
                            .result_descriptor()
×
143
                            .data_type
×
144
                            .to_string(),
×
145
                    }
×
146
                );
2✔
147
            }
2✔
148
        }
2✔
149

2✔
150
        // TODO: find out if column prefixes are the same for more than one join type and generify
2✔
151
        let column_translation_table = match &self.params.join_type {
2✔
152
            VectorJoinType::EquiGeoToData {
1✔
153
                right_column_suffix,
1✔
154
                ..
1✔
155
            } => {
1✔
156
                let right_column_suffix: &str =
1✔
157
                    right_column_suffix.as_ref().map_or("right", String::as_str);
1✔
158
                translation_table(
1✔
159
                    initialized_sources.left.result_descriptor().columns.keys(),
1✔
160
                    initialized_sources.right.result_descriptor().columns.keys(),
1✔
161
                    right_column_suffix,
1✔
162
                )
1✔
163
            }
1✔
164
        };
1✔
165

1✔
166
        let result_descriptor =
1✔
167
            initialized_sources
1✔
168
                .left
1✔
169
                .result_descriptor()
1✔
170
                .map_columns(|left_columns| {
1✔
171
                    let mut columns = left_columns.clone();
1✔
172
                    for (right_column_name, right_column_type) in
2✔
173
                        &initialized_sources.right.result_descriptor().columns
2✔
174
                    {
2✔
175
                        columns.insert(
1✔
176
                            column_translation_table[right_column_name].clone(),
1✔
177
                            right_column_type.clone(),
1✔
178
                        );
1✔
179
                    }
1✔
180
                    columns
2✔
181
                });
1✔
182

1✔
183
        let initialized_operator = InitializedVectorJoin {
1✔
184
            name,
1✔
185
            result_descriptor,
1✔
186
            left: initialized_sources.left,
1✔
187
            right: initialized_sources.right,
1✔
188
            state: InitializedVectorJoinParams {
1✔
189
                join_type: self.params.join_type.clone(),
1✔
190
                column_translation_table,
1✔
191
            },
1✔
192
        };
1✔
193

1✔
194
        Ok(initialized_operator.boxed())
1✔
195
    }
2✔
196

197
    span_fn!(VectorJoin);
198
}
199

200
/// A set of parameters for the `VectorJoin`
201
#[derive(Debug, Clone, PartialEq, Eq)]
202
pub struct InitializedVectorJoinParams {
203
    join_type: VectorJoinType,
204
    column_translation_table: HashMap<String, String>,
205
}
206

207
pub struct InitializedVectorJoin {
208
    name: CanonicOperatorName,
209
    result_descriptor: VectorResultDescriptor,
210
    left: Box<dyn InitializedVectorOperator>,
211
    right: Box<dyn InitializedVectorOperator>,
212
    state: InitializedVectorJoinParams,
213
}
214

215
impl InitializedVectorOperator for InitializedVectorJoin {
216
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
×
217
        match &self.state.join_type {
×
218
            VectorJoinType::EquiGeoToData {
×
219
                left_column,
×
220
                right_column,
×
221
                right_column_suffix: _right_column_suffix,
×
222
            } => {
223
                let right_processor = self
×
224
                    .right
×
225
                    .query_processor()?
×
226
                    .data()
×
227
                    .expect("checked in constructor");
×
228

229
                let left = self.left.query_processor()?;
×
230

231
                Ok(match left {
×
232
                    TypedVectorQueryProcessor::Data(_) => unreachable!("check in constructor"),
×
233
                    TypedVectorQueryProcessor::MultiPoint(left_processor) => {
×
234
                        TypedVectorQueryProcessor::MultiPoint(
×
235
                            EquiGeoToDataJoinProcessor::new(
×
236
                                self.result_descriptor.clone(),
×
237
                                left_processor,
×
238
                                right_processor,
×
239
                                left_column.clone(),
×
240
                                right_column.clone(),
×
241
                                self.state.column_translation_table.clone(),
×
242
                            )
×
243
                            .boxed(),
×
244
                        )
×
245
                    }
246
                    TypedVectorQueryProcessor::MultiLineString(left_processor) => {
×
247
                        TypedVectorQueryProcessor::MultiLineString(
×
248
                            EquiGeoToDataJoinProcessor::new(
×
249
                                self.result_descriptor.clone(),
×
250
                                left_processor,
×
251
                                right_processor,
×
252
                                left_column.clone(),
×
253
                                right_column.clone(),
×
254
                                self.state.column_translation_table.clone(),
×
255
                            )
×
256
                            .boxed(),
×
257
                        )
×
258
                    }
259
                    TypedVectorQueryProcessor::MultiPolygon(left_processor) => {
×
260
                        TypedVectorQueryProcessor::MultiPolygon(
×
261
                            EquiGeoToDataJoinProcessor::new(
×
262
                                self.result_descriptor.clone(),
×
263
                                left_processor,
×
264
                                right_processor,
×
265
                                left_column.clone(),
×
266
                                right_column.clone(),
×
267
                                self.state.column_translation_table.clone(),
×
268
                            )
×
269
                            .boxed(),
×
270
                        )
×
271
                    }
272
                })
273
            }
274
        }
275
    }
×
276

277
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
278
        &self.result_descriptor
×
279
    }
×
280

281
    fn canonic_name(&self) -> CanonicOperatorName {
×
282
        self.name.clone()
×
283
    }
×
284
}
285

286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::MockExecutionContext;
    use crate::mock::MockFeatureCollectionSource;
    use geoengine_datatypes::collections::{DataCollection, MultiPointCollection};
    use geoengine_datatypes::primitives::{FeatureData, NoGeometry, TimeInterval};
    use geoengine_datatypes::util::test::TestDefault;

    /// Mock source with one point feature and a single integer column `column`.
    fn point_source(column: &'static str) -> Box<dyn VectorOperator> {
        MockFeatureCollectionSource::single(
            MultiPointCollection::from_slices(
                &[(0.0, 0.1)],
                &[TimeInterval::default()],
                &[(column, FeatureData::Int(vec![5]))],
            )
            .unwrap(),
        )
        .boxed()
    }

    /// Mock source with one geometry-less feature and a single integer column `column`.
    fn data_source(column: &'static str) -> Box<dyn VectorOperator> {
        MockFeatureCollectionSource::single(
            DataCollection::from_slices(
                &[] as &[NoGeometry],
                &[TimeInterval::default()],
                &[(column, FeatureData::Int(vec![5]))],
            )
            .unwrap(),
        )
        .boxed()
    }

    /// Initializes an `EquiGeoToData` join over the given sources and columns.
    async fn initialize_equi_join(
        left_column: &str,
        right_column: &str,
        left: Box<dyn VectorOperator>,
        right: Box<dyn VectorOperator>,
    ) -> crate::util::Result<Box<dyn InitializedVectorOperator>> {
        VectorJoin {
            params: VectorJoinParams {
                join_type: VectorJoinType::EquiGeoToData {
                    left_column: left_column.to_string(),
                    right_column: right_column.to_string(),
                    right_column_suffix: None,
                },
            },
            sources: VectorJoinSources { left, right },
        }
        .boxed()
        .initialize(
            WorkflowOperatorPath::initialize_root(),
            &MockExecutionContext::test_default(),
        )
        .await
    }

    #[test]
    fn params() {
        let params = VectorJoinParams {
            join_type: VectorJoinType::EquiGeoToData {
                left_column: "foo".to_string(),
                right_column: "bar".to_string(),
                right_column_suffix: Some("baz".to_string()),
            },
        };

        let json = serde_json::json!({
            "type": "EquiGeoToData",
            "left_column": "foo",
            "right_column": "bar",
            "right_column_suffix": "baz",
        });

        assert_eq!(json, serde_json::to_value(&params).unwrap());

        let params_deserialized: VectorJoinParams = serde_json::from_value(json).unwrap();

        assert_eq!(params, params_deserialized);
    }

    #[tokio::test]
    async fn initialization() {
        let operator = VectorJoin {
            params: VectorJoinParams {
                join_type: VectorJoinType::EquiGeoToData {
                    left_column: "left_join_column".to_string(),
                    right_column: "right_join_column".to_string(),
                    right_column_suffix: Some("baz".to_string()),
                },
            },
            sources: VectorJoinSources {
                left: MockFeatureCollectionSource::single(
                    MultiPointCollection::from_slices(
                        &[(0.0, 0.1)],
                        &[TimeInterval::default()],
                        &[("left_join_column", FeatureData::Int(vec![5]))],
                    )
                    .unwrap(),
                )
                .boxed(),
                right: MockFeatureCollectionSource::single(
                    DataCollection::from_slices(
                        &[] as &[NoGeometry],
                        &[TimeInterval::default()],
                        &[("right_join_column", FeatureData::Int(vec![5]))],
                    )
                    .unwrap(),
                )
                .boxed(),
            },
        };

        operator
            .boxed()
            .initialize(
                WorkflowOperatorPath::initialize_root(),
                &MockExecutionContext::test_default(),
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn it_checks_columns() {
        let operator = VectorJoin {
            params: VectorJoinParams {
                join_type: VectorJoinType::EquiGeoToData {
                    left_column: "foo".to_string(),
                    right_column: "bar".to_string(),
                    right_column_suffix: Some("baz".to_string()),
                },
            },
            sources: VectorJoinSources {
                left: MockFeatureCollectionSource::single(
                    MultiPointCollection::from_slices(
                        &[(0.0, 0.1)],
                        &[TimeInterval::default()],
                        &[("join_column", FeatureData::Int(vec![5]))],
                    )
                    .unwrap(),
                )
                .boxed(),
                right: MockFeatureCollectionSource::single(
                    DataCollection::from_slices(
                        &[] as &[NoGeometry],
                        &[TimeInterval::default()],
                        &[("join_column", FeatureData::Int(vec![5]))],
                    )
                    .unwrap(),
                )
                .boxed(),
            },
        };

        assert!(matches!(
            operator
                .boxed()
                .initialize(
                    WorkflowOperatorPath::initialize_root(),
                    &MockExecutionContext::test_default()
                )
                .await,
            Err(error::Error::ColumnDoesNotExist { column }) if column == "foo"
        ));
    }

    /// The right join column is validated as well, not only the left one.
    #[tokio::test]
    async fn it_checks_right_column() {
        let result = initialize_equi_join(
            "join_column",
            "bar",
            point_source("join_column"),
            data_source("join_column"),
        )
        .await;

        assert!(matches!(
            result,
            Err(error::Error::ColumnDoesNotExist { column }) if column == "bar"
        ));
    }

    /// A geometry-less left input is rejected.
    #[tokio::test]
    async fn it_rejects_data_collection_on_left() {
        let result = initialize_equi_join(
            "left_join_column",
            "right_join_column",
            data_source("left_join_column"),
            data_source("right_join_column"),
        )
        .await;

        assert!(matches!(result, Err(error::Error::InvalidType { .. })));
    }

    /// A right input with geometries is rejected.
    #[tokio::test]
    async fn it_rejects_geometries_on_right() {
        let result = initialize_equi_join(
            "left_join_column",
            "right_join_column",
            point_source("left_join_column"),
            point_source("right_join_column"),
        )
        .await;

        assert!(matches!(result, Err(error::Error::InvalidType { .. })));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc