• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5006008836

pending completion
5006008836

push

github

GitHub
Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.51
/operators/src/processing/time_shift.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator,
3
    InitializedSingleRasterOrVectorOperator, InitializedSources, InitializedVectorOperator,
4
    Operator, OperatorName, QueryContext, RasterOperator, RasterQueryProcessor,
5
    RasterResultDescriptor, ResultDescriptor, SingleRasterOrVectorSource,
6
    TypedRasterQueryProcessor, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
7
    VectorResultDescriptor, WorkflowOperatorPath,
8
};
9
use crate::util::Result;
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::StreamExt;
13
use geoengine_datatypes::collections::{
14
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
15
};
16
use geoengine_datatypes::error::{BoxedResultExt, ErrorSource};
17
use geoengine_datatypes::primitives::{
18
    Duration, Geometry, RasterQueryRectangle, TimeGranularity, TimeInstance, TimeInterval,
19
};
20
use geoengine_datatypes::primitives::{TimeStep, VectorQueryRectangle};
21
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
22
use geoengine_datatypes::util::arrow::ArrowTyped;
23
use serde::{Deserialize, Serialize};
24
use snafu::Snafu;
25

26
/// Operator that projects the time interval of the query rectangle to a new
/// time interval before querying its single raster or vector source.
///
/// How the interval is changed is configured via [`TimeShiftParams`].
27
pub type TimeShift = Operator<TimeShiftParams, SingleRasterOrVectorSource>;
28

29
// Provides the type name constant for the `TimeShift` operator.
impl OperatorName for TimeShift {
30
    const TYPE_NAME: &'static str = "TimeShift";
31
}
32

33
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
12✔
34
#[serde(tag = "type", rename_all = "camelCase")]
35
pub enum TimeShiftParams {
36
    /// Shift the query rectangle relative with a time step
37
    #[serde(rename_all = "camelCase")]
38
    Relative {
39
        granularity: TimeGranularity,
40
        value: i32,
41
    },
42
    /// Set the time interval to a fixed value
43
    #[serde(rename_all = "camelCase")]
44
    Absolute { time_interval: TimeInterval },
45
}
46

47
pub trait TimeShiftOperation: Send + Sync + Copy {
48
    type State: Send + Sync + Copy;
49

50
    fn shift(
51
        &self,
52
        time_interval: TimeInterval,
53
    ) -> Result<(TimeInterval, Self::State), TimeShiftError>;
54

55
    fn reverse_shift(
56
        &self,
57
        time_interval: TimeInterval,
58
        state: Self::State,
59
    ) -> Result<TimeInterval, TimeShiftError>;
60
}
61

62
#[derive(Debug, Clone, Copy)]
×
63
pub struct RelativeForwardShift {
64
    step: TimeStep,
65
}
66

67
impl TimeShiftOperation for RelativeForwardShift {
68
    type State = ();
69

70
    fn shift(
9✔
71
        &self,
9✔
72
        time_interval: TimeInterval,
9✔
73
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
9✔
74
        let time_interval = time_interval + self.step;
9✔
75
        let time_interval = time_interval.boxed_context(error::TimeOverflow)?;
9✔
76
        Ok((time_interval, ()))
9✔
77
    }
9✔
78

79
    fn reverse_shift(
9✔
80
        &self,
9✔
81
        time_interval: TimeInterval,
9✔
82
        _state: Self::State,
9✔
83
    ) -> Result<TimeInterval, TimeShiftError> {
9✔
84
        let reversed_time_interval = time_interval - self.step;
9✔
85
        reversed_time_interval.boxed_context(error::TimeOverflow)
9✔
86
    }
9✔
87
}
88

89
#[derive(Debug, Clone, Copy)]
×
90
pub struct RelativeBackwardShift {
91
    step: TimeStep,
92
}
93

94
impl TimeShiftOperation for RelativeBackwardShift {
95
    type State = ();
96

97
    fn shift(
6✔
98
        &self,
6✔
99
        time_interval: TimeInterval,
6✔
100
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
6✔
101
        let time_interval = time_interval - self.step;
6✔
102
        let time_interval = time_interval.boxed_context(error::TimeOverflow)?;
6✔
103
        Ok((time_interval, ()))
6✔
104
    }
6✔
105

106
    fn reverse_shift(
9✔
107
        &self,
9✔
108
        time_interval: TimeInterval,
9✔
109
        _state: Self::State,
9✔
110
    ) -> Result<TimeInterval, TimeShiftError> {
9✔
111
        let reversed_time_interval = time_interval + self.step;
9✔
112
        reversed_time_interval.boxed_context(error::TimeOverflow)
9✔
113
    }
9✔
114
}
115

116
#[derive(Debug, Clone, Copy)]
×
117
pub struct AbsoluteShift {
118
    time_interval: TimeInterval,
119
}
120

121
impl TimeShiftOperation for AbsoluteShift {
122
    type State = (Duration, Duration);
123

124
    fn shift(
4✔
125
        &self,
4✔
126
        time_interval: TimeInterval,
4✔
127
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
4✔
128
        let time_start_difference = time_interval.start() - self.time_interval.start();
4✔
129
        let time_end_difference = time_interval.end() - self.time_interval.end();
4✔
130

4✔
131
        Ok((
4✔
132
            self.time_interval,
4✔
133
            (time_start_difference, time_end_difference),
4✔
134
        ))
4✔
135
    }
4✔
136

137
    fn reverse_shift(
7✔
138
        &self,
7✔
139
        time_interval: TimeInterval,
7✔
140
        (time_start_difference, time_end_difference): Self::State,
7✔
141
    ) -> Result<TimeInterval, TimeShiftError> {
7✔
142
        let t1 = time_interval.start() + time_start_difference.num_milliseconds();
7✔
143
        let t2 = time_interval.end() + time_end_difference.num_milliseconds();
7✔
144
        TimeInterval::new(t1, t2).boxed_context(error::FaultyTimeInterval { t1, t2 })
7✔
145
    }
7✔
146
}
147

148
#[derive(Debug, Snafu)]
×
149
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
150
pub enum TimeShiftError {
151
    #[snafu(display("Output type must match the type of the source"))]
152
    UnmatchedOutput,
153
    #[snafu(display("Shifting the time led to an overflowing time interval"))]
154
    TimeOverflow { source: Box<dyn ErrorSource> },
155
    #[snafu(display("Shifting the time to a faulty time interval: {t1} / {t2}"))]
156
    FaultyTimeInterval {
157
        source: Box<dyn ErrorSource>,
158
        t1: TimeInstance,
159
        t2: TimeInstance,
160
    },
161
    #[snafu(display("Modifying the timestamps of the feature collection failed"))]
162
    FeatureCollectionTimeModification { source: Box<dyn ErrorSource> },
163
}
164

165
#[typetag::serde]
×
166
#[async_trait]
167
impl VectorOperator for TimeShift {
168
    async fn _initialize(
2✔
169
        self: Box<Self>,
2✔
170
        path: WorkflowOperatorPath,
2✔
171
        context: &dyn ExecutionContext,
2✔
172
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
173
        let name = CanonicOperatorName::from(&self);
2✔
174

175
        let init_sources = self.sources.initialize_sources(path, context).await?;
2✔
176

177
        match (init_sources.source, self.params) {
2✔
178
            (
179
                InitializedSingleRasterOrVectorOperator::Vector(source),
×
180
                TimeShiftParams::Relative { granularity, value },
×
181
            ) if value.is_positive() => {
1✔
182
                let shift = RelativeForwardShift {
×
183
                    step: TimeStep {
×
184
                        granularity,
×
185
                        step: value.unsigned_abs(),
×
186
                    },
×
187
                };
×
188

×
189
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
×
190

×
191
                Ok(Box::new(InitializedVectorTimeShift {
×
192
                    name,
×
193
                    source,
×
194
                    result_descriptor,
×
195
                    shift,
×
196
                }))
×
197
            }
198
            (
199
                InitializedSingleRasterOrVectorOperator::Vector(source),
1✔
200
                TimeShiftParams::Relative { granularity, value },
1✔
201
            ) => {
1✔
202
                let shift = RelativeBackwardShift {
1✔
203
                    step: TimeStep {
1✔
204
                        granularity,
1✔
205
                        step: value.unsigned_abs(),
1✔
206
                    },
1✔
207
                };
1✔
208

1✔
209
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
210

1✔
211
                Ok(Box::new(InitializedVectorTimeShift {
1✔
212
                    name,
1✔
213
                    source,
1✔
214
                    result_descriptor,
1✔
215
                    shift,
1✔
216
                }))
1✔
217
            }
218
            (
219
                InitializedSingleRasterOrVectorOperator::Vector(source),
1✔
220
                TimeShiftParams::Absolute { time_interval },
1✔
221
            ) => {
1✔
222
                let shift = AbsoluteShift { time_interval };
1✔
223

1✔
224
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
225

1✔
226
                Ok(Box::new(InitializedVectorTimeShift {
1✔
227
                    name,
1✔
228
                    source,
1✔
229
                    result_descriptor,
1✔
230
                    shift,
1✔
231
                }))
1✔
232
            }
233
            (InitializedSingleRasterOrVectorOperator::Raster(_), _) => {
234
                Err(TimeShiftError::UnmatchedOutput.into())
×
235
            }
236
        }
237
    }
4✔
238

239
    span_fn!(TimeShift);
×
240
}
241

242
#[typetag::serde]
×
243
#[async_trait]
244
impl RasterOperator for TimeShift {
245
    async fn _initialize(
7✔
246
        self: Box<Self>,
7✔
247
        path: WorkflowOperatorPath,
7✔
248
        context: &dyn ExecutionContext,
7✔
249
    ) -> Result<Box<dyn InitializedRasterOperator>> {
7✔
250
        let name = CanonicOperatorName::from(&self);
7✔
251

252
        let init_sources = self.sources.initialize_sources(path, context).await?;
7✔
253

254
        match (init_sources.source, self.params) {
7✔
255
            (
256
                InitializedSingleRasterOrVectorOperator::Raster(source),
4✔
257
                TimeShiftParams::Relative { granularity, value },
4✔
258
            ) if value.is_positive() => {
5✔
259
                let shift = RelativeForwardShift {
4✔
260
                    step: TimeStep {
4✔
261
                        granularity,
4✔
262
                        step: value.unsigned_abs(),
4✔
263
                    },
4✔
264
                };
4✔
265

4✔
266
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
4✔
267

4✔
268
                Ok(Box::new(InitializedRasterTimeShift {
4✔
269
                    name,
4✔
270
                    source,
4✔
271
                    result_descriptor,
4✔
272
                    shift,
4✔
273
                }))
4✔
274
            }
275
            (
276
                InitializedSingleRasterOrVectorOperator::Raster(source),
1✔
277
                TimeShiftParams::Relative { granularity, value },
1✔
278
            ) => {
1✔
279
                let shift = RelativeBackwardShift {
1✔
280
                    step: TimeStep {
1✔
281
                        granularity,
1✔
282
                        step: value.unsigned_abs(),
1✔
283
                    },
1✔
284
                };
1✔
285

1✔
286
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
287

1✔
288
                Ok(Box::new(InitializedRasterTimeShift {
1✔
289
                    name,
1✔
290
                    source,
1✔
291
                    result_descriptor,
1✔
292
                    shift,
1✔
293
                }))
1✔
294
            }
295
            (
296
                InitializedSingleRasterOrVectorOperator::Raster(source),
2✔
297
                TimeShiftParams::Absolute { time_interval },
2✔
298
            ) => {
2✔
299
                let shift = AbsoluteShift { time_interval };
2✔
300

2✔
301
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
2✔
302

2✔
303
                Ok(Box::new(InitializedRasterTimeShift {
2✔
304
                    name,
2✔
305
                    source,
2✔
306
                    result_descriptor,
2✔
307
                    shift,
2✔
308
                }))
2✔
309
            }
310
            (InitializedSingleRasterOrVectorOperator::Vector(_), _) => {
311
                Err(TimeShiftError::UnmatchedOutput.into())
×
312
            }
313
        }
314
    }
14✔
315

316
    span_fn!(TimeShift);
×
317
}
318

319
/// Returns a copy of `result_descriptor` whose time information has been
/// mapped through `shift`.
///
/// If the descriptor carries no time, or if shifting that time fails, the
/// mapped time is `None`; the shift error is discarded.
fn shift_result_descriptor<R: ResultDescriptor, S: TimeShiftOperation>(
    result_descriptor: &R,
    shift: S,
) -> R {
    result_descriptor.map_time(|time| match time {
        Some(interval) => shift
            .shift(*interval)
            .ok()
            .map(|(shifted, _state)| shifted),
        None => None,
    })
}
9✔
331

332
pub struct InitializedVectorTimeShift<Shift: TimeShiftOperation> {
333
    name: CanonicOperatorName,
334
    source: Box<dyn InitializedVectorOperator>,
335
    result_descriptor: VectorResultDescriptor,
336
    shift: Shift,
337
}
338

339
pub struct InitializedRasterTimeShift<Shift: TimeShiftOperation> {
340
    name: CanonicOperatorName,
341
    source: Box<dyn InitializedRasterOperator>,
342
    result_descriptor: RasterResultDescriptor,
343
    shift: Shift,
344
}
345

346
impl<Shift: TimeShiftOperation + 'static> InitializedVectorOperator
347
    for InitializedVectorTimeShift<Shift>
348
{
349
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
350
        &self.result_descriptor
×
351
    }
×
352

353
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
2✔
354
        let source_processor = self.source.query_processor()?;
2✔
355

356
        Ok(
357
            call_on_generic_vector_processor!(source_processor, processor => VectorTimeShiftProcessor {
2✔
358
                processor,
2✔
359
                shift: self.shift,
2✔
360
            }.boxed().into()),
2✔
361
        )
362
    }
2✔
363

364
    fn canonic_name(&self) -> CanonicOperatorName {
×
365
        self.name.clone()
×
366
    }
×
367
}
368

369
impl<Shift: TimeShiftOperation + 'static> InitializedRasterOperator
370
    for InitializedRasterTimeShift<Shift>
371
{
372
    fn result_descriptor(&self) -> &RasterResultDescriptor {
3✔
373
        &self.result_descriptor
3✔
374
    }
3✔
375

376
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
6✔
377
        let source_processor = self.source.query_processor()?;
6✔
378

379
        Ok(
380
            call_on_generic_raster_processor!(source_processor, processor => RasterTimeShiftProcessor {
6✔
381
                processor,
6✔
382
                shift: self.shift,
6✔
383
            }.boxed().into()),
6✔
384
        )
385
    }
6✔
386

387
    fn canonic_name(&self) -> CanonicOperatorName {
×
388
        self.name.clone()
×
389
    }
×
390
}
391

392
pub struct RasterTimeShiftProcessor<Q, P, Shift: TimeShiftOperation>
393
where
394
    Q: RasterQueryProcessor<RasterType = P>,
395
{
396
    processor: Q,
397
    shift: Shift,
398
}
399

400
pub struct VectorTimeShiftProcessor<Q, G, Shift: TimeShiftOperation>
401
where
402
    G: Geometry,
403
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
404
{
405
    processor: Q,
406
    shift: Shift,
407
}
408

409
#[async_trait]
410
impl<Q, G, Shift> VectorQueryProcessor for VectorTimeShiftProcessor<Q, G, Shift>
411
where
412
    G: Geometry + ArrowTyped + 'static,
413
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
414
    Shift: TimeShiftOperation + 'static,
415
{
416
    type VectorType = FeatureCollection<G>;
417

418
    async fn vector_query<'a>(
2✔
419
        &'a self,
2✔
420
        query: VectorQueryRectangle,
2✔
421
        ctx: &'a dyn QueryContext,
2✔
422
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
2✔
423
        let (time_interval, state) = self.shift.shift(query.time_interval)?;
2✔
424

425
        let query = VectorQueryRectangle {
2✔
426
            spatial_bounds: query.spatial_bounds,
2✔
427
            time_interval,
2✔
428
            spatial_resolution: query.spatial_resolution,
2✔
429
        };
2✔
430
        let stream = self.processor.vector_query(query, ctx).await?;
2✔
431

432
        let stream = stream.then(move |collection| async move {
2✔
433
            let collection = collection?;
2✔
434
            let shift = self.shift;
2✔
435

2✔
436
            crate::util::spawn_blocking(move || {
2✔
437
                let time_intervals = collection
2✔
438
                    .time_intervals()
2✔
439
                    .iter()
2✔
440
                    .map(move |time| shift.reverse_shift(*time, state))
3✔
441
                    .collect::<Result<Vec<TimeInterval>, TimeShiftError>>()?;
2✔
442

443
                collection
2✔
444
                    .replace_time(&time_intervals)
2✔
445
                    .boxed_context(error::FeatureCollectionTimeModification)
2✔
446
                    .map_err(Into::into)
2✔
447
            })
2✔
448
            .await?
2✔
449
        });
2✔
450

2✔
451
        Ok(stream.boxed())
2✔
452
    }
4✔
453
}
454

455
#[async_trait]
456
impl<Q, P, Shift> RasterQueryProcessor for RasterTimeShiftProcessor<Q, P, Shift>
457
where
458
    Q: RasterQueryProcessor<RasterType = P>,
459
    P: Pixel,
460
    Shift: TimeShiftOperation,
461
{
462
    type RasterType = P;
463

464
    async fn raster_query<'a>(
6✔
465
        &'a self,
6✔
466
        query: RasterQueryRectangle,
6✔
467
        ctx: &'a dyn QueryContext,
6✔
468
    ) -> Result<BoxStream<'a, Result<RasterTile2D<Self::RasterType>>>> {
6✔
469
        let (time_interval, state) = self.shift.shift(query.time_interval)?;
6✔
470
        let query = RasterQueryRectangle {
6✔
471
            spatial_bounds: query.spatial_bounds,
6✔
472
            time_interval,
6✔
473
            spatial_resolution: query.spatial_resolution,
6✔
474
        };
6✔
475
        let stream = self.processor.raster_query(query, ctx).await?;
6✔
476

477
        let stream = stream.map(move |raster| {
16✔
478
            // reverse time shift for results
479
            let mut raster = raster?;
16✔
480

481
            raster.time = self.shift.reverse_shift(raster.time, state)?;
16✔
482

483
            Ok(raster)
16✔
484
        });
16✔
485

6✔
486
        Ok(Box::pin(stream))
6✔
487
    }
12✔
488
}
489

490
#[cfg(test)]
491
mod tests {
492
    use super::*;
493

494
    use crate::{
495
        engine::{MockExecutionContext, MockQueryContext},
496
        mock::{MockFeatureCollectionSource, MockRasterSource, MockRasterSourceParams},
497
        processing::{Expression, ExpressionParams, ExpressionSources},
498
        source::{GdalSource, GdalSourceParameters},
499
        util::{gdal::add_ndvi_dataset, input::RasterOrVectorOperator},
500
    };
501
    use futures::StreamExt;
502
    use geoengine_datatypes::{
503
        collections::MultiPointCollection,
504
        dataset::DatasetId,
505
        primitives::{
506
            BoundingBox2D, DateTime, Measurement, MultiPoint, SpatialPartition2D,
507
            SpatialResolution, TimeGranularity,
508
        },
509
        raster::{EmptyGrid2D, GridOrEmpty, RasterDataType, TileInformation, TilingSpecification},
510
        spatial_reference::SpatialReference,
511
        util::test::TestDefault,
512
    };
513

514
    /// An absolute time shift serializes to the expected JSON and its
    /// parameters survive a round trip.
    #[test]
    fn test_ser_de_absolute() {
        let source = GdalSource {
            params: GdalSourceParameters {
                data: DatasetId::from_u128(1337).into(),
            },
        }
        .boxed();

        let params = TimeShiftParams::Absolute {
            time_interval: TimeInterval::new_unchecked(
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
                DateTime::new_utc(2012, 1, 1, 0, 0, 0),
            ),
        };

        let time_shift = TimeShift {
            sources: SingleRasterOrVectorSource {
                source: RasterOrVectorOperator::Raster(source),
            },
            params,
        };

        let expected_json = serde_json::json!({
            "params": {
                "type": "absolute",
                "timeInterval": {
                    "start": 1_293_840_000_000_i64,
                    "end": 1_325_376_000_000_i64
                }
            },
            "sources": {
                "source": {
                    "type": "GdalSource",
                    "params": {
                        "data": {
                            "type": "internal",
                            "datasetId": "00000000-0000-0000-0000-000000000539"
                        }
                    }
                }
            }
        });

        let serialized = serde_json::to_value(&time_shift).unwrap();
        assert_eq!(serialized, expected_json);

        // Only the parameters are compared after the round trip.
        let deserialized: TimeShift = serde_json::from_value(serialized).unwrap();
        assert_eq!(time_shift.params, deserialized.params);
    }
1✔
565

566
    /// A relative time shift serializes to the expected JSON and its
    /// parameters survive a round trip.
    #[test]
    fn test_ser_de_relative() {
        let source = GdalSource {
            params: GdalSourceParameters {
                data: DatasetId::from_u128(1337).into(),
            },
        }
        .boxed();

        let params = TimeShiftParams::Relative {
            granularity: TimeGranularity::Years,
            value: 1,
        };

        let time_shift = TimeShift {
            sources: SingleRasterOrVectorSource {
                source: RasterOrVectorOperator::Raster(source),
            },
            params,
        };

        let expected_json = serde_json::json!({
            "params": {
                "type": "relative",
                "granularity": "years",
                "value": 1
            },
            "sources": {
                "source": {
                    "type": "GdalSource",
                    "params": {
                        "data": {
                            "type": "internal",
                            "datasetId": "00000000-0000-0000-0000-000000000539"
                        }
                    }
                }
            }
        });

        let serialized = serde_json::to_value(&time_shift).unwrap();
        assert_eq!(serialized, expected_json);

        // Only the parameters are compared after the round trip.
        let deserialized: TimeShift = serde_json::from_value(serialized).unwrap();
        assert_eq!(time_shift.params, deserialized.params);
    }
1✔
613

614
    #[tokio::test]
1✔
615
    async fn test_absolute_vector_shift() {
1✔
616
        let execution_context = MockExecutionContext::test_default();
1✔
617
        let query_context = MockQueryContext::test_default();
1✔
618

1✔
619
        let source = MockFeatureCollectionSource::single(
1✔
620
            MultiPointCollection::from_data(
1✔
621
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
622
                vec![
1✔
623
                    TimeInterval::new(
1✔
624
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
625
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
626
                    )
1✔
627
                    .unwrap(),
1✔
628
                    TimeInterval::new(
1✔
629
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
630
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
631
                    )
1✔
632
                    .unwrap(),
1✔
633
                    TimeInterval::new(
1✔
634
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
635
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
636
                    )
1✔
637
                    .unwrap(),
1✔
638
                ],
1✔
639
                Default::default(),
1✔
640
            )
1✔
641
            .unwrap(),
1✔
642
        );
1✔
643

1✔
644
        let time_shift = TimeShift {
1✔
645
            sources: SingleRasterOrVectorSource {
1✔
646
                source: RasterOrVectorOperator::Vector(source.boxed()),
1✔
647
            },
1✔
648
            params: TimeShiftParams::Absolute {
1✔
649
                time_interval: TimeInterval::new(
1✔
650
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
651
                    DateTime::new_utc(2009, 6, 1, 0, 0, 0),
1✔
652
                )
1✔
653
                .unwrap(),
1✔
654
            },
1✔
655
        };
1✔
656

657
        let query_processor = VectorOperator::boxed(time_shift)
1✔
658
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
659
            .await
×
660
            .unwrap()
1✔
661
            .query_processor()
1✔
662
            .unwrap()
1✔
663
            .multi_point()
1✔
664
            .unwrap();
1✔
665

666
        let mut stream = query_processor
1✔
667
            .vector_query(
1✔
668
                VectorQueryRectangle {
1✔
669
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
670
                    time_interval: TimeInterval::new(
1✔
671
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
672
                        DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
673
                    )
1✔
674
                    .unwrap(),
1✔
675
                    spatial_resolution: SpatialResolution::one(),
1✔
676
                },
1✔
677
                &query_context,
1✔
678
            )
1✔
679
            .await
×
680
            .unwrap();
1✔
681

1✔
682
        let mut result = Vec::new();
1✔
683
        while let Some(collection) = stream.next().await {
2✔
684
            result.push(collection.unwrap());
1✔
685
        }
1✔
686

687
        assert_eq!(result.len(), 1);
1✔
688

689
        let expected = MultiPointCollection::from_data(
1✔
690
            MultiPoint::many(vec![(0., 0.)]).unwrap(),
1✔
691
            vec![TimeInterval::new(
1✔
692
                DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
693
                DateTime::new_utc_with_millis(2013, 8, 1, 23, 59, 59, 999),
1✔
694
            )
1✔
695
            .unwrap()],
1✔
696
            Default::default(),
1✔
697
        )
1✔
698
        .unwrap();
1✔
699

1✔
700
        assert_eq!(result[0], expected);
1✔
701
    }
702

703
    #[tokio::test]
1✔
704
    async fn test_relative_vector_shift() {
1✔
705
        let execution_context = MockExecutionContext::test_default();
1✔
706
        let query_context = MockQueryContext::test_default();
1✔
707

1✔
708
        let source = MockFeatureCollectionSource::single(
1✔
709
            MultiPointCollection::from_data(
1✔
710
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
711
                vec![
1✔
712
                    TimeInterval::new(
1✔
713
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
714
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
715
                    )
1✔
716
                    .unwrap(),
1✔
717
                    TimeInterval::new(
1✔
718
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
719
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
720
                    )
1✔
721
                    .unwrap(),
1✔
722
                    TimeInterval::new(
1✔
723
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
724
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
725
                    )
1✔
726
                    .unwrap(),
1✔
727
                ],
1✔
728
                Default::default(),
1✔
729
            )
1✔
730
            .unwrap(),
1✔
731
        );
1✔
732

1✔
733
        let time_shift = TimeShift {
1✔
734
            sources: SingleRasterOrVectorSource {
1✔
735
                source: RasterOrVectorOperator::Vector(source.boxed()),
1✔
736
            },
1✔
737
            params: TimeShiftParams::Relative {
1✔
738
                granularity: TimeGranularity::Years,
1✔
739
                value: -1,
1✔
740
            },
1✔
741
        };
1✔
742

743
        let query_processor = VectorOperator::boxed(time_shift)
1✔
744
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
745
            .await
×
746
            .unwrap()
1✔
747
            .query_processor()
1✔
748
            .unwrap()
1✔
749
            .multi_point()
1✔
750
            .unwrap();
1✔
751

752
        let mut stream = query_processor
1✔
753
            .vector_query(
1✔
754
                VectorQueryRectangle {
1✔
755
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
756
                    time_interval: TimeInterval::new(
1✔
757
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
758
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
759
                    )
1✔
760
                    .unwrap(),
1✔
761
                    spatial_resolution: SpatialResolution::one(),
1✔
762
                },
1✔
763
                &query_context,
1✔
764
            )
1✔
765
            .await
×
766
            .unwrap();
1✔
767

1✔
768
        let mut result = Vec::new();
1✔
769
        while let Some(collection) = stream.next().await {
2✔
770
            result.push(collection.unwrap());
1✔
771
        }
1✔
772

773
        assert_eq!(result.len(), 1);
1✔
774

775
        let expected = MultiPointCollection::from_data(
1✔
776
            MultiPoint::many(vec![(0., 0.), (1., 1.)]).unwrap(),
1✔
777
            vec![
1✔
778
                TimeInterval::new(
1✔
779
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
780
                    DateTime::new_utc_with_millis(2011, 12, 31, 23, 59, 59, 999),
1✔
781
                )
1✔
782
                .unwrap(),
1✔
783
                TimeInterval::new(
1✔
784
                    DateTime::new_utc(2010, 6, 3, 0, 0, 0),
1✔
785
                    DateTime::new_utc(2011, 7, 14, 0, 0, 0),
1✔
786
                )
1✔
787
                .unwrap(),
1✔
788
            ],
1✔
789
            Default::default(),
1✔
790
        )
1✔
791
        .unwrap();
1✔
792

1✔
793
        assert_eq!(result[0], expected);
1✔
794
    }
795

796
    /// Runs a raster query through a `TimeShift` configured with an absolute
    /// time interval (2011-01-01 to 2012-01-01) on top of a mock raster source.
    ///
    /// The query asks for 2010-01-01 to 2011-01-01, and the test asserts that
    /// exactly two tiles come back, both reporting the queried interval
    /// (2010-01-01 to 2011-01-01) as their time.
    #[tokio::test]
1✔
797
    #[allow(clippy::too_many_lines)]
798
    async fn test_absolute_raster_shift() {
1✔
799
        let empty_grid = GridOrEmpty::Empty(EmptyGrid2D::<u8>::new([3, 2].into()));
1✔
800
        // Mock data: tile positions [-1, 0] and [-1, 1] for each of the yearly
        // intervals starting in 2010, 2011 and 2012; every grid is empty.
        let raster_tiles = vec![
1✔
801
            RasterTile2D::new_with_tile_info(
1✔
802
                TimeInterval::new_unchecked(
1✔
803
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
804
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
805
                ),
1✔
806
                TileInformation {
1✔
807
                    global_tile_position: [-1, 0].into(),
1✔
808
                    tile_size_in_pixels: [3, 2].into(),
1✔
809
                    global_geo_transform: TestDefault::test_default(),
1✔
810
                },
1✔
811
                empty_grid.clone(),
1✔
812
            ),
1✔
813
            RasterTile2D::new_with_tile_info(
1✔
814
                TimeInterval::new_unchecked(
1✔
815
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
816
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
817
                ),
1✔
818
                TileInformation {
1✔
819
                    global_tile_position: [-1, 1].into(),
1✔
820
                    tile_size_in_pixels: [3, 2].into(),
1✔
821
                    global_geo_transform: TestDefault::test_default(),
1✔
822
                },
1✔
823
                empty_grid.clone(),
1✔
824
            ),
1✔
825
            RasterTile2D::new_with_tile_info(
1✔
826
                TimeInterval::new_unchecked(
1✔
827
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
828
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
829
                ),
1✔
830
                TileInformation {
1✔
831
                    global_tile_position: [-1, 0].into(),
1✔
832
                    tile_size_in_pixels: [3, 2].into(),
1✔
833
                    global_geo_transform: TestDefault::test_default(),
1✔
834
                },
1✔
835
                empty_grid.clone(),
1✔
836
            ),
1✔
837
            RasterTile2D::new_with_tile_info(
1✔
838
                TimeInterval::new_unchecked(
1✔
839
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
840
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
841
                ),
1✔
842
                TileInformation {
1✔
843
                    global_tile_position: [-1, 1].into(),
1✔
844
                    tile_size_in_pixels: [3, 2].into(),
1✔
845
                    global_geo_transform: TestDefault::test_default(),
1✔
846
                },
1✔
847
                empty_grid.clone(),
1✔
848
            ),
1✔
849
            RasterTile2D::new_with_tile_info(
1✔
850
                TimeInterval::new_unchecked(
1✔
851
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
852
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
853
                ),
1✔
854
                TileInformation {
1✔
855
                    global_tile_position: [-1, 0].into(),
1✔
856
                    tile_size_in_pixels: [3, 2].into(),
1✔
857
                    global_geo_transform: TestDefault::test_default(),
1✔
858
                },
1✔
859
                empty_grid.clone(),
1✔
860
            ),
1✔
861
            RasterTile2D::new_with_tile_info(
1✔
862
                TimeInterval::new_unchecked(
1✔
863
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
864
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
865
                ),
1✔
866
                TileInformation {
1✔
867
                    global_tile_position: [-1, 1].into(),
1✔
868
                    tile_size_in_pixels: [3, 2].into(),
1✔
869
                    global_geo_transform: TestDefault::test_default(),
1✔
870
                },
1✔
871
                empty_grid.clone(),
1✔
872
            ),
1✔
873
        ];
1✔
874

1✔
875
        let mrs = MockRasterSource {
1✔
876
            params: MockRasterSourceParams {
1✔
877
                data: raster_tiles,
1✔
878
                result_descriptor: RasterResultDescriptor {
1✔
879
                    data_type: RasterDataType::U8,
1✔
880
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
881
                    measurement: Measurement::Unitless,
1✔
882
                    time: None,
1✔
883
                    bbox: None,
1✔
884
                    resolution: None,
1✔
885
                },
1✔
886
            },
1✔
887
        }
1✔
888
        .boxed();
1✔
889

1✔
890
        let time_shift = TimeShift {
1✔
891
            sources: SingleRasterOrVectorSource {
1✔
892
                source: RasterOrVectorOperator::Raster(mrs),
1✔
893
            },
1✔
894
            // Fixed target interval of the absolute shift. Presumably the source
            // is queried with this interval instead of the query's own one;
            // confirm against the operator implementation.
            params: TimeShiftParams::Absolute {
1✔
895
                time_interval: TimeInterval::new_unchecked(
1✔
896
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
897
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
898
                ),
1✔
899
            },
1✔
900
        };
1✔
901

1✔
902
        let execution_context = MockExecutionContext::new_with_tiling_spec(
1✔
903
            TilingSpecification::new((0., 0.).into(), [3, 2].into()),
1✔
904
        );
1✔
905
        let query_context = MockQueryContext::test_default();
1✔
906

907
        let query_processor = RasterOperator::boxed(time_shift)
1✔
908
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
909
            .await
×
910
            .unwrap()
1✔
911
            .query_processor()
1✔
912
            .unwrap()
1✔
913
            .get_u8()
1✔
914
            .unwrap();
1✔
915

916
        let mut stream = query_processor
1✔
917
            .raster_query(
1✔
918
                RasterQueryRectangle {
1✔
919
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
920
                        (0., 3.).into(),
1✔
921
                        (4., 0.).into(),
1✔
922
                    ),
1✔
923
                    // The queried interval (2010 to 2011) differs from the
                    // interval configured on the operator (2011 to 2012).
                    time_interval: TimeInterval::new(
1✔
924
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
925
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
926
                    )
1✔
927
                    .unwrap(),
1✔
928
                    spatial_resolution: SpatialResolution::one(),
1✔
929
                },
1✔
930
                &query_context,
1✔
931
            )
1✔
932
            .await
×
933
            .unwrap();
1✔
934

1✔
935
        // Drain the stream; any erroneous tile fails the test via `unwrap`.
        let mut result = Vec::new();
1✔
936
        while let Some(tile) = stream.next().await {
3✔
937
            result.push(tile.unwrap());
2✔
938
        }
2✔
939

940
        // Two tiles are expected; presumably the two tile positions of a
        // single time step (TODO confirm).
        assert_eq!(result.len(), 2);
1✔
941

942
        // Both tiles must report the interval that was queried.
        assert_eq!(
1✔
943
            result[0].time,
1✔
944
            TimeInterval::new_unchecked(
1✔
945
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
946
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
947
            ),
1✔
948
        );
1✔
949
        assert_eq!(
1✔
950
            result[1].time,
1✔
951
            TimeInterval::new_unchecked(
1✔
952
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
953
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
954
            ),
1✔
955
        );
1✔
956
    }
957

958
    /// Runs a raster query through a `TimeShift` configured with a relative
    /// shift of one year on top of a mock raster source.
    ///
    /// The query asks for 2010-01-01 to 2011-01-01, and the test asserts that
    /// exactly two tiles come back, both reporting the queried interval
    /// (2010-01-01 to 2011-01-01) as their time.
    #[tokio::test]
1✔
959
    #[allow(clippy::too_many_lines)]
960
    async fn test_relative_raster_shift() {
1✔
961
        let empty_grid = GridOrEmpty::Empty(EmptyGrid2D::<u8>::new([3, 2].into()));
1✔
962
        // Mock data: tile positions [-1, 0] and [-1, 1] for each of the yearly
        // intervals starting in 2010, 2011 and 2012; every grid is empty.
        let raster_tiles = vec![
1✔
963
            RasterTile2D::new_with_tile_info(
1✔
964
                TimeInterval::new_unchecked(
1✔
965
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
966
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
967
                ),
1✔
968
                TileInformation {
1✔
969
                    global_tile_position: [-1, 0].into(),
1✔
970
                    tile_size_in_pixels: [3, 2].into(),
1✔
971
                    global_geo_transform: TestDefault::test_default(),
1✔
972
                },
1✔
973
                empty_grid.clone(),
1✔
974
            ),
1✔
975
            RasterTile2D::new_with_tile_info(
1✔
976
                TimeInterval::new_unchecked(
1✔
977
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
978
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
979
                ),
1✔
980
                TileInformation {
1✔
981
                    global_tile_position: [-1, 1].into(),
1✔
982
                    tile_size_in_pixels: [3, 2].into(),
1✔
983
                    global_geo_transform: TestDefault::test_default(),
1✔
984
                },
1✔
985
                empty_grid.clone(),
1✔
986
            ),
1✔
987
            RasterTile2D::new_with_tile_info(
1✔
988
                TimeInterval::new_unchecked(
1✔
989
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
990
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
991
                ),
1✔
992
                TileInformation {
1✔
993
                    global_tile_position: [-1, 0].into(),
1✔
994
                    tile_size_in_pixels: [3, 2].into(),
1✔
995
                    global_geo_transform: TestDefault::test_default(),
1✔
996
                },
1✔
997
                empty_grid.clone(),
1✔
998
            ),
1✔
999
            RasterTile2D::new_with_tile_info(
1✔
1000
                TimeInterval::new_unchecked(
1✔
1001
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1002
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1003
                ),
1✔
1004
                TileInformation {
1✔
1005
                    global_tile_position: [-1, 1].into(),
1✔
1006
                    tile_size_in_pixels: [3, 2].into(),
1✔
1007
                    global_geo_transform: TestDefault::test_default(),
1✔
1008
                },
1✔
1009
                empty_grid.clone(),
1✔
1010
            ),
1✔
1011
            RasterTile2D::new_with_tile_info(
1✔
1012
                TimeInterval::new_unchecked(
1✔
1013
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1014
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
1015
                ),
1✔
1016
                TileInformation {
1✔
1017
                    global_tile_position: [-1, 0].into(),
1✔
1018
                    tile_size_in_pixels: [3, 2].into(),
1✔
1019
                    global_geo_transform: TestDefault::test_default(),
1✔
1020
                },
1✔
1021
                empty_grid.clone(),
1✔
1022
            ),
1✔
1023
            RasterTile2D::new_with_tile_info(
1✔
1024
                TimeInterval::new_unchecked(
1✔
1025
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1026
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
1027
                ),
1✔
1028
                TileInformation {
1✔
1029
                    global_tile_position: [-1, 1].into(),
1✔
1030
                    tile_size_in_pixels: [3, 2].into(),
1✔
1031
                    global_geo_transform: TestDefault::test_default(),
1✔
1032
                },
1✔
1033
                empty_grid.clone(),
1✔
1034
            ),
1✔
1035
        ];
1✔
1036

1✔
1037
        let mrs = MockRasterSource {
1✔
1038
            params: MockRasterSourceParams {
1✔
1039
                data: raster_tiles,
1✔
1040
                result_descriptor: RasterResultDescriptor {
1✔
1041
                    data_type: RasterDataType::U8,
1✔
1042
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
1043
                    measurement: Measurement::Unitless,
1✔
1044
                    time: None,
1✔
1045
                    bbox: None,
1✔
1046
                    resolution: None,
1✔
1047
                },
1✔
1048
            },
1✔
1049
        }
1✔
1050
        .boxed();
1✔
1051

1✔
1052
        let time_shift = TimeShift {
1✔
1053
            sources: SingleRasterOrVectorSource {
1✔
1054
                source: RasterOrVectorOperator::Raster(mrs),
1✔
1055
            },
1✔
1056
            // Relative shift by +1 year. Presumably the source is queried one
            // year after the requested interval; confirm against the operator
            // implementation.
            params: TimeShiftParams::Relative {
1✔
1057
                granularity: TimeGranularity::Years,
1✔
1058
                value: 1,
1✔
1059
            },
1✔
1060
        };
1✔
1061

1✔
1062
        let execution_context = MockExecutionContext::new_with_tiling_spec(
1✔
1063
            TilingSpecification::new((0., 0.).into(), [3, 2].into()),
1✔
1064
        );
1✔
1065
        let query_context = MockQueryContext::test_default();
1✔
1066

1067
        let query_processor = RasterOperator::boxed(time_shift)
1✔
1068
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1069
            .await
×
1070
            .unwrap()
1✔
1071
            .query_processor()
1✔
1072
            .unwrap()
1✔
1073
            .get_u8()
1✔
1074
            .unwrap();
1✔
1075

1076
        let mut stream = query_processor
1✔
1077
            .raster_query(
1✔
1078
                RasterQueryRectangle {
1✔
1079
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1080
                        (0., 3.).into(),
1✔
1081
                        (4., 0.).into(),
1✔
1082
                    ),
1✔
1083
                    time_interval: TimeInterval::new(
1✔
1084
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1085
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1086
                    )
1✔
1087
                    .unwrap(),
1✔
1088
                    spatial_resolution: SpatialResolution::one(),
1✔
1089
                },
1✔
1090
                &query_context,
1✔
1091
            )
1✔
1092
            .await
×
1093
            .unwrap();
1✔
1094

1✔
1095
        // Drain the stream; any erroneous tile fails the test via `unwrap`.
        let mut result = Vec::new();
1✔
1096
        while let Some(tile) = stream.next().await {
3✔
1097
            result.push(tile.unwrap());
2✔
1098
        }
2✔
1099

1100
        // Two tiles are expected; presumably the two tile positions of a
        // single time step (TODO confirm).
        assert_eq!(result.len(), 2);
1✔
1101

1102
        // Both tiles must report the interval that was queried, not the
        // shifted one.
        assert_eq!(
1✔
1103
            result[0].time,
1✔
1104
            TimeInterval::new_unchecked(
1✔
1105
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1106
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1107
            ),
1✔
1108
        );
1✔
1109
        assert_eq!(
1✔
1110
            result[1].time,
1✔
1111
            TimeInterval::new_unchecked(
1✔
1112
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1113
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1114
            ),
1✔
1115
        );
1✔
1116
    }
1117

1118
    /// Builds an `Expression` ("A - B") from an NDVI `GdalSource` and the same
    /// source wrapped in a relative `TimeShift` of -1 month; the two operators
    /// are passed, in that order, to `ExpressionSources::new_a_b`.
    ///
    /// Queries the instant 2014-03-01 and asserts that four tiles are produced
    /// and that the first tile's time is 2014-03-01 to 2014-04-01.
    #[tokio::test]
1✔
1119
    async fn test_expression_on_shifted_raster() {
1✔
1120
        let mut execution_context = MockExecutionContext::test_default();
1✔
1121

1✔
1122
        let ndvi_source = GdalSource {
1✔
1123
            params: GdalSourceParameters {
1✔
1124
                data: add_ndvi_dataset(&mut execution_context),
1✔
1125
            },
1✔
1126
        }
1✔
1127
        .boxed();
1✔
1128

1✔
1129
        // Same NDVI source, shifted by one month in the negative direction.
        let shifted_ndvi_source = RasterOperator::boxed(TimeShift {
1✔
1130
            params: TimeShiftParams::Relative {
1✔
1131
                granularity: TimeGranularity::Months,
1✔
1132
                value: -1,
1✔
1133
            },
1✔
1134
            sources: SingleRasterOrVectorSource {
1✔
1135
                source: RasterOrVectorOperator::Raster(ndvi_source.clone()),
1✔
1136
            },
1✔
1137
        });
1✔
1138

1✔
1139
        let expression = Expression {
1✔
1140
            params: ExpressionParams {
1✔
1141
                expression: "A - B".to_string(),
1✔
1142
                output_type: RasterDataType::F64,
1✔
1143
                output_measurement: None,
1✔
1144
                map_no_data: false,
1✔
1145
            },
1✔
1146
            sources: ExpressionSources::new_a_b(ndvi_source, shifted_ndvi_source),
1✔
1147
        }
1✔
1148
        .boxed();
1✔
1149

1150
        let query_processor = expression
1✔
1151
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1152
            .await
×
1153
            .unwrap()
1✔
1154
            .query_processor()
1✔
1155
            .unwrap()
1✔
1156
            .get_f64()
1✔
1157
            .unwrap();
1✔
1158

1✔
1159
        let query_context = MockQueryContext::test_default();
1✔
1160

1161
        let mut stream = query_processor
1✔
1162
            .raster_query(
1✔
1163
                RasterQueryRectangle {
1✔
1164
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1165
                        (-180., 90.).into(),
1✔
1166
                        (180., -90.).into(),
1✔
1167
                    ),
1✔
1168
                    time_interval: TimeInterval::new_instant(DateTime::new_utc(
1✔
1169
                        2014, 3, 1, 0, 0, 0,
1✔
1170
                    ))
1✔
1171
                    .unwrap(),
1✔
1172
                    spatial_resolution: SpatialResolution::one(),
1✔
1173
                },
1✔
1174
                &query_context,
1✔
1175
            )
1✔
1176
            .await
×
1177
            .unwrap();
1✔
1178

1✔
1179
        // Drain the stream; any erroneous tile fails the test via `unwrap`.
        let mut result = Vec::new();
1✔
1180
        while let Some(tile) = stream.next().await {
20✔
1181
            result.push(tile.unwrap());
4✔
1182
        }
4✔
1183

1184
        assert_eq!(result.len(), 4);
1✔
1185
        // Only the first tile's time is checked.
        assert_eq!(
1✔
1186
            result[0].time,
1✔
1187
            TimeInterval::new(
1✔
1188
                DateTime::new_utc(2014, 3, 1, 0, 0, 0),
1✔
1189
                DateTime::new_utc(2014, 4, 1, 0, 0, 0)
1✔
1190
            )
1✔
1191
            .unwrap()
1✔
1192
        );
1✔
1193
    }
1194

1195
    /// Queries an NDVI `GdalSource` wrapped in an absolute `TimeShift` that is
    /// fixed to the instant 2014-05-01.
    ///
    /// The query asks for the instant 2014-03-01; the test asserts that four
    /// tiles are produced and that the first tile's time is 2014-03-01 to
    /// 2014-04-01.
    ///
    /// NOTE(review): despite the test's name, no `Expression` operator is
    /// built here; the shifted source is queried directly.
    #[tokio::test]
1✔
1196
    async fn test_expression_on_absolute_shifted_raster() {
1✔
1197
        let mut execution_context = MockExecutionContext::test_default();
1✔
1198

1✔
1199
        let ndvi_source = GdalSource {
1✔
1200
            params: GdalSourceParameters {
1✔
1201
                data: add_ndvi_dataset(&mut execution_context),
1✔
1202
            },
1✔
1203
        }
1✔
1204
        .boxed();
1✔
1205

1✔
1206
        let shifted_ndvi_source = RasterOperator::boxed(TimeShift {
1✔
1207
            params: TimeShiftParams::Absolute {
1✔
1208
                time_interval: TimeInterval::new_instant(DateTime::new_utc(2014, 5, 1, 0, 0, 0))
1✔
1209
                    .unwrap(),
1✔
1210
            },
1✔
1211
            sources: SingleRasterOrVectorSource {
1✔
1212
                source: RasterOrVectorOperator::Raster(ndvi_source),
1✔
1213
            },
1✔
1214
        });
1✔
1215

1216
        let query_processor = shifted_ndvi_source
1✔
1217
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1218
            .await
×
1219
            .unwrap()
1✔
1220
            .query_processor()
1✔
1221
            .unwrap()
1✔
1222
            .get_u8()
1✔
1223
            .unwrap();
1✔
1224

1✔
1225
        let query_context = MockQueryContext::test_default();
1✔
1226

1227
        let mut stream = query_processor
1✔
1228
            .raster_query(
1✔
1229
                RasterQueryRectangle {
1✔
1230
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1231
                        (-180., 90.).into(),
1✔
1232
                        (180., -90.).into(),
1✔
1233
                    ),
1✔
1234
                    // The queried instant (2014-03-01) differs from the instant
                    // configured on the operator (2014-05-01).
                    time_interval: TimeInterval::new_instant(DateTime::new_utc(
1✔
1235
                        2014, 3, 1, 0, 0, 0,
1✔
1236
                    ))
1✔
1237
                    .unwrap(),
1✔
1238
                    spatial_resolution: SpatialResolution::one(),
1✔
1239
                },
1✔
1240
                &query_context,
1✔
1241
            )
1✔
1242
            .await
×
1243
            .unwrap();
1✔
1244

1✔
1245
        // Drain the stream; any erroneous tile fails the test via `unwrap`.
        let mut result = Vec::new();
1✔
1246
        while let Some(tile) = stream.next().await {
5✔
1247
            result.push(tile.unwrap());
4✔
1248
        }
4✔
1249

1250
        assert_eq!(result.len(), 4);
1✔
1251
        // Only the first tile's time is checked.
        assert_eq!(
1✔
1252
            result[0].time,
1✔
1253
            TimeInterval::new(
1✔
1254
                DateTime::new_utc(2014, 3, 1, 0, 0, 0),
1✔
1255
                DateTime::new_utc(2014, 4, 1, 0, 0, 0)
1✔
1256
            )
1✔
1257
            .unwrap()
1✔
1258
        );
1✔
1259
    }
1260

1261
    /// Checks that the default `TimeInterval` (bound to `eternal`) is left
    /// unchanged by a one-second `RelativeForwardShift` and by a one-second
    /// `RelativeBackwardShift`, both for `shift` and for `reverse_shift`.
    #[test]
1✔
1262
    fn shift_eternal() {
1✔
1263
        let eternal = TimeInterval::default();
1✔
1264

1✔
1265
        let f_shift = RelativeForwardShift {
1✔
1266
            step: TimeStep {
1✔
1267
                granularity: TimeGranularity::Seconds,
1✔
1268
                step: 1,
1✔
1269
            },
1✔
1270
        };
1✔
1271

1✔
1272
        // Forward: neither the shift nor its reversal may alter the interval.
        let (shifted, state) = f_shift.shift(eternal).unwrap();
1✔
1273
        assert_eq!(shifted, eternal);
1✔
1274
        let reverse_shifted = f_shift.reverse_shift(eternal, state).unwrap();
1✔
1275
        assert_eq!(reverse_shifted, eternal);
1✔
1276

1277
        let b_shift = RelativeBackwardShift {
1✔
1278
            step: TimeStep {
1✔
1279
                granularity: TimeGranularity::Seconds,
1✔
1280
                step: 1,
1✔
1281
            },
1✔
1282
        };
1✔
1283

1✔
1284
        // Backward: same expectation as for the forward shift.
        let (shifted, state) = b_shift.shift(eternal).unwrap();
1✔
1285
        assert_eq!(shifted, eternal);
1✔
1286

1287
        let reverse_shifted = b_shift.reverse_shift(eternal, state).unwrap();
1✔
1288
        assert_eq!(reverse_shifted, eternal);
1✔
1289
    }
1✔
1290

1291
    /// Checks one-second relative shifts on an interval that starts at
    /// `TimeInstance::MIN` and ends at `TimeInstance::EPOCH_START`.
    ///
    /// The expected result keeps the start at `TimeInstance::MIN` and moves
    /// only the end by `1_000` (forward: `+`, backward: `-`); `reverse_shift`
    /// must then restore the original interval.
    #[test]
1✔
1292
    fn shift_begin_of_time() {
1✔
1293
        let time = TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START);
1✔
1294

1✔
1295
        let f_shift = RelativeForwardShift {
1✔
1296
            step: TimeStep {
1✔
1297
                granularity: TimeGranularity::Seconds,
1✔
1298
                step: 1,
1✔
1299
            },
1✔
1300
        };
1✔
1301

1✔
1302
        // One second is expressed as 1_000 here; presumably `TimeInstance`
        // arithmetic is in milliseconds (TODO confirm).
        let expected_shift =
1✔
1303
            TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START + 1_000);
1✔
1304

1✔
1305
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1306
        assert_eq!(shifted, expected_shift);
1✔
1307

1308
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1309
        assert_eq!(reverse_shifted, time);
1✔
1310

1311
        // NOTE(review): this binding is named `f_shift` although it holds a
        // `RelativeBackwardShift`.
        let f_shift = RelativeBackwardShift {
1✔
1312
            step: TimeStep {
1✔
1313
                granularity: TimeGranularity::Seconds,
1✔
1314
                step: 1,
1✔
1315
            },
1✔
1316
        };
1✔
1317

1✔
1318
        let expected_shift =
1✔
1319
            TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START - 1_000);
1✔
1320

1✔
1321
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1322
        assert_eq!(shifted, expected_shift);
1✔
1323

1324
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1325
        assert_eq!(reverse_shifted, time);
1✔
1326
    }
1✔
1327

1328
    /// Checks one-second relative shifts on an interval that starts at
    /// `TimeInstance::EPOCH_START` and ends at `TimeInstance::MAX`.
    ///
    /// The expected result keeps the end at `TimeInstance::MAX` and moves only
    /// the start by `1_000` (forward: `+`, backward: `-`); `reverse_shift`
    /// must then restore the original interval.
    #[test]
1✔
1329
    fn shift_end_of_time() {
1✔
1330
        let time = TimeInterval::new_unchecked(TimeInstance::EPOCH_START, TimeInstance::MAX);
1✔
1331

1✔
1332
        let f_shift = RelativeForwardShift {
1✔
1333
            step: TimeStep {
1✔
1334
                granularity: TimeGranularity::Seconds,
1✔
1335
                step: 1,
1✔
1336
            },
1✔
1337
        };
1✔
1338

1✔
1339
        // One second is expressed as 1_000 here; presumably `TimeInstance`
        // arithmetic is in milliseconds (TODO confirm).
        let expected_shift =
1✔
1340
            TimeInterval::new_unchecked(TimeInstance::EPOCH_START + 1_000, TimeInstance::MAX);
1✔
1341

1✔
1342
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1343
        assert_eq!(shifted, expected_shift);
1✔
1344

1345
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1346
        assert_eq!(reverse_shifted, time);
1✔
1347

1348
        // NOTE(review): this binding is named `f_shift` although it holds a
        // `RelativeBackwardShift`.
        let f_shift = RelativeBackwardShift {
1✔
1349
            step: TimeStep {
1✔
1350
                granularity: TimeGranularity::Seconds,
1✔
1351
                step: 1,
1✔
1352
            },
1✔
1353
        };
1✔
1354

1✔
1355
        let expected_shift =
1✔
1356
            TimeInterval::new_unchecked(TimeInstance::EPOCH_START - 1_000, TimeInstance::MAX);
1✔
1357

1✔
1358
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1359
        assert_eq!(shifted, expected_shift);
1✔
1360

1361
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1362
        assert_eq!(reverse_shifted, time);
1✔
1363
    }
1✔
1364
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc