• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.66
/operators/src/processing/vector_join/mod.rs
1
use geoengine_datatypes::dataset::NamedData;
2
use serde::{Deserialize, Serialize};
3
use snafu::ensure;
4

5
use geoengine_datatypes::collections::VectorDataType;
6

7
use crate::engine::{
8
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
9
    OperatorData, OperatorName, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
10
    VectorResultDescriptor, WorkflowOperatorPath,
11
};
12
use crate::error;
13
use crate::util::Result;
14

15
use self::equi_data_join::EquiGeoToDataJoinProcessor;
16
use crate::processing::vector_join::util::translation_table;
17
use async_trait::async_trait;
18
use std::collections::HashMap;
19

20
mod equi_data_join;
21
mod util;
22

23
/// The vector join operator requires two inputs and the join type.
///
/// The two inputs are the `left` and `right` operators of `VectorJoinSources`;
/// the join type is the single parameter stored in `VectorJoinParams`.
24
pub type VectorJoin = Operator<VectorJoinParams, VectorJoinSources>;
25

26
// Gives the operator its type name. `InitializedVectorJoin::name` returns
// this constant.
impl OperatorName for VectorJoin {
27
    const TYPE_NAME: &'static str = "VectorJoin";
28
}
29

30
/// A set of parameters for the `VectorJoin`.
///
/// The only parameter is the join type. It is marked `#[serde(flatten)]`, so
/// the `type` tag and the fields of the selected `VectorJoinType` variant are
/// (de)serialized as direct members of the parameter object (the `params`
/// test below shows the resulting JSON).
31
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
32
#[serde(rename_all = "camelCase")]
33
pub struct VectorJoinParams {
34
    /// The kind of join to perform, together with its settings.
    #[serde(flatten)]
35
    join_type: VectorJoinType,
36
}
37

38
/// The two inputs of the `VectorJoin`.
///
/// For the `EquiGeoToData` join, `VectorJoin::_initialize` rejects a `left`
/// input of type `VectorDataType::Data` and a `right` input of any type other
/// than `VectorDataType::Data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
39
#[serde(rename_all = "camelCase")]
40
pub struct VectorJoinSources {
41
    /// The left input of the join.
    left: Box<dyn VectorOperator>,
42
    /// The right input of the join.
    right: Box<dyn VectorOperator>,
43
}
44

45
impl OperatorData for VectorJoinSources {
46
    /// Passes `data_names` on to the left and then to the right source
    /// operator.
    // NOTE(review): presumably each source adds the names of the data it
    // uses to the vector; confirm against the `OperatorData` trait.
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>) {
×
47
        self.left.data_names_collect(data_names);
×
48
        self.right.data_names_collect(data_names);
×
49
    }
×
50
}
51

52
/// The initialized counterparts of the two `VectorJoinSources` inputs, as
/// produced by `VectorJoinSources::initialize_sources`.
pub struct InitializedVectorJoinSources {
53
    /// The initialized left input.
    left: Box<dyn InitializedVectorOperator>,
54
    /// The initialized right input.
    right: Box<dyn InitializedVectorOperator>,
55
}
56

57
#[async_trait]
58
impl InitializedSources<InitializedVectorJoinSources> for VectorJoinSources {
59
    /// Initializes the left input and then the right input.
    ///
    /// The left input is initialized with `path` extended by `0`, the right
    /// input with `path` extended by `1`.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while initializing an input. The right
    /// input is not initialized if the left one fails.
    async fn initialize_sources(
60
        self,
61
        path: WorkflowOperatorPath,
62
        context: &dyn ExecutionContext,
63
    ) -> Result<InitializedVectorJoinSources> {
2✔
64
        Ok(InitializedVectorJoinSources {
65
            left: self
2✔
66
                .left
2✔
67
                .initialize(path.clone_and_append(0), context)
2✔
68
                .await?,
2✔
69
            right: self
2✔
70
                .right
2✔
71
                .initialize(path.clone_and_append(1), context)
2✔
72
                .await?,
2✔
73
        })
74
    }
4✔
75
}
76

77
/// Defines the type of join.
///
/// The variant name is (de)serialized as the value of a `type` field
/// (`#[serde(tag = "type")]`).
78
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
79
#[serde(tag = "type")]
80
pub enum VectorJoinType {
81
    /// An inner equi-join between a `GeoFeatureCollection` and a `DataCollection`
82
    EquiGeoToData {
83
        /// Name of the join column of the left input. Initialization fails
        /// if the left input has no column with this name.
        left_column: String,
84
        /// Name of the join column of the right input. Initialization fails
        /// if the right input has no column with this name.
        right_column: String,
85
        /// The suffix to use for right columns whose names conflict with
86
        /// left column names. If `None`, the suffix `"right"` is used.
87
        right_column_suffix: Option<String>,
88
    },
89
}
90

91
#[typetag::serde]
×
92
#[async_trait]
93
impl VectorOperator for VectorJoin {
94
    /// Initializes both inputs, validates the join parameters against their
    /// result descriptors and builds the `InitializedVectorJoin`.
    ///
    /// The result descriptor of the join is the left input's descriptor with
    /// every right column added under its name from the column translation
    /// table.
    ///
    /// # Errors
    ///
    /// * any error from initializing the two inputs,
    /// * `ColumnDoesNotExist` if `left_column` is missing in the left input
    ///   or `right_column` is missing in the right input (the left column is
    ///   checked first),
    /// * `InvalidType` if the left input is of type `VectorDataType::Data`
    ///   or the right input is not.
    async fn _initialize(
95
        self: Box<Self>,
96
        path: WorkflowOperatorPath,
97
        context: &dyn ExecutionContext,
98
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
99
        let name = CanonicOperatorName::from(&self);
2✔
100

101
        let initialized_sources = self
2✔
102
            .sources
2✔
103
            .initialize_sources(path.clone(), context)
2✔
104
            .await?;
2✔
105

106
        // Validate the join parameters against the result descriptors of the
        // two initialized inputs.
        match &self.params.join_type {
2✔
107
            VectorJoinType::EquiGeoToData {
2✔
108
                left_column,
2✔
109
                right_column,
2✔
110
                right_column_suffix: _,
2✔
111
            } => {
2✔
112
                let left_rd = initialized_sources.left.result_descriptor();
2✔
113
                let right_rd = initialized_sources.right.result_descriptor();
2✔
114

2✔
115
                // Both join columns must exist in their respective input.
                ensure!(
2✔
116
                    left_rd.columns.contains_key(left_column),
2✔
117
                    error::ColumnDoesNotExist {
1✔
118
                        column: left_column.clone()
1✔
119
                    }
1✔
120
                );
121
                ensure!(
1✔
122
                    right_rd.columns.contains_key(right_column),
1✔
123
                    error::ColumnDoesNotExist {
×
124
                        column: right_column.clone()
×
125
                    }
×
126
                );
127

128
                // The left input may be of any type except `Data`.
                ensure!(
1✔
129
                    left_rd.data_type != VectorDataType::Data,
1✔
130
                    error::InvalidType {
×
131
                        expected: "a geo data collection".to_string(),
×
132
                        found: initialized_sources
×
133
                            .left
×
134
                            .result_descriptor()
×
135
                            .data_type
×
136
                            .to_string(),
×
137
                    }
×
138
                );
139
                // The right input must be of type `Data`.
                ensure!(
1✔
140
                    right_rd.data_type == VectorDataType::Data,
1✔
141
                    error::InvalidType {
×
142
                        expected: VectorDataType::Data.to_string(),
×
143
                        found: initialized_sources
×
144
                            .right
×
145
                            .result_descriptor()
×
146
                            .data_type
×
147
                            .to_string(),
×
148
                    }
×
149
                );
150
            }
151
        }
152

153
        // TODO: find out if column suffixes are the same for more than one join type and generalize
154
        let column_translation_table = match &self.params.join_type {
1✔
155
            VectorJoinType::EquiGeoToData {
1✔
156
                right_column_suffix,
1✔
157
                ..
1✔
158
            } => {
1✔
159
                // Fall back to the suffix "right" if none was specified.
                let right_column_suffix: &str =
1✔
160
                    right_column_suffix.as_ref().map_or("right", String::as_str);
1✔
161
                translation_table(
1✔
162
                    initialized_sources.left.result_descriptor().columns.keys(),
1✔
163
                    initialized_sources.right.result_descriptor().columns.keys(),
1✔
164
                    right_column_suffix,
1✔
165
                )
1✔
166
            }
1✔
167
        };
1✔
168

1✔
169
        // Start from the left input's columns and add every right column
        // under its translated name.
        let result_descriptor =
1✔
170
            initialized_sources
1✔
171
                .left
1✔
172
                .result_descriptor()
1✔
173
                .map_columns(|left_columns| {
1✔
174
                    let mut columns = left_columns.clone();
1✔
175
                    for (right_column_name, right_column_type) in
1✔
176
                        &initialized_sources.right.result_descriptor().columns
1✔
177
                    {
1✔
178
                        columns.insert(
1✔
179
                            // Indexing panics on a missing key. The table was
                            // built from these same right column names, so an
                            // entry is expected for each of them.
                            column_translation_table[right_column_name].clone(),
1✔
180
                            right_column_type.clone(),
1✔
181
                        );
1✔
182
                    }
1✔
183
                    columns
1✔
184
                });
1✔
185

1✔
186
        let initialized_operator = InitializedVectorJoin {
1✔
187
            name,
1✔
188
            path,
1✔
189
            result_descriptor,
1✔
190
            left: initialized_sources.left,
1✔
191
            right: initialized_sources.right,
1✔
192
            state: InitializedVectorJoinParams {
1✔
193
                join_type: self.params.join_type.clone(),
1✔
194
                column_translation_table,
1✔
195
            },
1✔
196
        };
1✔
197

1✔
198
        Ok(initialized_operator.boxed())
1✔
199
    }
4✔
200

201
    // NOTE(review): `span_fn!` is defined outside this file; presumably it
    // generates the tracing-span method of the operator — confirm.
    span_fn!(VectorJoin);
202
}
203

204
/// The state of an initialized `VectorJoin`, built in
/// `VectorJoin::_initialize`.
205
#[derive(Debug, Clone, PartialEq, Eq)]
206
pub struct InitializedVectorJoinParams {
207
    /// The join type, cloned from the operator's parameters.
    join_type: VectorJoinType,
208
    /// Maps the column names of the right input to the names under which
    /// they appear in the join's result descriptor.
    column_translation_table: HashMap<String, String>,
209
}
210

211
/// The initialized form of the `VectorJoin` operator, created by
/// `VectorJoin::_initialize`.
pub struct InitializedVectorJoin {
212
    /// The canonic name derived from the uninitialized operator.
    name: CanonicOperatorName,
213
    /// The workflow path that was passed to `_initialize`.
    path: WorkflowOperatorPath,
214
    /// The left input's result descriptor, extended by the right input's
    /// columns under their translated names.
    result_descriptor: VectorResultDescriptor,
215
    /// The initialized left input.
    left: Box<dyn InitializedVectorOperator>,
216
    /// The initialized right input.
    right: Box<dyn InitializedVectorOperator>,
217
    /// The join type and the column translation table.
    state: InitializedVectorJoinParams,
218
}
219

220
impl InitializedVectorOperator for InitializedVectorJoin {
221
    /// Creates the query processor of the join.
    ///
    /// The returned processor has the same geometry variant as the left
    /// input's processor (`MultiPoint`, `MultiLineString` or `MultiPolygon`)
    /// and wraps an `EquiGeoToDataJoinProcessor`.
    ///
    /// # Errors
    ///
    /// Returns the error of either input if its query processor cannot be
    /// created.
    ///
    /// # Panics
    ///
    /// Panics if the right processor is not a data processor or if the left
    /// processor is a data processor. `VectorJoin::_initialize` checks the
    /// data types of both result descriptors to rule this out.
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
×
222
        match &self.state.join_type {
×
223
            VectorJoinType::EquiGeoToData {
×
224
                left_column,
×
225
                right_column,
×
226
                right_column_suffix: _right_column_suffix,
×
227
            } => {
228
                let right_processor = self
×
229
                    .right
×
230
                    .query_processor()?
×
231
                    .data()
×
232
                    .expect("checked in constructor");
×
233

234
                let left = self.left.query_processor()?;
×
235

236
                // Every geometry variant builds the same join processor and
                // wraps it in the matching `TypedVectorQueryProcessor` variant.
                Ok(match left {
×
237
                    TypedVectorQueryProcessor::Data(_) => unreachable!("check in constructor"),
×
238
                    TypedVectorQueryProcessor::MultiPoint(left_processor) => {
×
239
                        TypedVectorQueryProcessor::MultiPoint(
×
240
                            EquiGeoToDataJoinProcessor::new(
×
241
                                self.result_descriptor.clone(),
×
242
                                left_processor,
×
243
                                right_processor,
×
244
                                left_column.clone(),
×
245
                                right_column.clone(),
×
246
                                self.state.column_translation_table.clone(),
×
247
                            )
×
248
                            .boxed(),
×
249
                        )
×
250
                    }
251
                    TypedVectorQueryProcessor::MultiLineString(left_processor) => {
×
252
                        TypedVectorQueryProcessor::MultiLineString(
×
253
                            EquiGeoToDataJoinProcessor::new(
×
254
                                self.result_descriptor.clone(),
×
255
                                left_processor,
×
256
                                right_processor,
×
257
                                left_column.clone(),
×
258
                                right_column.clone(),
×
259
                                self.state.column_translation_table.clone(),
×
260
                            )
×
261
                            .boxed(),
×
262
                        )
×
263
                    }
264
                    TypedVectorQueryProcessor::MultiPolygon(left_processor) => {
×
265
                        TypedVectorQueryProcessor::MultiPolygon(
×
266
                            EquiGeoToDataJoinProcessor::new(
×
267
                                self.result_descriptor.clone(),
×
268
                                left_processor,
×
269
                                right_processor,
×
270
                                left_column.clone(),
×
271
                                right_column.clone(),
×
272
                                self.state.column_translation_table.clone(),
×
273
                            )
×
274
                            .boxed(),
×
275
                        )
×
276
                    }
277
                })
278
            }
279
        }
280
    }
×
281

282
    /// Returns the result descriptor computed during initialization.
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
283
        &self.result_descriptor
×
284
    }
×
285

286
    /// Returns a clone of the operator's canonic name.
    fn canonic_name(&self) -> CanonicOperatorName {
×
287
        self.name.clone()
×
288
    }
×
289

NEW
290
    /// Returns the operator's type name, `VectorJoin::TYPE_NAME`.
    fn name(&self) -> &'static str {
×
NEW
291
        VectorJoin::TYPE_NAME
×
NEW
292
    }
×
293

NEW
294
    /// Returns a clone of the workflow path the operator was initialized with.
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
295
        self.path.clone()
×
NEW
296
    }
×
297
}
298

299
// Unit tests for the (de)serialization of the parameters and for the
// initialization of the operator.
#[cfg(test)]
300
mod tests {
301
    use super::*;
302
    use crate::engine::MockExecutionContext;
303
    use crate::mock::MockFeatureCollectionSource;
304
    use geoengine_datatypes::collections::{DataCollection, MultiPointCollection};
305
    use geoengine_datatypes::primitives::{FeatureData, NoGeometry, TimeInterval};
306
    use geoengine_datatypes::util::test::TestDefault;
307

308
    // Round-trips the parameters through JSON. The join type is flattened
    // into the parameter object, so the `type` tag and the variant's fields
    // appear at the top level.
    #[test]
309
    fn params() {
1✔
310
        let params = VectorJoinParams {
1✔
311
            join_type: VectorJoinType::EquiGeoToData {
1✔
312
                left_column: "foo".to_string(),
1✔
313
                right_column: "bar".to_string(),
1✔
314
                right_column_suffix: Some("baz".to_string()),
1✔
315
            },
1✔
316
        };
1✔
317

1✔
318
        let json = serde_json::json!({
1✔
319
            "type": "EquiGeoToData",
1✔
320
            "left_column": "foo",
1✔
321
            "right_column": "bar",
1✔
322
            "right_column_suffix": "baz",
1✔
323
        });
1✔
324

1✔
325
        assert_eq!(json, serde_json::to_value(&params).unwrap());
1✔
326

327
        let params_deserialized: VectorJoinParams = serde_json::from_value(json).unwrap();
1✔
328

1✔
329
        assert_eq!(params, params_deserialized);
1✔
330
    }
1✔
331

332
    // Initialization succeeds for a multi-point left input and a data right
    // input when both configured join columns exist.
    #[tokio::test]
333
    async fn initialization() {
1✔
334
        let operator = VectorJoin {
1✔
335
            params: VectorJoinParams {
1✔
336
                join_type: VectorJoinType::EquiGeoToData {
1✔
337
                    left_column: "left_join_column".to_string(),
1✔
338
                    right_column: "right_join_column".to_string(),
1✔
339
                    right_column_suffix: Some("baz".to_string()),
1✔
340
                },
1✔
341
            },
1✔
342
            sources: VectorJoinSources {
1✔
343
                left: MockFeatureCollectionSource::single(
1✔
344
                    MultiPointCollection::from_slices(
1✔
345
                        &[(0.0, 0.1)],
1✔
346
                        &[TimeInterval::default()],
1✔
347
                        &[("left_join_column", FeatureData::Int(vec![5]))],
1✔
348
                    )
1✔
349
                    .unwrap(),
1✔
350
                )
1✔
351
                .boxed(),
1✔
352
                right: MockFeatureCollectionSource::single(
1✔
353
                    DataCollection::from_slices(
1✔
354
                        &[] as &[NoGeometry],
1✔
355
                        &[TimeInterval::default()],
1✔
356
                        &[("right_join_column", FeatureData::Int(vec![5]))],
1✔
357
                    )
1✔
358
                    .unwrap(),
1✔
359
                )
1✔
360
                .boxed(),
1✔
361
            },
1✔
362
        };
1✔
363

1✔
364
        operator
1✔
365
            .boxed()
1✔
366
            .initialize(
1✔
367
                WorkflowOperatorPath::initialize_root(),
1✔
368
                &MockExecutionContext::test_default(),
1✔
369
            )
1✔
370
            .await
1✔
371
            .unwrap();
1✔
372
    }
1✔
373

374
    // Both inputs only have a column named "join_column", so neither "foo"
    // nor "bar" exists. The left column is checked first, hence the error
    // is expected to name "foo".
    #[tokio::test]
375
    async fn it_checks_columns() {
1✔
376
        let operator = VectorJoin {
1✔
377
            params: VectorJoinParams {
1✔
378
                join_type: VectorJoinType::EquiGeoToData {
1✔
379
                    left_column: "foo".to_string(),
1✔
380
                    right_column: "bar".to_string(),
1✔
381
                    right_column_suffix: Some("baz".to_string()),
1✔
382
                },
1✔
383
            },
1✔
384
            sources: VectorJoinSources {
1✔
385
                left: MockFeatureCollectionSource::single(
1✔
386
                    MultiPointCollection::from_slices(
1✔
387
                        &[(0.0, 0.1)],
1✔
388
                        &[TimeInterval::default()],
1✔
389
                        &[("join_column", FeatureData::Int(vec![5]))],
1✔
390
                    )
1✔
391
                    .unwrap(),
1✔
392
                )
1✔
393
                .boxed(),
1✔
394
                right: MockFeatureCollectionSource::single(
1✔
395
                    DataCollection::from_slices(
1✔
396
                        &[] as &[NoGeometry],
1✔
397
                        &[TimeInterval::default()],
1✔
398
                        &[("join_column", FeatureData::Int(vec![5]))],
1✔
399
                    )
1✔
400
                    .unwrap(),
1✔
401
                )
1✔
402
                .boxed(),
1✔
403
            },
1✔
404
        };
1✔
405

1✔
406
        assert!(matches!(
1✔
407
            operator
1✔
408
                .boxed()
1✔
409
                .initialize(WorkflowOperatorPath::initialize_root(), &MockExecutionContext::test_default())
1✔
410
                .await,
1✔
411
            Err(error::Error::ColumnDoesNotExist { column }) if column == "foo"
1✔
412
        ));
1✔
413
    }
1✔
414
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc