• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.81
/operators/src/processing/time_projection/mod.rs
1
use std::sync::Arc;
2

3
use crate::engine::{
4
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
5
    OperatorName, QueryContext, SingleVectorSource, TypedVectorQueryProcessor, VectorOperator,
6
    VectorQueryProcessor, VectorResultDescriptor, WorkflowOperatorPath,
7
};
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::stream::BoxStream;
11
use futures::{StreamExt, TryStreamExt};
12
use geoengine_datatypes::collections::{
13
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
14
};
15
use geoengine_datatypes::primitives::{ColumnSelection, Geometry, TimeInterval};
16
use geoengine_datatypes::primitives::{TimeInstance, TimeStep, VectorQueryRectangle};
17
use geoengine_datatypes::util::arrow::ArrowTyped;
18
use log::debug;
19
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
20
use rayon::ThreadPool;
21
use serde::{Deserialize, Serialize};
22
use snafu::{ensure, ResultExt, Snafu};
23

24
/// Projection of time information in queries and data
25
///
26
/// This operator changes the temporal validity of the queried data.
27
/// In order to query all valid data, it is necessary to change the query rectangle as well.
28
///
29
pub type TimeProjection = Operator<TimeProjectionParams, SingleVectorSource>;
30

31
impl OperatorName for TimeProjection {
32
    const TYPE_NAME: &'static str = "TimeProjection";
33
}
34

35
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
36
pub struct TimeProjectionParams {
37
    /// Specify the time step granularity and size
38
    step: TimeStep,
39
    /// Define an anchor point for `step`
40
    /// If `None`, the anchor point is `1970-01-01T00:00:00Z` by default
41
    step_reference: Option<TimeInstance>,
42
}
43

44
#[derive(Debug, Snafu)]
×
45
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
46
pub enum TimeProjectionError {
47
    #[snafu(display("Time step must be larger than zero"))]
48
    WindowSizeMustNotBeZero,
49

50
    #[snafu(display("Query rectangle expansion failed: {}", source))]
51
    CannotExpandQueryRectangle {
52
        source: geoengine_datatypes::error::Error,
53
    },
54

55
    #[snafu(display("Feature time interval expansion failed: {}", source))]
56
    CannotExpandFeatureTimeInterval {
57
        source: geoengine_datatypes::error::Error,
58
    },
59
}
60

61
#[typetag::serde]
×
62
#[async_trait]
63
impl VectorOperator for TimeProjection {
64
    async fn _initialize(
65
        self: Box<Self>,
66
        path: WorkflowOperatorPath,
67
        context: &dyn ExecutionContext,
68
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
69
        ensure!(self.params.step.step > 0, error::WindowSizeMustNotBeZero);
2✔
70

71
        let name = CanonicOperatorName::from(&self);
2✔
72

73
        let initialized_sources = self
2✔
74
            .sources
2✔
75
            .initialize_sources(path.clone(), context)
2✔
76
            .await?;
2✔
77

78
        debug!("Initializing `TimeProjection` with {:?}.", &self.params);
2✔
79

80
        let step_reference = self
2✔
81
            .params
2✔
82
            .step_reference
2✔
83
            // use UTC 0 as default
2✔
84
            .unwrap_or(TimeInstance::EPOCH_START);
2✔
85

2✔
86
        let mut result_descriptor = initialized_sources.vector.result_descriptor().clone();
2✔
87
        rewrite_result_descriptor(&mut result_descriptor, self.params.step, step_reference)?;
2✔
88

89
        let initialized_operator = InitializedVectorTimeProjection {
2✔
90
            name,
2✔
91
            path,
2✔
92
            source: initialized_sources.vector,
2✔
93
            result_descriptor,
2✔
94
            step: self.params.step,
2✔
95
            step_reference,
2✔
96
        };
2✔
97

2✔
98
        Ok(initialized_operator.boxed())
2✔
99
    }
4✔
100

101
    span_fn!(TimeProjection);
102
}
103

104
fn rewrite_result_descriptor(
3✔
105
    result_descriptor: &mut VectorResultDescriptor,
3✔
106
    step: TimeStep,
3✔
107
    step_reference: TimeInstance,
3✔
108
) -> Result<()> {
3✔
109
    if let Some(time) = result_descriptor.time {
3✔
110
        let start = step.snap_relative(step_reference, time.start())?;
1✔
111
        let end = (step.snap_relative(step_reference, time.end())? + step)?;
1✔
112

113
        result_descriptor.time = Some(TimeInterval::new(start, end)?);
1✔
114
    }
2✔
115
    Ok(())
3✔
116
}
3✔
117

118
pub struct InitializedVectorTimeProjection {
119
    name: CanonicOperatorName,
120
    path: WorkflowOperatorPath,
121
    source: Box<dyn InitializedVectorOperator>,
122
    result_descriptor: VectorResultDescriptor,
123
    step: TimeStep,
124
    step_reference: TimeInstance,
125
}
126

127
impl InitializedVectorOperator for InitializedVectorTimeProjection {
128
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
129
        &self.result_descriptor
×
130
    }
×
131

132
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
2✔
133
        let source_processor = self.source.query_processor()?;
2✔
134

135
        Ok(
136
            call_on_generic_vector_processor!(source_processor, processor => VectorTimeProjectionProcessor {
2✔
137
                processor,
×
138
                result_descriptor: self.result_descriptor.clone(),
×
139
                step: self.step,
×
140
                step_reference: self.step_reference,
×
141
            }.boxed().into()),
×
142
        )
143
    }
2✔
144

145
    fn canonic_name(&self) -> CanonicOperatorName {
×
146
        self.name.clone()
×
147
    }
×
148

NEW
149
    fn name(&self) -> &'static str {
×
NEW
150
        TimeProjection::TYPE_NAME
×
NEW
151
    }
×
152

NEW
153
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
154
        self.path.clone()
×
NEW
155
    }
×
156
}
157

158
pub struct VectorTimeProjectionProcessor<G>
159
where
160
    G: Geometry,
161
{
162
    processor: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
163
    result_descriptor: VectorResultDescriptor,
164
    step: TimeStep,
165
    step_reference: TimeInstance,
166
}
167

168
#[async_trait]
169
impl<G> VectorQueryProcessor for VectorTimeProjectionProcessor<G>
170
where
171
    G: Geometry + ArrowTyped + 'static,
172
{
173
    type VectorType = FeatureCollection<G>;
174

175
    async fn vector_query<'a>(
176
        &'a self,
177
        query: VectorQueryRectangle,
178
        ctx: &'a dyn QueryContext,
179
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
2✔
180
        let query = self.expand_query_rectangle(&query)?;
2✔
181
        let stream = self
2✔
182
            .processor
2✔
183
            .vector_query(query, ctx)
2✔
184
            .await?
2✔
185
            .and_then(|collection| {
2✔
186
                self.expand_feature_collection_result(collection, ctx.thread_pool().clone())
2✔
187
            })
2✔
188
            .boxed();
2✔
189
        Ok(stream)
2✔
190
    }
4✔
191

192
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
2✔
193
        &self.result_descriptor
2✔
194
    }
2✔
195
}
196

197
impl<G> VectorTimeProjectionProcessor<G>
198
where
199
    G: Geometry + ArrowTyped + 'static,
200
{
201
    fn expand_query_rectangle(&self, query: &VectorQueryRectangle) -> Result<VectorQueryRectangle> {
2✔
202
        Ok(expand_query_rectangle(
2✔
203
            self.step,
2✔
204
            self.step_reference,
2✔
205
            query,
2✔
206
        )?)
2✔
207
    }
2✔
208

209
    async fn expand_feature_collection_result(
2✔
210
        &self,
2✔
211
        feature_collection: FeatureCollection<G>,
2✔
212
        thread_pool: Arc<ThreadPool>,
2✔
213
    ) -> Result<FeatureCollection<G>> {
2✔
214
        let step = self.step;
2✔
215
        let step_reference = self.step_reference;
2✔
216

2✔
217
        crate::util::spawn_blocking_with_thread_pool(thread_pool, move || {
2✔
218
            Self::expand_feature_collection_result_inner(feature_collection, step, step_reference)
2✔
219
        })
2✔
220
        .await?
2✔
221
        .map_err(Into::into)
2✔
222
    }
2✔
223

224
    fn expand_feature_collection_result_inner(
2✔
225
        feature_collection: FeatureCollection<G>,
2✔
226
        step: TimeStep,
2✔
227
        step_reference: TimeInstance,
2✔
228
    ) -> Result<FeatureCollection<G>, TimeProjectionError> {
2✔
229
        let time_intervals: Option<Result<Vec<TimeInterval>, TimeProjectionError>> =
2✔
230
            feature_collection
2✔
231
                .time_intervals()
2✔
232
                .par_iter()
2✔
233
                .with_min_len(128) // TODO: find good default
2✔
234
                .map(|time_interval| expand_time_interval(step, step_reference, *time_interval))
6✔
235
                // TODO: change to [`try_collect_into_vec`](https://github.com/rayon-rs/rayon/issues/713) if available
2✔
236
                .try_fold_with(Vec::new(), |mut acc, time_interval| {
6✔
237
                    acc.push(time_interval?);
6✔
238
                    Ok(acc)
6✔
239
                })
6✔
240
                .try_reduce_with(|a, b| Ok([a, b].concat()));
2✔
241

242
        match time_intervals {
2✔
243
            Some(Ok(time_intervals)) => feature_collection
2✔
244
                .replace_time(&time_intervals)
2✔
245
                .context(error::CannotExpandFeatureTimeInterval),
2✔
246
            Some(Err(error)) => Err(error),
×
247
            None => Ok(feature_collection), // was empty
×
248
        }
249
    }
2✔
250
}
251

252
fn expand_query_rectangle(
2✔
253
    step: TimeStep,
2✔
254
    step_reference: TimeInstance,
2✔
255
    query: &VectorQueryRectangle,
2✔
256
) -> Result<VectorQueryRectangle, TimeProjectionError> {
2✔
257
    Ok(VectorQueryRectangle {
2✔
258
        spatial_bounds: query.spatial_bounds,
2✔
259
        time_interval: expand_time_interval(step, step_reference, query.time_interval)?,
2✔
260
        spatial_resolution: query.spatial_resolution,
2✔
261
        attributes: ColumnSelection::all(),
2✔
262
    })
263
}
2✔
264

265
fn expand_time_interval(
15✔
266
    step: TimeStep,
15✔
267
    step_reference: TimeInstance,
15✔
268
    time_interval: TimeInterval,
15✔
269
) -> Result<TimeInterval, TimeProjectionError> {
15✔
270
    let start = step.snap_relative_preserve_bounds(step_reference, time_interval.start());
15✔
271
    let mut end = step.snap_relative_preserve_bounds(step_reference, time_interval.end());
15✔
272

15✔
273
    // Since snap_relative snaps to the "left side", we have to add one step to the end
15✔
274
    // This only applies if `time_interval.end()` is not the snap point itself.
15✔
275
    if end < time_interval.end() {
15✔
276
        end = match end + step {
14✔
277
            Ok(end) => end,
12✔
278
            Err(_) => TimeInstance::MAX, // `TimeInterval::MAX` can overflow
2✔
279
        };
280
    }
1✔
281

282
    TimeInterval::new(start, end).context(error::CannotExpandQueryRectangle)
15✔
283
}
15✔
284

285
#[cfg(test)]
286
mod tests {
287
    use super::*;
288

289
    use crate::{
290
        engine::{MockExecutionContext, MockQueryContext},
291
        mock::MockFeatureCollectionSource,
292
    };
293
    use geoengine_datatypes::{
294
        collections::{ChunksEqualIgnoringCacheHint, MultiPointCollection, VectorDataType},
295
        primitives::{
296
            BoundingBox2D, CacheHint, DateTime, MultiPoint, SpatialResolution, TimeGranularity,
297
            TimeInterval,
298
        },
299
        spatial_reference::SpatialReference,
300
        util::test::TestDefault,
301
    };
302

303
    #[test]
304
    #[allow(clippy::too_many_lines)]
305
    fn test_expand_query_time_interval() {
1✔
306
        fn assert_time_interval_transform<T1: TryInto<TimeInstance>, T2: TryInto<TimeInstance>>(
7✔
307
            t1: T1,
7✔
308
            t2: T1,
7✔
309
            step: TimeStep,
7✔
310
            step_reference: TimeInstance,
7✔
311
            t1_expanded: T2,
7✔
312
            t2_expanded: T2,
7✔
313
        ) where
7✔
314
            T1::Error: std::fmt::Debug,
7✔
315
            T2::Error: std::fmt::Debug,
7✔
316
        {
7✔
317
            let result = expand_time_interval(
7✔
318
                step,
7✔
319
                step_reference,
7✔
320
                TimeInterval::new(t1.try_into().unwrap(), t2.try_into().unwrap()).unwrap(),
7✔
321
            )
7✔
322
            .unwrap();
7✔
323
            let expected = TimeInterval::new(
7✔
324
                t1_expanded.try_into().unwrap(),
7✔
325
                t2_expanded.try_into().unwrap(),
7✔
326
            )
7✔
327
            .unwrap();
7✔
328

7✔
329
            assert_eq!(
7✔
330
                result,
331
                expected,
332
                "[{}, {}) != [{}, {})",
×
333
                result.start().as_datetime_string(),
×
334
                result.end().as_datetime_string(),
×
335
                expected.start().as_datetime_string(),
×
336
                expected.end().as_datetime_string(),
×
337
            );
338
        }
7✔
339

340
        assert_time_interval_transform(
1✔
341
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
342
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
343
            TimeStep {
1✔
344
                granularity: TimeGranularity::Years,
1✔
345
                step: 1,
1✔
346
            },
1✔
347
            TimeInstance::from_millis(0).unwrap(),
1✔
348
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
349
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
350
        );
1✔
351

1✔
352
        assert_time_interval_transform(
1✔
353
            DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
354
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
355
            TimeStep {
1✔
356
                granularity: TimeGranularity::Years,
1✔
357
                step: 1,
1✔
358
            },
1✔
359
            TimeInstance::from_millis(0).unwrap(),
1✔
360
            DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
361
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
362
        );
1✔
363

1✔
364
        assert_time_interval_transform(
1✔
365
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
366
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
367
            TimeStep {
1✔
368
                granularity: TimeGranularity::Years,
1✔
369
                step: 1,
1✔
370
            },
1✔
371
            TimeInstance::from_millis(0).unwrap(),
1✔
372
            DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
373
            DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
374
        );
1✔
375

1✔
376
        assert_time_interval_transform(
1✔
377
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
378
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
379
            TimeStep {
1✔
380
                granularity: TimeGranularity::Months,
1✔
381
                step: 6,
1✔
382
            },
1✔
383
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
384
            DateTime::new_utc(2009, 3, 1, 0, 0, 0),
1✔
385
            DateTime::new_utc(2010, 9, 1, 0, 0, 0),
1✔
386
        );
1✔
387

1✔
388
        assert_time_interval_transform(
1✔
389
            DateTime::new_utc(2009, 4, 3, 0, 0, 0),
1✔
390
            DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
391
            TimeStep {
1✔
392
                granularity: TimeGranularity::Months,
1✔
393
                step: 6,
1✔
394
            },
1✔
395
            TimeInstance::from(DateTime::new_utc(2020, 1, 1, 0, 0, 0)),
1✔
396
            DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
397
            DateTime::new_utc(2010, 7, 1, 0, 0, 0),
1✔
398
        );
1✔
399

1✔
400
        assert_time_interval_transform(
1✔
401
            TimeInstance::MIN,
1✔
402
            TimeInstance::MAX,
1✔
403
            TimeStep {
1✔
404
                granularity: TimeGranularity::Months,
1✔
405
                step: 6,
1✔
406
            },
1✔
407
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
408
            TimeInstance::MIN,
1✔
409
            TimeInstance::MAX,
1✔
410
        );
1✔
411

1✔
412
        assert_time_interval_transform(
1✔
413
            TimeInstance::MIN + 1,
1✔
414
            TimeInstance::MAX - 1,
1✔
415
            TimeStep {
1✔
416
                granularity: TimeGranularity::Months,
1✔
417
                step: 6,
1✔
418
            },
1✔
419
            TimeInstance::from(DateTime::new_utc(2010, 3, 1, 0, 0, 0)),
1✔
420
            TimeInstance::MIN,
1✔
421
            TimeInstance::MAX,
1✔
422
        );
1✔
423
    }
1✔
424

425
    #[tokio::test]
426
    async fn single_year() {
1✔
427
        let execution_context = MockExecutionContext::test_default();
1✔
428
        let query_context = MockQueryContext::test_default();
1✔
429

1✔
430
        let source = MockFeatureCollectionSource::single(
1✔
431
            MultiPointCollection::from_data(
1✔
432
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
433
                vec![
1✔
434
                    TimeInterval::new(
1✔
435
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
436
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
437
                    )
1✔
438
                    .unwrap(),
1✔
439
                    TimeInterval::new(
1✔
440
                        DateTime::new_utc(2010, 6, 3, 0, 0, 0),
1✔
441
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
442
                    )
1✔
443
                    .unwrap(),
1✔
444
                    TimeInterval::new(
1✔
445
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
446
                        DateTime::new_utc_with_millis(2010, 3, 31, 23, 59, 59, 999),
1✔
447
                    )
1✔
448
                    .unwrap(),
1✔
449
                ],
1✔
450
                Default::default(),
1✔
451
                CacheHint::default(),
1✔
452
            )
1✔
453
            .unwrap(),
1✔
454
        );
1✔
455

1✔
456
        let time_projection = TimeProjection {
1✔
457
            sources: SingleVectorSource {
1✔
458
                vector: source.boxed(),
1✔
459
            },
1✔
460
            params: TimeProjectionParams {
1✔
461
                step: TimeStep {
1✔
462
                    granularity: TimeGranularity::Years,
1✔
463
                    step: 1,
1✔
464
                },
1✔
465
                step_reference: None,
1✔
466
            },
1✔
467
        };
1✔
468

1✔
469
        let query_processor = time_projection
1✔
470
            .boxed()
1✔
471
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
472
            .await
1✔
473
            .unwrap()
1✔
474
            .query_processor()
1✔
475
            .unwrap()
1✔
476
            .multi_point()
1✔
477
            .unwrap();
1✔
478

1✔
479
        let mut stream = query_processor
1✔
480
            .vector_query(
1✔
481
                VectorQueryRectangle {
1✔
482
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
483
                    time_interval: TimeInterval::new(
1✔
484
                        DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
485
                        DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
486
                    )
1✔
487
                    .unwrap(),
1✔
488
                    spatial_resolution: SpatialResolution::one(),
1✔
489
                    attributes: ColumnSelection::all(),
1✔
490
                },
1✔
491
                &query_context,
1✔
492
            )
1✔
493
            .await
1✔
494
            .unwrap();
1✔
495

1✔
496
        let mut result = Vec::new();
1✔
497
        while let Some(collection) = stream.next().await {
2✔
498
            result.push(collection.unwrap());
1✔
499
        }
1✔
500

1✔
501
        assert_eq!(result.len(), 1);
1✔
502

1✔
503
        let expected = MultiPointCollection::from_data(
1✔
504
            MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
505
            vec![
1✔
506
                TimeInterval::new(
1✔
507
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
508
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
509
                )
1✔
510
                .unwrap(),
1✔
511
                TimeInterval::new(
1✔
512
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
513
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
514
                )
1✔
515
                .unwrap(),
1✔
516
                TimeInterval::new(
1✔
517
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
518
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
519
                )
1✔
520
                .unwrap(),
1✔
521
            ],
1✔
522
            Default::default(),
1✔
523
            CacheHint::default(),
1✔
524
        )
1✔
525
        .unwrap();
1✔
526

1✔
527
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
528
    }
1✔
529

530
    #[tokio::test]
531
    async fn over_a_year() {
1✔
532
        let execution_context = MockExecutionContext::test_default();
1✔
533
        let query_context = MockQueryContext::test_default();
1✔
534

1✔
535
        let source = MockFeatureCollectionSource::single(
1✔
536
            MultiPointCollection::from_data(
1✔
537
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
538
                vec![
1✔
539
                    TimeInterval::new(
1✔
540
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
541
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
542
                    )
1✔
543
                    .unwrap(),
1✔
544
                    TimeInterval::new(
1✔
545
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
546
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
547
                    )
1✔
548
                    .unwrap(),
1✔
549
                    TimeInterval::new(
1✔
550
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
551
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
552
                    )
1✔
553
                    .unwrap(),
1✔
554
                ],
1✔
555
                Default::default(),
1✔
556
                CacheHint::default(),
1✔
557
            )
1✔
558
            .unwrap(),
1✔
559
        );
1✔
560

1✔
561
        let time_projection = TimeProjection {
1✔
562
            sources: SingleVectorSource {
1✔
563
                vector: source.boxed(),
1✔
564
            },
1✔
565
            params: TimeProjectionParams {
1✔
566
                step: TimeStep {
1✔
567
                    granularity: TimeGranularity::Years,
1✔
568
                    step: 1,
1✔
569
                },
1✔
570
                step_reference: None,
1✔
571
            },
1✔
572
        };
1✔
573

1✔
574
        let query_processor = time_projection
1✔
575
            .boxed()
1✔
576
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
577
            .await
1✔
578
            .unwrap()
1✔
579
            .query_processor()
1✔
580
            .unwrap()
1✔
581
            .multi_point()
1✔
582
            .unwrap();
1✔
583

1✔
584
        let mut stream = query_processor
1✔
585
            .vector_query(
1✔
586
                VectorQueryRectangle {
1✔
587
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
588
                    time_interval: TimeInterval::new(
1✔
589
                        DateTime::new_utc(2010, 4, 3, 0, 0, 0),
1✔
590
                        DateTime::new_utc(2010, 5, 14, 0, 0, 0),
1✔
591
                    )
1✔
592
                    .unwrap(),
1✔
593
                    spatial_resolution: SpatialResolution::one(),
1✔
594
                    attributes: ColumnSelection::all(),
1✔
595
                },
1✔
596
                &query_context,
1✔
597
            )
1✔
598
            .await
1✔
599
            .unwrap();
1✔
600

1✔
601
        let mut result = Vec::new();
1✔
602
        while let Some(collection) = stream.next().await {
2✔
603
            result.push(collection.unwrap());
1✔
604
        }
1✔
605

1✔
606
        assert_eq!(result.len(), 1);
1✔
607

1✔
608
        let expected = MultiPointCollection::from_data(
1✔
609
            MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
610
            vec![
1✔
611
                TimeInterval::new(
1✔
612
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
613
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
614
                )
1✔
615
                .unwrap(),
1✔
616
                TimeInterval::new(
1✔
617
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
618
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
619
                )
1✔
620
                .unwrap(),
1✔
621
                TimeInterval::new(
1✔
622
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
623
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
624
                )
1✔
625
                .unwrap(),
1✔
626
            ],
1✔
627
            Default::default(),
1✔
628
            CacheHint::default(),
1✔
629
        )
1✔
630
        .unwrap();
1✔
631

1✔
632
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
633
    }
1✔
634

635
    #[test]
636
    fn it_rewrites_result_descriptor() {
1✔
637
        let mut result_descriptor = VectorResultDescriptor {
1✔
638
            data_type: VectorDataType::MultiPoint,
1✔
639
            spatial_reference: SpatialReference::epsg_4326().into(),
1✔
640
            columns: Default::default(),
1✔
641
            time: Some(TimeInterval::new_unchecked(30_000, 90_000)),
1✔
642
            bbox: None,
1✔
643
        };
1✔
644

1✔
645
        rewrite_result_descriptor(
1✔
646
            &mut result_descriptor,
1✔
647
            TimeStep {
1✔
648
                granularity: TimeGranularity::Minutes,
1✔
649
                step: 1,
1✔
650
            },
1✔
651
            TimeInstance::from_millis_unchecked(0),
1✔
652
        )
1✔
653
        .unwrap();
1✔
654

1✔
655
        assert_eq!(
1✔
656
            result_descriptor,
1✔
657
            VectorResultDescriptor {
1✔
658
                data_type: VectorDataType::MultiPoint,
1✔
659
                spatial_reference: SpatialReference::epsg_4326().into(),
1✔
660
                columns: Default::default(),
1✔
661
                time: Some(TimeInterval::new_unchecked(0, 120_000)),
1✔
662
                bbox: None,
1✔
663
            }
1✔
664
        );
1✔
665
    }
1✔
666
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc