• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5006008836

pending completion
5006008836

push

github

GitHub
Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.86
/operators/src/processing/time_projection/mod.rs
1
use std::sync::Arc;
2

3
use crate::engine::{
4
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
5
    OperatorName, QueryContext, SingleVectorSource, TypedVectorQueryProcessor, VectorOperator,
6
    VectorQueryProcessor, VectorResultDescriptor, WorkflowOperatorPath,
7
};
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::stream::BoxStream;
11
use futures::{StreamExt, TryStreamExt};
12
use geoengine_datatypes::collections::{
13
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
14
};
15
use geoengine_datatypes::primitives::{Geometry, TimeInterval};
16
use geoengine_datatypes::primitives::{TimeInstance, TimeStep, VectorQueryRectangle};
17
use geoengine_datatypes::util::arrow::ArrowTyped;
18
use log::debug;
19
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
20
use rayon::ThreadPool;
21
use serde::{Deserialize, Serialize};
22
use snafu::{ensure, ResultExt, Snafu};
23

24
/// Projection of time information in queries and data
25
///
26
/// This operator changes the temporal validity of the queried data.
27
/// In order to query all valid data, it is necessary to change the query rectangle as well.
28
///
29
pub type TimeProjection = Operator<TimeProjectionParams, SingleVectorSource>;
30

31
impl OperatorName for TimeProjection {
32
    const TYPE_NAME: &'static str = "TimeProjection";
33
}
34

35
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
2✔
36
pub struct TimeProjectionParams {
37
    /// Specify the time step granularity and size
38
    step: TimeStep,
39
    /// Define an anchor point for `step`
40
    /// If `None`, the anchor point is `1970-01-01T00:00:00Z` by default
41
    step_reference: Option<TimeInstance>,
42
}
43

44
#[derive(Debug, Snafu)]
×
45
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
46
pub enum TimeProjectionError {
47
    #[snafu(display("Time step must be larger than zero"))]
48
    WindowSizeMustNotBeZero,
49

50
    #[snafu(display("Query rectangle expansion failed: {}", source))]
51
    CannotExpandQueryRectangle {
52
        source: geoengine_datatypes::error::Error,
53
    },
54

55
    #[snafu(display("Feature time interval expansion failed: {}", source))]
56
    CannotExpandFeatureTimeInterval {
57
        source: geoengine_datatypes::error::Error,
58
    },
59
}
60

61
#[typetag::serde]
×
62
#[async_trait]
63
impl VectorOperator for TimeProjection {
64
    async fn _initialize(
2✔
65
        self: Box<Self>,
2✔
66
        path: WorkflowOperatorPath,
2✔
67
        context: &dyn ExecutionContext,
2✔
68
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
69
        ensure!(self.params.step.step > 0, error::WindowSizeMustNotBeZero);
2✔
70

71
        let name = CanonicOperatorName::from(&self);
2✔
72

73
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
2✔
74

75
        debug!("Initializing `TimeProjection` with {:?}.", &self.params);
2✔
76

77
        let step_reference = self
2✔
78
            .params
2✔
79
            .step_reference
2✔
80
            // use UTC 0 as default
2✔
81
            .unwrap_or(TimeInstance::EPOCH_START);
2✔
82

2✔
83
        let mut result_descriptor = initialized_sources.vector.result_descriptor().clone();
2✔
84
        rewrite_result_descriptor(&mut result_descriptor, self.params.step, step_reference)?;
2✔
85

86
        let initialized_operator = InitializedVectorTimeProjection {
2✔
87
            name,
2✔
88
            source: initialized_sources.vector,
2✔
89
            result_descriptor,
2✔
90
            step: self.params.step,
2✔
91
            step_reference,
2✔
92
        };
2✔
93

2✔
94
        Ok(initialized_operator.boxed())
2✔
95
    }
4✔
96

97
    span_fn!(TimeProjection);
×
98
}
99

100
fn rewrite_result_descriptor(
101
    result_descriptor: &mut VectorResultDescriptor,
102
    step: TimeStep,
103
    step_reference: TimeInstance,
104
) -> Result<()> {
105
    if let Some(time) = result_descriptor.time {
3✔
106
        let start = step.snap_relative(step_reference, time.start())?;
1✔
107
        let end = (step.snap_relative(step_reference, time.end())? + step)?;
1✔
108

109
        result_descriptor.time = Some(TimeInterval::new(start, end)?);
1✔
110
    }
2✔
111
    Ok(())
3✔
112
}
3✔
113

114
pub struct InitializedVectorTimeProjection {
115
    name: CanonicOperatorName,
116
    source: Box<dyn InitializedVectorOperator>,
117
    result_descriptor: VectorResultDescriptor,
118
    step: TimeStep,
119
    step_reference: TimeInstance,
120
}
121

122
impl InitializedVectorOperator for InitializedVectorTimeProjection {
123
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
124
        &self.result_descriptor
×
125
    }
×
126

127
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
2✔
128
        let source_processor = self.source.query_processor()?;
2✔
129

130
        Ok(
131
            call_on_generic_vector_processor!(source_processor, processor => VectorTimeProjectionProcessor {
2✔
132
                processor,
2✔
133
                step: self.step,
2✔
134
                step_reference: self.step_reference,
2✔
135
            }.boxed().into()),
2✔
136
        )
137
    }
2✔
138

139
    fn canonic_name(&self) -> CanonicOperatorName {
×
140
        self.name.clone()
×
141
    }
×
142
}
143

144
pub struct VectorTimeProjectionProcessor<G>
145
where
146
    G: Geometry,
147
{
148
    processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
149
    step: TimeStep,
150
    step_reference: TimeInstance,
151
}
152

153
#[async_trait]
154
impl<G> VectorQueryProcessor for VectorTimeProjectionProcessor<G>
155
where
156
    G: Geometry + ArrowTyped + 'static,
157
{
158
    type VectorType = FeatureCollection<G>;
159

160
    async fn vector_query<'a>(
2✔
161
        &'a self,
2✔
162
        query: VectorQueryRectangle,
2✔
163
        ctx: &'a dyn QueryContext,
2✔
164
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
2✔
165
        let query = self.expand_query_rectangle(query)?;
2✔
166
        let stream = self
2✔
167
            .processor
2✔
168
            .vector_query(query, ctx)
2✔
169
            .await?
×
170
            .and_then(|collection| {
2✔
171
                self.expand_feature_collection_result(collection, ctx.thread_pool().clone())
2✔
172
            })
2✔
173
            .boxed();
2✔
174
        Ok(stream)
2✔
175
    }
4✔
176
}
177

178
impl<G> VectorTimeProjectionProcessor<G>
179
where
180
    G: Geometry + ArrowTyped + 'static,
181
{
182
    fn expand_query_rectangle(&self, query: VectorQueryRectangle) -> Result<VectorQueryRectangle> {
2✔
183
        Ok(expand_query_rectangle(
2✔
184
            self.step,
2✔
185
            self.step_reference,
2✔
186
            query,
2✔
187
        )?)
2✔
188
    }
2✔
189

190
    async fn expand_feature_collection_result(
2✔
191
        &self,
2✔
192
        feature_collection: FeatureCollection<G>,
2✔
193
        thread_pool: Arc<ThreadPool>,
2✔
194
    ) -> Result<FeatureCollection<G>> {
2✔
195
        let step = self.step;
2✔
196
        let step_reference = self.step_reference;
2✔
197

2✔
198
        crate::util::spawn_blocking_with_thread_pool(thread_pool, move || {
2✔
199
            Self::expand_feature_collection_result_inner(feature_collection, step, step_reference)
2✔
200
        })
2✔
201
        .await?
2✔
202
        .map_err(Into::into)
2✔
203
    }
2✔
204

205
    fn expand_feature_collection_result_inner(
2✔
206
        feature_collection: FeatureCollection<G>,
2✔
207
        step: TimeStep,
2✔
208
        step_reference: TimeInstance,
2✔
209
    ) -> Result<FeatureCollection<G>, TimeProjectionError> {
2✔
210
        let time_intervals: Option<Result<Vec<TimeInterval>, TimeProjectionError>> =
2✔
211
            feature_collection
2✔
212
                .time_intervals()
2✔
213
                .par_iter()
2✔
214
                .with_min_len(128) // TODO: find good default
2✔
215
                .map(|time_interval| expand_time_interval(step, step_reference, *time_interval))
6✔
216
                // TODO: change to [`try_collect_into_vec`](https://github.com/rayon-rs/rayon/issues/713) if available
2✔
217
                .try_fold_with(Vec::new(), |mut acc, time_interval| {
2✔
218
                    acc.push(time_interval?);
6✔
219
                    Ok(acc)
6✔
220
                })
6✔
221
                .try_reduce_with(|a, b| Ok([a, b].concat()));
2✔
222

223
        match time_intervals {
2✔
224
            Some(Ok(time_intervals)) => feature_collection
2✔
225
                .replace_time(&time_intervals)
2✔
226
                .context(error::CannotExpandFeatureTimeInterval),
2✔
227
            Some(Err(error)) => Err(error),
×
228
            None => Ok(feature_collection), // was empty
×
229
        }
230
    }
2✔
231
}
232

233
fn expand_query_rectangle(
2✔
234
    step: TimeStep,
2✔
235
    step_reference: TimeInstance,
2✔
236
    query: VectorQueryRectangle,
2✔
237
) -> Result<VectorQueryRectangle, TimeProjectionError> {
2✔
238
    Ok(VectorQueryRectangle {
2✔
239
        spatial_bounds: query.spatial_bounds,
2✔
240
        time_interval: expand_time_interval(step, step_reference, query.time_interval)?,
2✔
241
        spatial_resolution: query.spatial_resolution,
2✔
242
    })
243
}
2✔
244

245
fn expand_time_interval(
15✔
246
    step: TimeStep,
15✔
247
    step_reference: TimeInstance,
15✔
248
    time_interval: TimeInterval,
15✔
249
) -> Result<TimeInterval, TimeProjectionError> {
15✔
250
    let start = step.snap_relative_preserve_bounds(step_reference, time_interval.start());
15✔
251
    let mut end = step.snap_relative_preserve_bounds(step_reference, time_interval.end());
15✔
252

15✔
253
    // Since snap_relative snaps to the "left side", we have to add one step to the end
15✔
254
    // This only applies if `time_interval.end()` is not the snap point itself.
15✔
255
    if end < time_interval.end() {
15✔
256
        end = match end + step {
14✔
257
            Ok(end) => end,
12✔
258
            Err(_) => TimeInstance::MAX, // `TimeInterval::MAX` can overflow
2✔
259
        };
260
    }
1✔
261

262
    TimeInterval::new(start, end).context(error::CannotExpandQueryRectangle)
15✔
263
}
15✔
264

265
#[cfg(test)]
266
mod tests {
267
    use super::*;
268

269
    use crate::{
270
        engine::{MockExecutionContext, MockQueryContext},
271
        mock::MockFeatureCollectionSource,
272
    };
273
    use geoengine_datatypes::{
274
        collections::{MultiPointCollection, VectorDataType},
275
        primitives::{
276
            BoundingBox2D, DateTime, MultiPoint, SpatialResolution, TimeGranularity, TimeInterval,
277
        },
278
        spatial_reference::SpatialReference,
279
        util::test::TestDefault,
280
    };
281

282
    #[test]
1✔
283
    #[allow(clippy::too_many_lines)]
284
    fn test_expand_query_time_interval() {
1✔
285
        fn assert_time_interval_transform<T1: TryInto<TimeInstance>, T2: TryInto<TimeInstance>>(
7✔
286
            t1: T1,
7✔
287
            t2: T1,
7✔
288
            step: TimeStep,
7✔
289
            step_reference: TimeInstance,
7✔
290
            t1_expanded: T2,
7✔
291
            t2_expanded: T2,
7✔
292
        ) where
7✔
293
            T1::Error: std::fmt::Debug,
7✔
294
            T2::Error: std::fmt::Debug,
7✔
295
        {
7✔
296
            let result = expand_time_interval(
7✔
297
                step,
7✔
298
                step_reference,
7✔
299
                TimeInterval::new(t1.try_into().unwrap(), t2.try_into().unwrap()).unwrap(),
7✔
300
            )
7✔
301
            .unwrap();
7✔
302
            let expected = TimeInterval::new(
7✔
303
                t1_expanded.try_into().unwrap(),
7✔
304
                t2_expanded.try_into().unwrap(),
7✔
305
            )
7✔
306
            .unwrap();
7✔
307

7✔
308
            assert_eq!(
7✔
309
                result,
1✔
310
                expected,
1✔
311
                "[{}, {}) != [{}, {})",
1✔
312
                result.start().as_datetime_string(),
×
313
                result.end().as_datetime_string(),
×
314
                expected.start().as_datetime_string(),
×
315
                expected.end().as_datetime_string(),
×
316
            );
1✔
317
        }
7✔
318

1✔
319
        assert_time_interval_transform(
1✔
320
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
321
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
322
            TimeStep {
1✔
323
                granularity: TimeGranularity::Years,
1✔
324
                step: 1,
1✔
325
            },
1✔
326
            TimeInstance::from_millis(0).unwrap(),
1✔
327
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
328
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
329
        );
1✔
330

1✔
331
        assert_time_interval_transform(
1✔
332
            DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
333
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
334
            TimeStep {
1✔
335
                granularity: TimeGranularity::Years,
1✔
336
                step: 1,
1✔
337
            },
1✔
338
            TimeInstance::from_millis(0).unwrap(),
1✔
339
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
340
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
341
        );
1✔
342

1✔
343
        assert_time_interval_transform(
1✔
344
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
345
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
346
            TimeStep {
1✔
347
                granularity: TimeGranularity::Years,
1✔
348
                step: 1,
1✔
349
            },
1✔
350
            TimeInstance::from_millis(0).unwrap(),
1✔
351
            DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
352
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
353
        );
1✔
354

1✔
355
        assert_time_interval_transform(
1✔
356
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
357
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
358
            TimeStep {
1✔
359
                granularity: TimeGranularity::Months,
1✔
360
                step: 6,
1✔
361
            },
1✔
362
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
363
            DateTime::new_utc(2009, 3, 1, 0, 0, 0),
1✔
364
            DateTime::new_utc(2010, 9, 1, 0, 0, 0),
1✔
365
        );
1✔
366

1✔
367
        assert_time_interval_transform(
1✔
368
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
369
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
370
            TimeStep {
1✔
371
                granularity: TimeGranularity::Months,
1✔
372
                step: 6,
1✔
373
            },
1✔
374
            TimeInstance::from(DateTime::new_utc(2020, 1, 1, 0, 0, 0)),
1✔
375
            DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
376
            DateTime::new_utc(2010, 7, 1, 0, 0, 0),
1✔
377
        );
1✔
378

1✔
379
        assert_time_interval_transform(
1✔
380
            TimeInstance::MIN,
1✔
381
            TimeInstance::MAX,
1✔
382
            TimeStep {
1✔
383
                granularity: TimeGranularity::Months,
1✔
384
                step: 6,
1✔
385
            },
1✔
386
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
387
            TimeInstance::MIN,
1✔
388
            TimeInstance::MAX,
1✔
389
        );
1✔
390

1✔
391
        assert_time_interval_transform(
1✔
392
            TimeInstance::MIN + 1,
1✔
393
            TimeInstance::MAX - 1,
1✔
394
            TimeStep {
1✔
395
                granularity: TimeGranularity::Months,
1✔
396
                step: 6,
1✔
397
            },
1✔
398
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
399
            TimeInstance::MIN,
1✔
400
            TimeInstance::MAX,
1✔
401
        );
1✔
402
    }
1✔
403

404
    #[tokio::test]
1✔
405
    async fn single_year() {
1✔
406
        let execution_context = MockExecutionContext::test_default();
1✔
407
        let query_context = MockQueryContext::test_default();
1✔
408

1✔
409
        let source = MockFeatureCollectionSource::single(
1✔
410
            MultiPointCollection::from_data(
1✔
411
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
412
                vec![
1✔
413
                    TimeInterval::new(
1✔
414
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
415
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
416
                    )
1✔
417
                    .unwrap(),
1✔
418
                    TimeInterval::new(
1✔
419
                        DateTime::new_utc(2010, 6, 3, 0, 0, 0),
1✔
420
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
421
                    )
1✔
422
                    .unwrap(),
1✔
423
                    TimeInterval::new(
1✔
424
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
425
                        DateTime::new_utc_with_millis(2010, 3, 31, 23, 59, 59, 999),
1✔
426
                    )
1✔
427
                    .unwrap(),
1✔
428
                ],
1✔
429
                Default::default(),
1✔
430
            )
1✔
431
            .unwrap(),
1✔
432
        );
1✔
433

1✔
434
        let time_projection = TimeProjection {
1✔
435
            sources: SingleVectorSource {
1✔
436
                vector: source.boxed(),
1✔
437
            },
1✔
438
            params: TimeProjectionParams {
1✔
439
                step: TimeStep {
1✔
440
                    granularity: TimeGranularity::Years,
1✔
441
                    step: 1,
1✔
442
                },
1✔
443
                step_reference: None,
1✔
444
            },
1✔
445
        };
1✔
446

447
        let query_processor = time_projection
1✔
448
            .boxed()
1✔
449
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
450
            .await
×
451
            .unwrap()
1✔
452
            .query_processor()
1✔
453
            .unwrap()
1✔
454
            .multi_point()
1✔
455
            .unwrap();
1✔
456

457
        let mut stream = query_processor
1✔
458
            .vector_query(
1✔
459
                VectorQueryRectangle {
1✔
460
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
461
                    time_interval: TimeInterval::new(
1✔
462
                        DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
463
                        DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
464
                    )
1✔
465
                    .unwrap(),
1✔
466
                    spatial_resolution: SpatialResolution::one(),
1✔
467
                },
1✔
468
                &query_context,
1✔
469
            )
1✔
470
            .await
×
471
            .unwrap();
1✔
472

1✔
473
        let mut result = Vec::new();
1✔
474
        while let Some(collection) = stream.next().await {
2✔
475
            result.push(collection.unwrap());
1✔
476
        }
1✔
477

478
        assert_eq!(result.len(), 1);
1✔
479

480
        let expected = MultiPointCollection::from_data(
1✔
481
            MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
482
            vec![
1✔
483
                TimeInterval::new(
1✔
484
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
485
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
486
                )
1✔
487
                .unwrap(),
1✔
488
                TimeInterval::new(
1✔
489
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
490
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
491
                )
1✔
492
                .unwrap(),
1✔
493
                TimeInterval::new(
1✔
494
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
495
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
496
                )
1✔
497
                .unwrap(),
1✔
498
            ],
1✔
499
            Default::default(),
1✔
500
        )
1✔
501
        .unwrap();
1✔
502

1✔
503
        assert_eq!(result[0], expected);
1✔
504
    }
505

506
    #[tokio::test]
1✔
507
    async fn over_a_year() {
1✔
508
        let execution_context = MockExecutionContext::test_default();
1✔
509
        let query_context = MockQueryContext::test_default();
1✔
510

1✔
511
        let source = MockFeatureCollectionSource::single(
1✔
512
            MultiPointCollection::from_data(
1✔
513
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
514
                vec![
1✔
515
                    TimeInterval::new(
1✔
516
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
517
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
518
                    )
1✔
519
                    .unwrap(),
1✔
520
                    TimeInterval::new(
1✔
521
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
522
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
523
                    )
1✔
524
                    .unwrap(),
1✔
525
                    TimeInterval::new(
1✔
526
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
527
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
528
                    )
1✔
529
                    .unwrap(),
1✔
530
                ],
1✔
531
                Default::default(),
1✔
532
            )
1✔
533
            .unwrap(),
1✔
534
        );
1✔
535

1✔
536
        let time_projection = TimeProjection {
1✔
537
            sources: SingleVectorSource {
1✔
538
                vector: source.boxed(),
1✔
539
            },
1✔
540
            params: TimeProjectionParams {
1✔
541
                step: TimeStep {
1✔
542
                    granularity: TimeGranularity::Years,
1✔
543
                    step: 1,
1✔
544
                },
1✔
545
                step_reference: None,
1✔
546
            },
1✔
547
        };
1✔
548

549
        let query_processor = time_projection
1✔
550
            .boxed()
1✔
551
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
552
            .await
×
553
            .unwrap()
1✔
554
            .query_processor()
1✔
555
            .unwrap()
1✔
556
            .multi_point()
1✔
557
            .unwrap();
1✔
558

559
        let mut stream = query_processor
1✔
560
            .vector_query(
1✔
561
                VectorQueryRectangle {
1✔
562
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
563
                    time_interval: TimeInterval::new(
1✔
564
                        DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
565
                        DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
566
                    )
1✔
567
                    .unwrap(),
1✔
568
                    spatial_resolution: SpatialResolution::one(),
1✔
569
                },
1✔
570
                &query_context,
1✔
571
            )
1✔
572
            .await
×
573
            .unwrap();
1✔
574

1✔
575
        let mut result = Vec::new();
1✔
576
        while let Some(collection) = stream.next().await {
2✔
577
            result.push(collection.unwrap());
1✔
578
        }
1✔
579

580
        assert_eq!(result.len(), 1);
1✔
581

582
        let expected = MultiPointCollection::from_data(
1✔
583
            MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
584
            vec![
1✔
585
                TimeInterval::new(
1✔
586
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
587
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
588
                )
1✔
589
                .unwrap(),
1✔
590
                TimeInterval::new(
1✔
591
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
592
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
593
                )
1✔
594
                .unwrap(),
1✔
595
                TimeInterval::new(
1✔
596
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
597
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
598
                )
1✔
599
                .unwrap(),
1✔
600
            ],
1✔
601
            Default::default(),
1✔
602
        )
1✔
603
        .unwrap();
1✔
604

1✔
605
        assert_eq!(result[0], expected);
1✔
606
    }
607

608
    #[test]
1✔
609
    fn it_rewrites_result_descriptor() {
1✔
610
        let mut result_descriptor = VectorResultDescriptor {
1✔
611
            data_type: VectorDataType::MultiPoint,
1✔
612
            spatial_reference: SpatialReference::epsg_4326().into(),
1✔
613
            columns: Default::default(),
1✔
614
            time: Some(TimeInterval::new_unchecked(30_000, 90_000)),
1✔
615
            bbox: None,
1✔
616
        };
1✔
617

1✔
618
        rewrite_result_descriptor(
1✔
619
            &mut result_descriptor,
1✔
620
            TimeStep {
1✔
621
                granularity: TimeGranularity::Minutes,
1✔
622
                step: 1,
1✔
623
            },
1✔
624
            TimeInstance::from_millis_unchecked(0),
1✔
625
        )
1✔
626
        .unwrap();
1✔
627

1✔
628
        assert_eq!(
1✔
629
            result_descriptor,
1✔
630
            VectorResultDescriptor {
1✔
631
                data_type: VectorDataType::MultiPoint,
1✔
632
                spatial_reference: SpatialReference::epsg_4326().into(),
1✔
633
                columns: Default::default(),
1✔
634
                time: Some(TimeInterval::new_unchecked(0, 120_000)),
1✔
635
                bbox: None,
1✔
636
            }
1✔
637
        );
1✔
638
    }
1✔
639
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc