• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.53
/operators/src/processing/time_projection/mod.rs
1
use std::sync::Arc;
2

3
use crate::engine::{
4
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
5
    OperatorName, QueryContext, SingleVectorSource, TypedVectorQueryProcessor, VectorOperator,
6
    VectorQueryProcessor, VectorResultDescriptor, WorkflowOperatorPath,
7
};
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::stream::BoxStream;
11
use futures::{StreamExt, TryStreamExt};
12
use geoengine_datatypes::collections::{
13
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
14
};
15
use geoengine_datatypes::primitives::{ColumnSelection, Geometry, TimeInterval};
16
use geoengine_datatypes::primitives::{TimeInstance, TimeStep, VectorQueryRectangle};
17
use geoengine_datatypes::util::arrow::ArrowTyped;
18
use log::debug;
19
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
20
use rayon::ThreadPool;
21
use serde::{Deserialize, Serialize};
22
use snafu::{ensure, ResultExt, Snafu};
23

24
/// Projection of time information in queries and data
25
///
26
/// This operator changes the temporal validity of the queried data.
27
/// In order to query all valid data, it is necessary to change the query rectangle as well.
28
///
29
pub type TimeProjection = Operator<TimeProjectionParams, SingleVectorSource>;
30

31
impl OperatorName for TimeProjection {
32
    const TYPE_NAME: &'static str = "TimeProjection";
33
}
34

35
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
×
36
pub struct TimeProjectionParams {
37
    /// Specify the time step granularity and size
38
    step: TimeStep,
39
    /// Define an anchor point for `step`
40
    /// If `None`, the anchor point is `1970-01-01T00:00:00Z` by default
41
    step_reference: Option<TimeInstance>,
42
}
43

44
#[derive(Debug, Snafu)]
×
45
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
46
pub enum TimeProjectionError {
47
    #[snafu(display("Time step must be larger than zero"))]
48
    WindowSizeMustNotBeZero,
49

50
    #[snafu(display("Query rectangle expansion failed: {}", source))]
51
    CannotExpandQueryRectangle {
52
        source: geoengine_datatypes::error::Error,
53
    },
54

55
    #[snafu(display("Feature time interval expansion failed: {}", source))]
56
    CannotExpandFeatureTimeInterval {
57
        source: geoengine_datatypes::error::Error,
58
    },
59
}
60

61
#[typetag::serde]
×
62
#[async_trait]
63
impl VectorOperator for TimeProjection {
64
    async fn _initialize(
65
        self: Box<Self>,
66
        path: WorkflowOperatorPath,
67
        context: &dyn ExecutionContext,
68
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
69
        ensure!(self.params.step.step > 0, error::WindowSizeMustNotBeZero);
2✔
70

71
        let name = CanonicOperatorName::from(&self);
2✔
72

73
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
2✔
74

75
        debug!("Initializing `TimeProjection` with {:?}.", &self.params);
2✔
76

77
        let step_reference = self
2✔
78
            .params
2✔
79
            .step_reference
2✔
80
            // use UTC 0 as default
2✔
81
            .unwrap_or(TimeInstance::EPOCH_START);
2✔
82

2✔
83
        let mut result_descriptor = initialized_sources.vector.result_descriptor().clone();
2✔
84
        rewrite_result_descriptor(&mut result_descriptor, self.params.step, step_reference)?;
2✔
85

86
        let initialized_operator = InitializedVectorTimeProjection {
2✔
87
            name,
2✔
88
            source: initialized_sources.vector,
2✔
89
            result_descriptor,
2✔
90
            step: self.params.step,
2✔
91
            step_reference,
2✔
92
        };
2✔
93

2✔
94
        Ok(initialized_operator.boxed())
2✔
95
    }
4✔
96

97
    span_fn!(TimeProjection);
98
}
99

100
fn rewrite_result_descriptor(
3✔
101
    result_descriptor: &mut VectorResultDescriptor,
3✔
102
    step: TimeStep,
3✔
103
    step_reference: TimeInstance,
3✔
104
) -> Result<()> {
3✔
105
    if let Some(time) = result_descriptor.time {
3✔
106
        let start = step.snap_relative(step_reference, time.start())?;
1✔
107
        let end = (step.snap_relative(step_reference, time.end())? + step)?;
1✔
108

109
        result_descriptor.time = Some(TimeInterval::new(start, end)?);
1✔
110
    }
2✔
111
    Ok(())
3✔
112
}
3✔
113

114
pub struct InitializedVectorTimeProjection {
115
    name: CanonicOperatorName,
116
    source: Box<dyn InitializedVectorOperator>,
117
    result_descriptor: VectorResultDescriptor,
118
    step: TimeStep,
119
    step_reference: TimeInstance,
120
}
121

122
impl InitializedVectorOperator for InitializedVectorTimeProjection {
123
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
124
        &self.result_descriptor
×
125
    }
×
126

127
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
2✔
128
        let source_processor = self.source.query_processor()?;
2✔
129

130
        Ok(
131
            call_on_generic_vector_processor!(source_processor, processor => VectorTimeProjectionProcessor {
2✔
132
                processor,
×
133
                result_descriptor: self.result_descriptor.clone(),
×
134
                step: self.step,
×
135
                step_reference: self.step_reference,
×
136
            }.boxed().into()),
×
137
        )
138
    }
2✔
139

140
    fn canonic_name(&self) -> CanonicOperatorName {
×
141
        self.name.clone()
×
142
    }
×
143
}
144

145
pub struct VectorTimeProjectionProcessor<G>
146
where
147
    G: Geometry,
148
{
149
    processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
150
    result_descriptor: VectorResultDescriptor,
151
    step: TimeStep,
152
    step_reference: TimeInstance,
153
}
154

155
#[async_trait]
156
impl<G> VectorQueryProcessor for VectorTimeProjectionProcessor<G>
157
where
158
    G: Geometry + ArrowTyped + 'static,
159
{
160
    type VectorType = FeatureCollection<G>;
161

162
    async fn vector_query<'a>(
163
        &'a self,
164
        query: VectorQueryRectangle,
165
        ctx: &'a dyn QueryContext,
166
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
2✔
167
        let query = self.expand_query_rectangle(&query)?;
2✔
168
        let stream = self
2✔
169
            .processor
2✔
170
            .vector_query(query, ctx)
2✔
UNCOV
171
            .await?
×
172
            .and_then(|collection| {
2✔
173
                self.expand_feature_collection_result(collection, ctx.thread_pool().clone())
2✔
174
            })
2✔
175
            .boxed();
2✔
176
        Ok(stream)
2✔
177
    }
4✔
178

179
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
2✔
180
        &self.result_descriptor
2✔
181
    }
2✔
182
}
183

184
impl<G> VectorTimeProjectionProcessor<G>
185
where
186
    G: Geometry + ArrowTyped + 'static,
187
{
188
    fn expand_query_rectangle(&self, query: &VectorQueryRectangle) -> Result<VectorQueryRectangle> {
2✔
189
        Ok(expand_query_rectangle(
2✔
190
            self.step,
2✔
191
            self.step_reference,
2✔
192
            query,
2✔
193
        )?)
2✔
194
    }
2✔
195

196
    async fn expand_feature_collection_result(
2✔
197
        &self,
2✔
198
        feature_collection: FeatureCollection<G>,
2✔
199
        thread_pool: Arc<ThreadPool>,
2✔
200
    ) -> Result<FeatureCollection<G>> {
2✔
201
        let step = self.step;
2✔
202
        let step_reference = self.step_reference;
2✔
203

2✔
204
        crate::util::spawn_blocking_with_thread_pool(thread_pool, move || {
2✔
205
            Self::expand_feature_collection_result_inner(feature_collection, step, step_reference)
2✔
206
        })
2✔
207
        .await?
2✔
208
        .map_err(Into::into)
2✔
209
    }
2✔
210

211
    fn expand_feature_collection_result_inner(
2✔
212
        feature_collection: FeatureCollection<G>,
2✔
213
        step: TimeStep,
2✔
214
        step_reference: TimeInstance,
2✔
215
    ) -> Result<FeatureCollection<G>, TimeProjectionError> {
2✔
216
        let time_intervals: Option<Result<Vec<TimeInterval>, TimeProjectionError>> =
2✔
217
            feature_collection
2✔
218
                .time_intervals()
2✔
219
                .par_iter()
2✔
220
                .with_min_len(128) // TODO: find good default
2✔
221
                .map(|time_interval| expand_time_interval(step, step_reference, *time_interval))
6✔
222
                // TODO: change to [`try_collect_into_vec`](https://github.com/rayon-rs/rayon/issues/713) if available
2✔
223
                .try_fold_with(Vec::new(), |mut acc, time_interval| {
6✔
224
                    acc.push(time_interval?);
6✔
225
                    Ok(acc)
6✔
226
                })
6✔
227
                .try_reduce_with(|a, b| Ok([a, b].concat()));
2✔
228

229
        match time_intervals {
2✔
230
            Some(Ok(time_intervals)) => feature_collection
2✔
231
                .replace_time(&time_intervals)
2✔
232
                .context(error::CannotExpandFeatureTimeInterval),
2✔
233
            Some(Err(error)) => Err(error),
×
234
            None => Ok(feature_collection), // was empty
×
235
        }
236
    }
2✔
237
}
238

239
fn expand_query_rectangle(
2✔
240
    step: TimeStep,
2✔
241
    step_reference: TimeInstance,
2✔
242
    query: &VectorQueryRectangle,
2✔
243
) -> Result<VectorQueryRectangle, TimeProjectionError> {
2✔
244
    Ok(VectorQueryRectangle {
2✔
245
        spatial_bounds: query.spatial_bounds,
2✔
246
        time_interval: expand_time_interval(step, step_reference, query.time_interval)?,
2✔
247
        spatial_resolution: query.spatial_resolution,
2✔
248
        attributes: ColumnSelection::all(),
2✔
249
    })
250
}
2✔
251

252
fn expand_time_interval(
15✔
253
    step: TimeStep,
15✔
254
    step_reference: TimeInstance,
15✔
255
    time_interval: TimeInterval,
15✔
256
) -> Result<TimeInterval, TimeProjectionError> {
15✔
257
    let start = step.snap_relative_preserve_bounds(step_reference, time_interval.start());
15✔
258
    let mut end = step.snap_relative_preserve_bounds(step_reference, time_interval.end());
15✔
259

15✔
260
    // Since snap_relative snaps to the "left side", we have to add one step to the end
15✔
261
    // This only applies if `time_interval.end()` is not the snap point itself.
15✔
262
    if end < time_interval.end() {
15✔
263
        end = match end + step {
14✔
264
            Ok(end) => end,
12✔
265
            Err(_) => TimeInstance::MAX, // `TimeInterval::MAX` can overflow
2✔
266
        };
267
    }
1✔
268

269
    TimeInterval::new(start, end).context(error::CannotExpandQueryRectangle)
15✔
270
}
15✔
271

272
#[cfg(test)]
273
mod tests {
274
    use super::*;
275

276
    use crate::{
277
        engine::{MockExecutionContext, MockQueryContext},
278
        mock::MockFeatureCollectionSource,
279
    };
280
    use geoengine_datatypes::{
281
        collections::{ChunksEqualIgnoringCacheHint, MultiPointCollection, VectorDataType},
282
        primitives::{
283
            BoundingBox2D, CacheHint, DateTime, MultiPoint, SpatialResolution, TimeGranularity,
284
            TimeInterval,
285
        },
286
        spatial_reference::SpatialReference,
287
        util::test::TestDefault,
288
    };
289

290
    #[test]
291
    #[allow(clippy::too_many_lines)]
292
    fn test_expand_query_time_interval() {
1✔
293
        fn assert_time_interval_transform<T1: TryInto<TimeInstance>, T2: TryInto<TimeInstance>>(
7✔
294
            t1: T1,
7✔
295
            t2: T1,
7✔
296
            step: TimeStep,
7✔
297
            step_reference: TimeInstance,
7✔
298
            t1_expanded: T2,
7✔
299
            t2_expanded: T2,
7✔
300
        ) where
7✔
301
            T1::Error: std::fmt::Debug,
7✔
302
            T2::Error: std::fmt::Debug,
7✔
303
        {
7✔
304
            let result = expand_time_interval(
7✔
305
                step,
7✔
306
                step_reference,
7✔
307
                TimeInterval::new(t1.try_into().unwrap(), t2.try_into().unwrap()).unwrap(),
7✔
308
            )
7✔
309
            .unwrap();
7✔
310
            let expected = TimeInterval::new(
7✔
311
                t1_expanded.try_into().unwrap(),
7✔
312
                t2_expanded.try_into().unwrap(),
7✔
313
            )
7✔
314
            .unwrap();
7✔
315

7✔
316
            assert_eq!(
7✔
317
                result,
318
                expected,
UNCOV
319
                "[{}, {}) != [{}, {})",
×
320
                result.start().as_datetime_string(),
×
321
                result.end().as_datetime_string(),
×
322
                expected.start().as_datetime_string(),
×
323
                expected.end().as_datetime_string(),
×
324
            );
325
        }
7✔
326

327
        assert_time_interval_transform(
1✔
328
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
329
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
330
            TimeStep {
1✔
331
                granularity: TimeGranularity::Years,
1✔
332
                step: 1,
1✔
333
            },
1✔
334
            TimeInstance::from_millis(0).unwrap(),
1✔
335
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
336
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
337
        );
1✔
338

1✔
339
        assert_time_interval_transform(
1✔
340
            DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
341
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
342
            TimeStep {
1✔
343
                granularity: TimeGranularity::Years,
1✔
344
                step: 1,
1✔
345
            },
1✔
346
            TimeInstance::from_millis(0).unwrap(),
1✔
347
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
348
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
349
        );
1✔
350

1✔
351
        assert_time_interval_transform(
1✔
352
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
353
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
354
            TimeStep {
1✔
355
                granularity: TimeGranularity::Years,
1✔
356
                step: 1,
1✔
357
            },
1✔
358
            TimeInstance::from_millis(0).unwrap(),
1✔
359
            DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
360
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
361
        );
1✔
362

1✔
363
        assert_time_interval_transform(
1✔
364
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
365
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
366
            TimeStep {
1✔
367
                granularity: TimeGranularity::Months,
1✔
368
                step: 6,
1✔
369
            },
1✔
370
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
371
            DateTime::new_utc(2009, 3, 1, 0, 0, 0),
1✔
372
            DateTime::new_utc(2010, 9, 1, 0, 0, 0),
1✔
373
        );
1✔
374

1✔
375
        assert_time_interval_transform(
1✔
376
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
377
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
378
            TimeStep {
1✔
379
                granularity: TimeGranularity::Months,
1✔
380
                step: 6,
1✔
381
            },
1✔
382
            TimeInstance::from(DateTime::new_utc(2020, 1, 1, 0, 0, 0)),
1✔
383
            DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
384
            DateTime::new_utc(2010, 7, 1, 0, 0, 0),
1✔
385
        );
1✔
386

1✔
387
        assert_time_interval_transform(
1✔
388
            TimeInstance::MIN,
1✔
389
            TimeInstance::MAX,
1✔
390
            TimeStep {
1✔
391
                granularity: TimeGranularity::Months,
1✔
392
                step: 6,
1✔
393
            },
1✔
394
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
395
            TimeInstance::MIN,
1✔
396
            TimeInstance::MAX,
1✔
397
        );
1✔
398

1✔
399
        assert_time_interval_transform(
1✔
400
            TimeInstance::MIN + 1,
1✔
401
            TimeInstance::MAX - 1,
1✔
402
            TimeStep {
1✔
403
                granularity: TimeGranularity::Months,
1✔
404
                step: 6,
1✔
405
            },
1✔
406
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
407
            TimeInstance::MIN,
1✔
408
            TimeInstance::MAX,
1✔
409
        );
1✔
410
    }
1✔
411

412
    #[tokio::test]
413
    async fn single_year() {
1✔
414
        let execution_context = MockExecutionContext::test_default();
1✔
415
        let query_context = MockQueryContext::test_default();
1✔
416

1✔
417
        let source = MockFeatureCollectionSource::single(
1✔
418
            MultiPointCollection::from_data(
1✔
419
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
420
                vec![
1✔
421
                    TimeInterval::new(
1✔
422
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
423
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
424
                    )
1✔
425
                    .unwrap(),
1✔
426
                    TimeInterval::new(
1✔
427
                        DateTime::new_utc(2010, 6, 3, 0, 0, 0),
1✔
428
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
429
                    )
1✔
430
                    .unwrap(),
1✔
431
                    TimeInterval::new(
1✔
432
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
433
                        DateTime::new_utc_with_millis(2010, 3, 31, 23, 59, 59, 999),
1✔
434
                    )
1✔
435
                    .unwrap(),
1✔
436
                ],
1✔
437
                Default::default(),
1✔
438
                CacheHint::default(),
1✔
439
            )
1✔
440
            .unwrap(),
1✔
441
        );
1✔
442

1✔
443
        let time_projection = TimeProjection {
1✔
444
            sources: SingleVectorSource {
1✔
445
                vector: source.boxed(),
1✔
446
            },
1✔
447
            params: TimeProjectionParams {
1✔
448
                step: TimeStep {
1✔
449
                    granularity: TimeGranularity::Years,
1✔
450
                    step: 1,
1✔
451
                },
1✔
452
                step_reference: None,
1✔
453
            },
1✔
454
        };
1✔
455

1✔
456
        let query_processor = time_projection
1✔
457
            .boxed()
1✔
458
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
459
            .await
1✔
460
            .unwrap()
1✔
461
            .query_processor()
1✔
462
            .unwrap()
1✔
463
            .multi_point()
1✔
464
            .unwrap();
1✔
465

1✔
466
        let mut stream = query_processor
1✔
467
            .vector_query(
1✔
468
                VectorQueryRectangle {
1✔
469
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
470
                    time_interval: TimeInterval::new(
1✔
471
                        DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
472
                        DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
473
                    )
1✔
474
                    .unwrap(),
1✔
475
                    spatial_resolution: SpatialResolution::one(),
1✔
476
                    attributes: ColumnSelection::all(),
1✔
477
                },
1✔
478
                &query_context,
1✔
479
            )
1✔
480
            .await
1✔
481
            .unwrap();
1✔
482

1✔
483
        let mut result = Vec::new();
1✔
484
        while let Some(collection) = stream.next().await {
2✔
485
            result.push(collection.unwrap());
1✔
486
        }
1✔
487

1✔
488
        assert_eq!(result.len(), 1);
1✔
489

1✔
490
        let expected = MultiPointCollection::from_data(
1✔
491
            MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
492
            vec![
1✔
493
                TimeInterval::new(
1✔
494
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
495
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
496
                )
1✔
497
                .unwrap(),
1✔
498
                TimeInterval::new(
1✔
499
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
500
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
501
                )
1✔
502
                .unwrap(),
1✔
503
                TimeInterval::new(
1✔
504
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
505
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
506
                )
1✔
507
                .unwrap(),
1✔
508
            ],
1✔
509
            Default::default(),
1✔
510
            CacheHint::default(),
1✔
511
        )
1✔
512
        .unwrap();
1✔
513

1✔
514
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
515
    }
1✔
516

517
    #[tokio::test]
518
    async fn over_a_year() {
1✔
519
        let execution_context = MockExecutionContext::test_default();
1✔
520
        let query_context = MockQueryContext::test_default();
1✔
521

1✔
522
        let source = MockFeatureCollectionSource::single(
1✔
523
            MultiPointCollection::from_data(
1✔
524
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
525
                vec![
1✔
526
                    TimeInterval::new(
1✔
527
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
528
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
529
                    )
1✔
530
                    .unwrap(),
1✔
531
                    TimeInterval::new(
1✔
532
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
533
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
534
                    )
1✔
535
                    .unwrap(),
1✔
536
                    TimeInterval::new(
1✔
537
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
538
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
539
                    )
1✔
540
                    .unwrap(),
1✔
541
                ],
1✔
542
                Default::default(),
1✔
543
                CacheHint::default(),
1✔
544
            )
1✔
545
            .unwrap(),
1✔
546
        );
1✔
547

1✔
548
        let time_projection = TimeProjection {
1✔
549
            sources: SingleVectorSource {
1✔
550
                vector: source.boxed(),
1✔
551
            },
1✔
552
            params: TimeProjectionParams {
1✔
553
                step: TimeStep {
1✔
554
                    granularity: TimeGranularity::Years,
1✔
555
                    step: 1,
1✔
556
                },
1✔
557
                step_reference: None,
1✔
558
            },
1✔
559
        };
1✔
560

1✔
561
        let query_processor = time_projection
1✔
562
            .boxed()
1✔
563
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
564
            .await
1✔
565
            .unwrap()
1✔
566
            .query_processor()
1✔
567
            .unwrap()
1✔
568
            .multi_point()
1✔
569
            .unwrap();
1✔
570

1✔
571
        let mut stream = query_processor
1✔
572
            .vector_query(
1✔
573
                VectorQueryRectangle {
1✔
574
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
575
                    time_interval: TimeInterval::new(
1✔
576
                        DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
577
                        DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
578
                    )
1✔
579
                    .unwrap(),
1✔
580
                    spatial_resolution: SpatialResolution::one(),
1✔
581
                    attributes: ColumnSelection::all(),
1✔
582
                },
1✔
583
                &query_context,
1✔
584
            )
1✔
585
            .await
1✔
586
            .unwrap();
1✔
587

1✔
588
        let mut result = Vec::new();
1✔
589
        while let Some(collection) = stream.next().await {
2✔
590
            result.push(collection.unwrap());
1✔
591
        }
1✔
592

1✔
593
        assert_eq!(result.len(), 1);
1✔
594

1✔
595
        let expected = MultiPointCollection::from_data(
1✔
596
            MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
597
            vec![
1✔
598
                TimeInterval::new(
1✔
599
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
600
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
601
                )
1✔
602
                .unwrap(),
1✔
603
                TimeInterval::new(
1✔
604
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
605
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
606
                )
1✔
607
                .unwrap(),
1✔
608
                TimeInterval::new(
1✔
609
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
610
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
611
                )
1✔
612
                .unwrap(),
1✔
613
            ],
1✔
614
            Default::default(),
1✔
615
            CacheHint::default(),
1✔
616
        )
1✔
617
        .unwrap();
1✔
618

1✔
619
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
620
    }
1✔
621

622
    #[test]
623
    fn it_rewrites_result_descriptor() {
1✔
624
        let mut result_descriptor = VectorResultDescriptor {
1✔
625
            data_type: VectorDataType::MultiPoint,
1✔
626
            spatial_reference: SpatialReference::epsg_4326().into(),
1✔
627
            columns: Default::default(),
1✔
628
            time: Some(TimeInterval::new_unchecked(30_000, 90_000)),
1✔
629
            bbox: None,
1✔
630
        };
1✔
631

1✔
632
        rewrite_result_descriptor(
1✔
633
            &mut result_descriptor,
1✔
634
            TimeStep {
1✔
635
                granularity: TimeGranularity::Minutes,
1✔
636
                step: 1,
1✔
637
            },
1✔
638
            TimeInstance::from_millis_unchecked(0),
1✔
639
        )
1✔
640
        .unwrap();
1✔
641

1✔
642
        assert_eq!(
1✔
643
            result_descriptor,
1✔
644
            VectorResultDescriptor {
1✔
645
                data_type: VectorDataType::MultiPoint,
1✔
646
                spatial_reference: SpatialReference::epsg_4326().into(),
1✔
647
                columns: Default::default(),
1✔
648
                time: Some(TimeInterval::new_unchecked(0, 120_000)),
1✔
649
                bbox: None,
1✔
650
            }
1✔
651
        );
1✔
652
    }
1✔
653
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc