• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 19148930607

06 Nov 2025 08:28PM UTC coverage: 88.827%. First build
19148930607

Pull #1083

github

web-flow
Merge 287fd2af8 into 113de40ca
Pull Request #1083: feat: new gdal source workflow optimization

6168 of 7008 new or added lines in 71 files covered. (88.01%)

116144 of 130753 relevant lines covered (88.83%)

496539.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.86
/operators/src/processing/time_projection/mod.rs
1
use std::sync::Arc;
2

3
use crate::engine::{
4
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
5
    OperatorName, QueryContext, SingleVectorSource, TypedVectorQueryProcessor, VectorOperator,
6
    VectorQueryProcessor, VectorResultDescriptor, WorkflowOperatorPath,
7
};
8
use crate::optimization::OptimizationError;
9
use crate::util::Result;
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::{StreamExt, TryStreamExt};
13
use geoengine_datatypes::collections::{
14
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
15
};
16
use geoengine_datatypes::primitives::{ColumnSelection, Geometry, SpatialResolution, TimeInterval};
17
use geoengine_datatypes::primitives::{TimeInstance, TimeStep, VectorQueryRectangle};
18
use geoengine_datatypes::util::arrow::ArrowTyped;
19
use rayon::ThreadPool;
20
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
21
use serde::{Deserialize, Serialize};
22
use snafu::{ResultExt, Snafu, ensure};
23
use tracing::debug;
24

25
/// Projection of time information in queries and data
26
///
27
/// This operator changes the temporal validity of the queried data.
28
/// In order to query all valid data, it is necessary to change the query rectangle as well.
29
///
30
pub type TimeProjection = Operator<TimeProjectionParams, SingleVectorSource>;
31

32
impl OperatorName for TimeProjection {
    /// Type name of the operator; also reported by the initialized operator's `name()`.
    const TYPE_NAME: &'static str = "TimeProjection";
}
35

36
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37
pub struct TimeProjectionParams {
38
    /// Specify the time step granularity and size
39
    step: TimeStep,
40
    /// Define an anchor point for `step`
41
    /// If `None`, the anchor point is `1970-01-01T00:00:00Z` by default
42
    step_reference: Option<TimeInstance>,
43
}
44

45
#[derive(Debug, Snafu)]
46
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
47
pub enum TimeProjectionError {
48
    #[snafu(display("Time step must be larger than zero"))]
49
    WindowSizeMustNotBeZero,
50

51
    #[snafu(display("Query rectangle expansion failed: {}", source))]
52
    CannotExpandQueryRectangle {
53
        source: geoengine_datatypes::error::Error,
54
    },
55

56
    #[snafu(display("Feature time interval expansion failed: {}", source))]
57
    CannotExpandFeatureTimeInterval {
58
        source: geoengine_datatypes::error::Error,
59
    },
60
}
61

62
#[typetag::serde]
×
63
#[async_trait]
64
impl VectorOperator for TimeProjection {
65
    async fn _initialize(
66
        self: Box<Self>,
67
        path: WorkflowOperatorPath,
68
        context: &dyn ExecutionContext,
69
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
70
        ensure!(self.params.step.step > 0, error::WindowSizeMustNotBeZero);
71

72
        let name = CanonicOperatorName::from(&self);
73

74
        let initialized_sources = self
75
            .sources
76
            .initialize_sources(path.clone(), context)
77
            .await?;
78

79
        debug!("Initializing `TimeProjection` with {:?}.", &self.params);
80

81
        let step_reference = self
82
            .params
83
            .step_reference
84
            // use UTC 0 as default
85
            .unwrap_or(TimeInstance::EPOCH_START);
86

87
        let mut result_descriptor = initialized_sources.vector.result_descriptor().clone();
88
        rewrite_result_descriptor(&mut result_descriptor, self.params.step, step_reference)?;
89

90
        let initialized_operator = InitializedVectorTimeProjection {
91
            name,
92
            path,
93
            source: initialized_sources.vector,
94
            result_descriptor,
95
            step: self.params.step,
96
            step_reference,
97
        };
98

99
        Ok(initialized_operator.boxed())
100
    }
2✔
101

102
    span_fn!(TimeProjection);
103
}
104

105
fn rewrite_result_descriptor(
3✔
106
    result_descriptor: &mut VectorResultDescriptor,
3✔
107
    step: TimeStep,
3✔
108
    step_reference: TimeInstance,
3✔
109
) -> Result<()> {
3✔
110
    if let Some(time) = result_descriptor.time {
3✔
111
        let start = step.snap_relative(step_reference, time.start())?;
1✔
112
        let end = (step.snap_relative(step_reference, time.end())? + step)?;
1✔
113

114
        result_descriptor.time = Some(TimeInterval::new(start, end)?);
1✔
115
    }
2✔
116
    Ok(())
3✔
117
}
3✔
118

119
pub struct InitializedVectorTimeProjection {
120
    name: CanonicOperatorName,
121
    path: WorkflowOperatorPath,
122
    source: Box<dyn InitializedVectorOperator>,
123
    result_descriptor: VectorResultDescriptor,
124
    step: TimeStep,
125
    step_reference: TimeInstance,
126
}
127

128
impl InitializedVectorOperator for InitializedVectorTimeProjection {
129
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
130
        &self.result_descriptor
×
131
    }
×
132

133
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
2✔
134
        let source_processor = self.source.query_processor()?;
2✔
135

136
        Ok(
137
            call_on_generic_vector_processor!(source_processor, processor => VectorTimeProjectionProcessor {
2✔
138
                processor,
×
139
                result_descriptor: self.result_descriptor.clone(),
×
140
                step: self.step,
×
141
                step_reference: self.step_reference,
×
142
            }.boxed().into()),
×
143
        )
144
    }
2✔
145

146
    fn canonic_name(&self) -> CanonicOperatorName {
×
147
        self.name.clone()
×
148
    }
×
149

150
    fn name(&self) -> &'static str {
×
151
        TimeProjection::TYPE_NAME
×
152
    }
×
153

154
    fn path(&self) -> WorkflowOperatorPath {
×
155
        self.path.clone()
×
156
    }
×
157

NEW
158
    fn optimize(
×
NEW
159
        &self,
×
NEW
160
        target_resolution: SpatialResolution,
×
NEW
161
    ) -> Result<Box<dyn VectorOperator>, OptimizationError> {
×
162
        Ok(TimeProjection {
NEW
163
            params: TimeProjectionParams {
×
NEW
164
                step: self.step,
×
NEW
165
                step_reference: Some(self.step_reference),
×
NEW
166
            },
×
167
            sources: SingleVectorSource {
NEW
168
                vector: self.source.optimize(target_resolution)?,
×
169
            },
170
        }
NEW
171
        .boxed())
×
NEW
172
    }
×
173
}
174

175
pub struct VectorTimeProjectionProcessor<G>
176
where
177
    G: Geometry,
178
{
179
    processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
180
    result_descriptor: VectorResultDescriptor,
181
    step: TimeStep,
182
    step_reference: TimeInstance,
183
}
184

185
#[async_trait]
186
impl<G> VectorQueryProcessor for VectorTimeProjectionProcessor<G>
187
where
188
    G: Geometry + ArrowTyped + 'static,
189
{
190
    type VectorType = FeatureCollection<G>;
191

192
    async fn vector_query<'a>(
193
        &'a self,
194
        query: VectorQueryRectangle,
195
        ctx: &'a dyn QueryContext,
196
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
2✔
197
        let query = self.expand_query_rectangle(&query)?;
198
        let stream = self
199
            .processor
200
            .vector_query(query, ctx)
201
            .await?
202
            .and_then(|collection| {
2✔
203
                self.expand_feature_collection_result(collection, ctx.thread_pool().clone())
2✔
204
            })
2✔
205
            .boxed();
206
        Ok(stream)
207
    }
2✔
208

209
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
2✔
210
        &self.result_descriptor
2✔
211
    }
2✔
212
}
213

214
impl<G> VectorTimeProjectionProcessor<G>
215
where
216
    G: Geometry + ArrowTyped + 'static,
217
{
218
    fn expand_query_rectangle(&self, query: &VectorQueryRectangle) -> Result<VectorQueryRectangle> {
2✔
219
        Ok(expand_query_rectangle(
2✔
220
            self.step,
2✔
221
            self.step_reference,
2✔
222
            query,
2✔
223
        )?)
×
224
    }
2✔
225

226
    async fn expand_feature_collection_result(
2✔
227
        &self,
2✔
228
        feature_collection: FeatureCollection<G>,
2✔
229
        thread_pool: Arc<ThreadPool>,
2✔
230
    ) -> Result<FeatureCollection<G>> {
2✔
231
        let step = self.step;
2✔
232
        let step_reference = self.step_reference;
2✔
233

234
        crate::util::spawn_blocking_with_thread_pool(thread_pool, move || {
2✔
235
            Self::expand_feature_collection_result_inner(feature_collection, step, step_reference)
2✔
236
        })
2✔
237
        .await?
2✔
238
        .map_err(Into::into)
2✔
239
    }
2✔
240

241
    fn expand_feature_collection_result_inner(
2✔
242
        feature_collection: FeatureCollection<G>,
2✔
243
        step: TimeStep,
2✔
244
        step_reference: TimeInstance,
2✔
245
    ) -> Result<FeatureCollection<G>, TimeProjectionError> {
2✔
246
        let time_intervals: Option<Result<Vec<TimeInterval>, TimeProjectionError>> =
2✔
247
            feature_collection
2✔
248
                .time_intervals()
2✔
249
                .par_iter()
2✔
250
                .with_min_len(128) // TODO: find good default
2✔
251
                .map(|time_interval| expand_time_interval(step, step_reference, *time_interval))
6✔
252
                // TODO: change to [`try_collect_into_vec`](https://github.com/rayon-rs/rayon/issues/713) if available
253
                .try_fold_with(Vec::new(), |mut acc, time_interval| {
6✔
254
                    acc.push(time_interval?);
6✔
255
                    Ok(acc)
6✔
256
                })
6✔
257
                .try_reduce_with(|a, b| Ok([a, b].concat()));
2✔
258

259
        match time_intervals {
2✔
260
            Some(Ok(time_intervals)) => feature_collection
2✔
261
                .replace_time(&time_intervals)
2✔
262
                .context(error::CannotExpandFeatureTimeInterval),
2✔
263
            Some(Err(error)) => Err(error),
×
264
            None => Ok(feature_collection), // was empty
×
265
        }
266
    }
2✔
267
}
268

269
fn expand_query_rectangle(
2✔
270
    step: TimeStep,
2✔
271
    step_reference: TimeInstance,
2✔
272
    query: &VectorQueryRectangle,
2✔
273
) -> Result<VectorQueryRectangle, TimeProjectionError> {
2✔
274
    Ok(VectorQueryRectangle::new(
2✔
275
        query.spatial_bounds(),
2✔
276
        expand_time_interval(step, step_reference, query.time_interval())?,
2✔
277
        ColumnSelection::all(),
2✔
278
    ))
279
}
2✔
280

281
fn expand_time_interval(
15✔
282
    step: TimeStep,
15✔
283
    step_reference: TimeInstance,
15✔
284
    time_interval: TimeInterval,
15✔
285
) -> Result<TimeInterval, TimeProjectionError> {
15✔
286
    let start = step.snap_relative_preserve_bounds(step_reference, time_interval.start());
15✔
287
    let mut end = step.snap_relative_preserve_bounds(step_reference, time_interval.end());
15✔
288

289
    // Since snap_relative snaps to the "left side", we have to add one step to the end
290
    // This only applies if `time_interval.end()` is not the snap point itself.
291
    if end < time_interval.end() {
15✔
292
        end = match end + step {
14✔
293
            Ok(end) => end,
12✔
294
            Err(_) => TimeInstance::MAX, // `TimeInterval::MAX` can overflow
2✔
295
        };
296
    }
1✔
297

298
    TimeInterval::new(start, end).context(error::CannotExpandQueryRectangle)
15✔
299
}
15✔
300

301
#[cfg(test)]
302
mod tests {
303
    use super::*;
304

305
    use crate::{engine::MockExecutionContext, mock::MockFeatureCollectionSource};
306
    use geoengine_datatypes::{
307
        collections::{ChunksEqualIgnoringCacheHint, MultiPointCollection, VectorDataType},
308
        primitives::{
309
            BoundingBox2D, CacheHint, DateTime, MultiPoint, TimeGranularity, TimeInterval,
310
        },
311
        spatial_reference::SpatialReference,
312
        util::test::TestDefault,
313
    };
314

315
    #[test]
    #[allow(clippy::too_many_lines)]
    fn test_expand_query_time_interval() {
        /// Expands `[t1, t2)` and compares the result with `[t1_expanded, t2_expanded)`.
        fn assert_time_interval_transform<T1: TryInto<TimeInstance>, T2: TryInto<TimeInstance>>(
            t1: T1,
            t2: T1,
            step: TimeStep,
            step_reference: TimeInstance,
            t1_expanded: T2,
            t2_expanded: T2,
        ) where
            T1::Error: std::fmt::Debug,
            T2::Error: std::fmt::Debug,
        {
            let input = TimeInterval::new(t1.try_into().unwrap(), t2.try_into().unwrap()).unwrap();
            let result = expand_time_interval(step, step_reference, input).unwrap();

            let expected = TimeInterval::new(
                t1_expanded.try_into().unwrap(),
                t2_expanded.try_into().unwrap(),
            )
            .unwrap();

            assert_eq!(
                result,
                expected,
                "[{}, {}) != [{}, {})",
                result.start().as_datetime_string(),
                result.end().as_datetime_string(),
                expected.start().as_datetime_string(),
                expected.end().as_datetime_string(),
            );
        }

        // shared fixtures
        let date = |year, month, day| DateTime::new_utc(year, month, day, 0, 0, 0);
        let yearly = TimeStep {
            granularity: TimeGranularity::Years,
            step: 1,
        };
        let half_yearly = TimeStep {
            granularity: TimeGranularity::Months,
            step: 6,
        };
        let epoch = TimeInstance::from_millis(0).unwrap();
        let march_2010 = TimeInstance::from(date(2010, 3, 1));

        // interval that already matches the yearly grid
        assert_time_interval_transform(
            date(2010, 1, 1),
            date(2011, 1, 1),
            yearly,
            epoch,
            date(2010, 1, 1),
            date(2011, 1, 1),
        );

        // interval within a single year
        assert_time_interval_transform(
            date(2010, 4, 3),
            date(2010, 5, 14),
            yearly,
            epoch,
            date(2010, 1, 1),
            date(2011, 1, 1),
        );

        // interval that spans two years
        assert_time_interval_transform(
            date(2009, 4, 3),
            date(2010, 5, 14),
            yearly,
            epoch,
            date(2009, 1, 1),
            date(2011, 1, 1),
        );

        // six-month steps anchored at March 2010
        assert_time_interval_transform(
            date(2009, 4, 3),
            date(2010, 5, 14),
            half_yearly,
            march_2010,
            date(2009, 3, 1),
            date(2010, 9, 1),
        );

        // six-month steps anchored at January 2020
        assert_time_interval_transform(
            date(2009, 4, 3),
            date(2010, 5, 14),
            half_yearly,
            TimeInstance::from(date(2020, 1, 1)),
            date(2009, 1, 1),
            date(2010, 7, 1),
        );

        // the full time range stays as it is
        assert_time_interval_transform(
            TimeInstance::MIN,
            TimeInstance::MAX,
            half_yearly,
            march_2010,
            TimeInstance::MIN,
            TimeInstance::MAX,
        );

        // values next to the bounds are expanded to the bounds
        assert_time_interval_transform(
            TimeInstance::MIN + 1,
            TimeInstance::MAX - 1,
            half_yearly,
            march_2010,
            TimeInstance::MIN,
            TimeInstance::MAX,
        );
    }
1✔
436

437
    #[tokio::test]
438
    async fn single_year() {
1✔
439
        let execution_context = MockExecutionContext::test_default();
1✔
440
        let query_context = execution_context.mock_query_context_test_default();
1✔
441

442
        let source = MockFeatureCollectionSource::single(
1✔
443
            MultiPointCollection::from_data(
1✔
444
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
445
                vec![
1✔
446
                    TimeInterval::new(
1✔
447
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
448
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
449
                    )
450
                    .unwrap(),
1✔
451
                    TimeInterval::new(
1✔
452
                        DateTime::new_utc(2010, 6, 3, 0, 0, 0),
1✔
453
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
454
                    )
455
                    .unwrap(),
1✔
456
                    TimeInterval::new(
1✔
457
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
458
                        DateTime::new_utc_with_millis(2010, 3, 31, 23, 59, 59, 999),
1✔
459
                    )
460
                    .unwrap(),
1✔
461
                ],
462
                Default::default(),
1✔
463
                CacheHint::default(),
1✔
464
            )
465
            .unwrap(),
1✔
466
        );
467

468
        let time_projection = TimeProjection {
1✔
469
            sources: SingleVectorSource {
1✔
470
                vector: source.boxed(),
1✔
471
            },
1✔
472
            params: TimeProjectionParams {
1✔
473
                step: TimeStep {
1✔
474
                    granularity: TimeGranularity::Years,
1✔
475
                    step: 1,
1✔
476
                },
1✔
477
                step_reference: None,
1✔
478
            },
1✔
479
        };
1✔
480

481
        let query_processor = time_projection
1✔
482
            .boxed()
1✔
483
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
484
            .await
1✔
485
            .unwrap()
1✔
486
            .query_processor()
1✔
487
            .unwrap()
1✔
488
            .multi_point()
1✔
489
            .unwrap();
1✔
490

491
        let mut stream = query_processor
1✔
492
            .vector_query(
1✔
493
                VectorQueryRectangle::new(
1✔
494
                    BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
495
                    TimeInterval::new(
1✔
496
                        DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
497
                        DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
498
                    )
1✔
499
                    .unwrap(),
1✔
500
                    ColumnSelection::all(),
1✔
501
                ),
1✔
502
                &query_context,
1✔
503
            )
1✔
504
            .await
1✔
505
            .unwrap();
1✔
506

507
        let mut result = Vec::new();
1✔
508
        while let Some(collection) = stream.next().await {
2✔
509
            result.push(collection.unwrap());
1✔
510
        }
1✔
511

512
        assert_eq!(result.len(), 1);
1✔
513

514
        let expected = MultiPointCollection::from_data(
1✔
515
            MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
516
            vec![
1✔
517
                TimeInterval::new(
1✔
518
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
519
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
520
                )
521
                .unwrap(),
1✔
522
                TimeInterval::new(
1✔
523
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
524
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
525
                )
526
                .unwrap(),
1✔
527
                TimeInterval::new(
1✔
528
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
529
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
530
                )
531
                .unwrap(),
1✔
532
            ],
533
            Default::default(),
1✔
534
            CacheHint::default(),
1✔
535
        )
536
        .unwrap();
1✔
537

538
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
539
    }
1✔
540

541
    #[tokio::test]
542
    async fn over_a_year() {
1✔
543
        let execution_context = MockExecutionContext::test_default();
1✔
544
        let query_context = execution_context.mock_query_context_test_default();
1✔
545

546
        let source = MockFeatureCollectionSource::single(
1✔
547
            MultiPointCollection::from_data(
1✔
548
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
549
                vec![
1✔
550
                    TimeInterval::new(
1✔
551
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
552
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
553
                    )
554
                    .unwrap(),
1✔
555
                    TimeInterval::new(
1✔
556
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
557
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
558
                    )
559
                    .unwrap(),
1✔
560
                    TimeInterval::new(
1✔
561
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
562
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
563
                    )
564
                    .unwrap(),
1✔
565
                ],
566
                Default::default(),
1✔
567
                CacheHint::default(),
1✔
568
            )
569
            .unwrap(),
1✔
570
        );
571

572
        let time_projection = TimeProjection {
1✔
573
            sources: SingleVectorSource {
1✔
574
                vector: source.boxed(),
1✔
575
            },
1✔
576
            params: TimeProjectionParams {
1✔
577
                step: TimeStep {
1✔
578
                    granularity: TimeGranularity::Years,
1✔
579
                    step: 1,
1✔
580
                },
1✔
581
                step_reference: None,
1✔
582
            },
1✔
583
        };
1✔
584

585
        let query_processor = time_projection
1✔
586
            .boxed()
1✔
587
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
588
            .await
1✔
589
            .unwrap()
1✔
590
            .query_processor()
1✔
591
            .unwrap()
1✔
592
            .multi_point()
1✔
593
            .unwrap();
1✔
594

595
        let mut stream = query_processor
1✔
596
            .vector_query(
1✔
597
                VectorQueryRectangle::new(
1✔
598
                    BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
599
                    TimeInterval::new(
1✔
600
                        DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
601
                        DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
602
                    )
1✔
603
                    .unwrap(),
1✔
604
                    ColumnSelection::all(),
1✔
605
                ),
1✔
606
                &query_context,
1✔
607
            )
1✔
608
            .await
1✔
609
            .unwrap();
1✔
610

611
        let mut result = Vec::new();
1✔
612
        while let Some(collection) = stream.next().await {
2✔
613
            result.push(collection.unwrap());
1✔
614
        }
1✔
615

616
        assert_eq!(result.len(), 1);
1✔
617

618
        let expected = MultiPointCollection::from_data(
1✔
619
            MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
620
            vec![
1✔
621
                TimeInterval::new(
1✔
622
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
623
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
624
                )
625
                .unwrap(),
1✔
626
                TimeInterval::new(
1✔
627
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
628
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
629
                )
630
                .unwrap(),
1✔
631
                TimeInterval::new(
1✔
632
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
633
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
634
                )
635
                .unwrap(),
1✔
636
            ],
637
            Default::default(),
1✔
638
            CacheHint::default(),
1✔
639
        )
640
        .unwrap();
1✔
641

642
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
643
    }
1✔
644

645
    #[test]
    fn it_rewrites_result_descriptor() {
        let mut result_descriptor = VectorResultDescriptor {
            data_type: VectorDataType::MultiPoint,
            spatial_reference: SpatialReference::epsg_4326().into(),
            columns: Default::default(),
            time: Some(TimeInterval::new_unchecked(30_000, 90_000)),
            bbox: None,
        };

        // only the time bounds are expected to change
        let expected = VectorResultDescriptor {
            time: Some(TimeInterval::new_unchecked(0, 120_000)),
            ..result_descriptor.clone()
        };

        let one_minute = TimeStep {
            granularity: TimeGranularity::Minutes,
            step: 1,
        };
        let step_reference = TimeInstance::from_millis_unchecked(0);

        rewrite_result_descriptor(&mut result_descriptor, one_minute, step_reference).unwrap();

        assert_eq!(result_descriptor, expected);
    }
1✔
676
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc