• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.97
/operators/src/processing/vector_join/mod.rs
1
use geoengine_datatypes::dataset::DataId;
2
use serde::{Deserialize, Serialize};
3
use snafu::ensure;
4

5
use geoengine_datatypes::collections::VectorDataType;
6
use tracing::{span, Level};
7

8
use crate::engine::{
9
    CreateSpan, ExecutionContext, InitializedVectorOperator, Operator, OperatorData, OperatorName,
10
    TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
11
};
12
use crate::error;
13
use crate::util::Result;
14

15
use self::equi_data_join::EquiGeoToDataJoinProcessor;
16
use crate::processing::vector_join::util::translation_table;
17
use async_trait::async_trait;
18
use std::collections::HashMap;
19

20
mod equi_data_join;
21
mod util;
22

23
/// The vector join operator requires two inputs and the join type.
24
pub type VectorJoin = Operator<VectorJoinParams, VectorJoinSources>;
25

26
impl OperatorName for VectorJoin {
27
    const TYPE_NAME: &'static str = "VectorJoin";
28
}
29

30
/// A set of parameters for the `VectorJoin`
31
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1✔
32
#[serde(rename_all = "camelCase")]
33
pub struct VectorJoinParams {
34
    #[serde(flatten)]
35
    join_type: VectorJoinType,
36
}
37

38
#[derive(Debug, Clone, Serialize, Deserialize)]
×
39
#[serde(rename_all = "camelCase")]
40
pub struct VectorJoinSources {
41
    left: Box<dyn VectorOperator>,
42
    right: Box<dyn VectorOperator>,
43
}
44

45
impl OperatorData for VectorJoinSources {
46
    fn data_ids_collect(&self, data_ids: &mut Vec<DataId>) {
×
47
        self.left.data_ids_collect(data_ids);
×
48
        self.right.data_ids_collect(data_ids);
×
49
    }
×
50
}
51

52
/// Define the type of join
53
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
9✔
54
#[serde(tag = "type")]
55
pub enum VectorJoinType {
56
    /// An inner equi-join between a `GeoFeatureCollection` and a `DataCollection`
57
    EquiGeoToData {
58
        left_column: String,
59
        right_column: String,
60
        /// which suffix to use if columns have conflicting names?
61
        /// the default is "right"
62
        right_column_suffix: Option<String>,
63
    },
64
}
65

66
#[typetag::serde]
×
67
#[async_trait]
68
impl VectorOperator for VectorJoin {
69
    async fn _initialize(
2✔
70
        self: Box<Self>,
2✔
71
        context: &dyn ExecutionContext,
2✔
72
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
73
        let left = self.sources.left.initialize(context).await?;
2✔
74
        let right = self.sources.right.initialize(context).await?;
2✔
75

76
        match &self.params.join_type {
2✔
77
            VectorJoinType::EquiGeoToData {
2✔
78
                left_column,
2✔
79
                right_column,
2✔
80
                right_column_suffix: _,
2✔
81
            } => {
2✔
82
                let left_rd = left.result_descriptor();
2✔
83
                let right_rd = right.result_descriptor();
2✔
84

2✔
85
                ensure!(
2✔
86
                    left_rd.columns.contains_key(left_column),
2✔
87
                    error::ColumnDoesNotExist {
1✔
88
                        column: left_column.clone()
1✔
89
                    }
1✔
90
                );
91
                ensure!(
1✔
92
                    right_rd.columns.contains_key(right_column),
1✔
93
                    error::ColumnDoesNotExist {
×
94
                        column: right_column.clone()
×
95
                    }
×
96
                );
97

98
                ensure!(
1✔
99
                    left_rd.data_type != VectorDataType::Data,
1✔
100
                    error::InvalidType {
×
101
                        expected: "a geo data collection".to_string(),
×
102
                        found: left.result_descriptor().data_type.to_string(),
×
103
                    }
×
104
                );
105
                ensure!(
1✔
106
                    right_rd.data_type == VectorDataType::Data,
1✔
107
                    error::InvalidType {
×
108
                        expected: VectorDataType::Data.to_string(),
×
109
                        found: right.result_descriptor().data_type.to_string(),
×
110
                    }
×
111
                );
112
            }
113
        }
114

115
        // TODO: find out if column prefixes are the same for more than one join type and generify
116
        let column_translation_table = match &self.params.join_type {
1✔
117
            VectorJoinType::EquiGeoToData {
1✔
118
                right_column_suffix,
1✔
119
                ..
1✔
120
            } => {
1✔
121
                let right_column_suffix: &str =
1✔
122
                    right_column_suffix.as_ref().map_or("right", String::as_str);
1✔
123
                translation_table(
1✔
124
                    left.result_descriptor().columns.keys(),
1✔
125
                    right.result_descriptor().columns.keys(),
1✔
126
                    right_column_suffix,
1✔
127
                )
1✔
128
            }
1✔
129
        };
1✔
130

1✔
131
        let result_descriptor = left.result_descriptor().map_columns(|left_columns| {
1✔
132
            let mut columns = left_columns.clone();
1✔
133
            for (right_column_name, right_column_type) in &right.result_descriptor().columns {
1✔
134
                columns.insert(
1✔
135
                    column_translation_table[right_column_name].clone(),
1✔
136
                    right_column_type.clone(),
1✔
137
                );
1✔
138
            }
1✔
139
            columns
1✔
140
        });
1✔
141

1✔
142
        let initialized_operator = InitializedVectorJoin {
1✔
143
            result_descriptor,
1✔
144
            left,
1✔
145
            right,
1✔
146
            state: InitializedVectorJoinParams {
1✔
147
                join_type: self.params.join_type.clone(),
1✔
148
                column_translation_table,
1✔
149
            },
1✔
150
        };
1✔
151

1✔
152
        Ok(initialized_operator.boxed())
1✔
153
    }
4✔
154

155
    span_fn!(VectorJoin);
×
156
}
157

158
/// A set of parameters for the `VectorJoin`
159
#[derive(Debug, Clone, PartialEq, Eq)]
×
160
pub struct InitializedVectorJoinParams {
161
    join_type: VectorJoinType,
162
    column_translation_table: HashMap<String, String>,
163
}
164

165
pub struct InitializedVectorJoin {
166
    result_descriptor: VectorResultDescriptor,
167
    left: Box<dyn InitializedVectorOperator>,
168
    right: Box<dyn InitializedVectorOperator>,
169
    state: InitializedVectorJoinParams,
170
}
171

172
impl InitializedVectorOperator for InitializedVectorJoin {
173
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
×
174
        match &self.state.join_type {
×
175
            VectorJoinType::EquiGeoToData {
×
176
                left_column,
×
177
                right_column,
×
178
                right_column_suffix: _right_column_suffix,
×
179
            } => {
180
                let right_processor = self
×
181
                    .right
×
182
                    .query_processor()?
×
183
                    .data()
×
184
                    .expect("checked in constructor");
×
185

186
                let left = self.left.query_processor()?;
×
187

188
                Ok(match left {
×
189
                    TypedVectorQueryProcessor::Data(_) => unreachable!("check in constructor"),
×
190
                    TypedVectorQueryProcessor::MultiPoint(left_processor) => {
×
191
                        TypedVectorQueryProcessor::MultiPoint(
×
192
                            EquiGeoToDataJoinProcessor::new(
×
193
                                left_processor,
×
194
                                right_processor,
×
195
                                left_column.clone(),
×
196
                                right_column.clone(),
×
197
                                self.state.column_translation_table.clone(),
×
198
                            )
×
199
                            .boxed(),
×
200
                        )
×
201
                    }
202
                    TypedVectorQueryProcessor::MultiLineString(left_processor) => {
×
203
                        TypedVectorQueryProcessor::MultiLineString(
×
204
                            EquiGeoToDataJoinProcessor::new(
×
205
                                left_processor,
×
206
                                right_processor,
×
207
                                left_column.clone(),
×
208
                                right_column.clone(),
×
209
                                self.state.column_translation_table.clone(),
×
210
                            )
×
211
                            .boxed(),
×
212
                        )
×
213
                    }
214
                    TypedVectorQueryProcessor::MultiPolygon(left_processor) => {
×
215
                        TypedVectorQueryProcessor::MultiPolygon(
×
216
                            EquiGeoToDataJoinProcessor::new(
×
217
                                left_processor,
×
218
                                right_processor,
×
219
                                left_column.clone(),
×
220
                                right_column.clone(),
×
221
                                self.state.column_translation_table.clone(),
×
222
                            )
×
223
                            .boxed(),
×
224
                        )
×
225
                    }
226
                })
227
            }
228
        }
229
    }
×
230

231
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
232
        &self.result_descriptor
×
233
    }
×
234
}
235

236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::MockExecutionContext;
    use crate::mock::MockFeatureCollectionSource;
    use geoengine_datatypes::collections::{DataCollection, MultiPointCollection};
    use geoengine_datatypes::primitives::{FeatureData, NoGeometry, TimeInterval};
    use geoengine_datatypes::util::test::TestDefault;

    /// Builds an `EquiGeoToData` join of a one-point `MultiPointCollection` (single integer
    /// attribute `left_attribute`) with a `DataCollection` (single integer attribute
    /// `right_attribute`), joining on `left_column` / `right_column`.
    fn point_data_join(
        left_column: &str,
        right_column: &str,
        left_attribute: &str,
        right_attribute: &str,
    ) -> VectorJoin {
        let left_collection = MultiPointCollection::from_slices(
            &[(0.0, 0.1)],
            &[TimeInterval::default()],
            &[(left_attribute, FeatureData::Int(vec![5]))],
        )
        .unwrap();
        let right_collection = DataCollection::from_slices(
            &[] as &[NoGeometry],
            &[TimeInterval::default()],
            &[(right_attribute, FeatureData::Int(vec![5]))],
        )
        .unwrap();

        VectorJoin {
            params: VectorJoinParams {
                join_type: VectorJoinType::EquiGeoToData {
                    left_column: left_column.to_string(),
                    right_column: right_column.to_string(),
                    right_column_suffix: Some("baz".to_string()),
                },
            },
            sources: VectorJoinSources {
                left: MockFeatureCollectionSource::single(left_collection).boxed(),
                right: MockFeatureCollectionSource::single(right_collection).boxed(),
            },
        }
    }

    #[test]
    fn params() {
        let expected_json = serde_json::json!({
            "type": "EquiGeoToData",
            "left_column": "foo",
            "right_column": "bar",
            "right_column_suffix": "baz",
        });
        let params = VectorJoinParams {
            join_type: VectorJoinType::EquiGeoToData {
                left_column: "foo".to_string(),
                right_column: "bar".to_string(),
                right_column_suffix: Some("baz".to_string()),
            },
        };

        assert_eq!(expected_json, serde_json::to_value(&params).unwrap());

        let round_tripped: VectorJoinParams = serde_json::from_value(expected_json).unwrap();
        assert_eq!(params, round_tripped);
    }

    #[tokio::test]
    async fn initialization() {
        point_data_join(
            "left_join_column",
            "right_join_column",
            "left_join_column",
            "right_join_column",
        )
        .boxed()
        .initialize(&MockExecutionContext::test_default())
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn it_checks_columns() {
        let outcome = point_data_join("foo", "bar", "join_column", "join_column")
            .boxed()
            .initialize(&MockExecutionContext::test_default())
            .await;

        assert!(matches!(
            outcome,
            Err(error::Error::ColumnDoesNotExist { column }) if column == "foo"
        ));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc