• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 7006568925

27 Nov 2023 02:07PM UTC coverage: 89.651% (+0.2%) from 89.498%
7006568925

push

github

web-flow
Merge pull request #888 from geo-engine/raster_stacks

raster stacking

4032 of 4274 new or added lines in 107 files covered. (94.34%)

12 existing lines in 8 files now uncovered.

113020 of 126066 relevant lines covered (89.65%)

59901.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.98
/operators/src/processing/time_shift.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator,
3
    InitializedSingleRasterOrVectorOperator, InitializedSources, InitializedVectorOperator,
4
    Operator, OperatorName, QueryContext, RasterOperator, RasterQueryProcessor,
5
    RasterResultDescriptor, ResultDescriptor, SingleRasterOrVectorSource,
6
    TypedRasterQueryProcessor, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
7
    VectorResultDescriptor, WorkflowOperatorPath,
8
};
9
use crate::util::Result;
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::StreamExt;
13
use geoengine_datatypes::collections::{
14
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
15
};
16
use geoengine_datatypes::error::{BoxedResultExt, ErrorSource};
17
use geoengine_datatypes::primitives::{
18
    BandSelection, ColumnSelection, Duration, Geometry, RasterQueryRectangle, TimeGranularity,
19
    TimeInstance, TimeInterval,
20
};
21
use geoengine_datatypes::primitives::{TimeStep, VectorQueryRectangle};
22
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
23
use geoengine_datatypes::util::arrow::ArrowTyped;
24
use serde::{Deserialize, Serialize};
25
use snafu::{ensure, Snafu};
26

27
/// Project the query rectangle to a new time interval.
28
pub type TimeShift = Operator<TimeShiftParams, SingleRasterOrVectorSource>;
29

30
impl OperatorName for TimeShift {
31
    const TYPE_NAME: &'static str = "TimeShift";
32
}
33

34
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26✔
35
#[serde(tag = "type", rename_all = "camelCase")]
36
pub enum TimeShiftParams {
37
    /// Shift the query rectangle relative with a time step
38
    #[serde(rename_all = "camelCase")]
39
    Relative {
40
        granularity: TimeGranularity,
41
        value: i32,
42
    },
43
    /// Set the time interval to a fixed value
44
    #[serde(rename_all = "camelCase")]
45
    Absolute { time_interval: TimeInterval },
46
}
47

48
pub trait TimeShiftOperation: Send + Sync + Copy {
49
    type State: Send + Sync + Copy;
50

51
    fn shift(
52
        &self,
53
        time_interval: TimeInterval,
54
    ) -> Result<(TimeInterval, Self::State), TimeShiftError>;
55

56
    fn reverse_shift(
57
        &self,
58
        time_interval: TimeInterval,
59
        state: Self::State,
60
    ) -> Result<TimeInterval, TimeShiftError>;
61
}
62

63
#[derive(Debug, Clone, Copy)]
×
64
pub struct RelativeForwardShift {
65
    step: TimeStep,
66
}
67

68
impl TimeShiftOperation for RelativeForwardShift {
69
    type State = ();
70

71
    fn shift(
9✔
72
        &self,
9✔
73
        time_interval: TimeInterval,
9✔
74
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
9✔
75
        let time_interval = time_interval + self.step;
9✔
76
        let time_interval = time_interval.boxed_context(error::TimeOverflow)?;
9✔
77
        Ok((time_interval, ()))
9✔
78
    }
9✔
79

80
    fn reverse_shift(
9✔
81
        &self,
9✔
82
        time_interval: TimeInterval,
9✔
83
        _state: Self::State,
9✔
84
    ) -> Result<TimeInterval, TimeShiftError> {
9✔
85
        let reversed_time_interval = time_interval - self.step;
9✔
86
        reversed_time_interval.boxed_context(error::TimeOverflow)
9✔
87
    }
9✔
88
}
89

90
#[derive(Debug, Clone, Copy)]
×
91
pub struct RelativeBackwardShift {
92
    step: TimeStep,
93
}
94

95
impl TimeShiftOperation for RelativeBackwardShift {
96
    type State = ();
97

98
    fn shift(
6✔
99
        &self,
6✔
100
        time_interval: TimeInterval,
6✔
101
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
6✔
102
        let time_interval = time_interval - self.step;
6✔
103
        let time_interval = time_interval.boxed_context(error::TimeOverflow)?;
6✔
104
        Ok((time_interval, ()))
6✔
105
    }
6✔
106

107
    fn reverse_shift(
9✔
108
        &self,
9✔
109
        time_interval: TimeInterval,
9✔
110
        _state: Self::State,
9✔
111
    ) -> Result<TimeInterval, TimeShiftError> {
9✔
112
        let reversed_time_interval = time_interval + self.step;
9✔
113
        reversed_time_interval.boxed_context(error::TimeOverflow)
9✔
114
    }
9✔
115
}
116

117
#[derive(Debug, Clone, Copy)]
×
118
pub struct AbsoluteShift {
119
    time_interval: TimeInterval,
120
}
121

122
impl TimeShiftOperation for AbsoluteShift {
123
    type State = (Duration, Duration);
124

125
    fn shift(
4✔
126
        &self,
4✔
127
        time_interval: TimeInterval,
4✔
128
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
4✔
129
        let time_start_difference = time_interval.start() - self.time_interval.start();
4✔
130
        let time_end_difference = time_interval.end() - self.time_interval.end();
4✔
131

4✔
132
        Ok((
4✔
133
            self.time_interval,
4✔
134
            (time_start_difference, time_end_difference),
4✔
135
        ))
4✔
136
    }
4✔
137

138
    fn reverse_shift(
7✔
139
        &self,
7✔
140
        time_interval: TimeInterval,
7✔
141
        (time_start_difference, time_end_difference): Self::State,
7✔
142
    ) -> Result<TimeInterval, TimeShiftError> {
7✔
143
        let t1 = time_interval.start() + time_start_difference.num_milliseconds();
7✔
144
        let t2 = time_interval.end() + time_end_difference.num_milliseconds();
7✔
145
        TimeInterval::new(t1, t2).boxed_context(error::FaultyTimeInterval { t1, t2 })
7✔
146
    }
7✔
147
}
148

149
#[derive(Debug, Snafu)]
×
150
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
151
pub enum TimeShiftError {
152
    #[snafu(display("Output type must match the type of the source"))]
153
    UnmatchedOutput,
154
    #[snafu(display("Shifting the time led to an overflowing time interval"))]
155
    TimeOverflow { source: Box<dyn ErrorSource> },
156
    #[snafu(display("Shifting the time to a faulty time interval: {t1} / {t2}"))]
157
    FaultyTimeInterval {
158
        source: Box<dyn ErrorSource>,
159
        t1: TimeInstance,
160
        t2: TimeInstance,
161
    },
162
    #[snafu(display("Modifying the timestamps of the feature collection failed"))]
163
    FeatureCollectionTimeModification { source: Box<dyn ErrorSource> },
164
}
165

166
#[typetag::serde]
×
167
#[async_trait]
168
impl VectorOperator for TimeShift {
169
    async fn _initialize(
2✔
170
        self: Box<Self>,
2✔
171
        path: WorkflowOperatorPath,
2✔
172
        context: &dyn ExecutionContext,
2✔
173
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
174
        let name = CanonicOperatorName::from(&self);
2✔
175

176
        let init_sources = self.sources.initialize_sources(path, context).await?;
2✔
177

178
        match (init_sources.source, self.params) {
2✔
179
            (
180
                InitializedSingleRasterOrVectorOperator::Vector(source),
×
181
                TimeShiftParams::Relative { granularity, value },
×
182
            ) if value.is_positive() => {
1✔
183
                let shift = RelativeForwardShift {
×
184
                    step: TimeStep {
×
185
                        granularity,
×
186
                        step: value.unsigned_abs(),
×
187
                    },
×
188
                };
×
189

×
190
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
×
191

×
192
                Ok(Box::new(InitializedVectorTimeShift {
×
193
                    name,
×
194
                    source,
×
195
                    result_descriptor,
×
196
                    shift,
×
197
                }))
×
198
            }
199
            (
200
                InitializedSingleRasterOrVectorOperator::Vector(source),
1✔
201
                TimeShiftParams::Relative { granularity, value },
1✔
202
            ) => {
1✔
203
                let shift = RelativeBackwardShift {
1✔
204
                    step: TimeStep {
1✔
205
                        granularity,
1✔
206
                        step: value.unsigned_abs(),
1✔
207
                    },
1✔
208
                };
1✔
209

1✔
210
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
211

1✔
212
                Ok(Box::new(InitializedVectorTimeShift {
1✔
213
                    name,
1✔
214
                    source,
1✔
215
                    result_descriptor,
1✔
216
                    shift,
1✔
217
                }))
1✔
218
            }
219
            (
220
                InitializedSingleRasterOrVectorOperator::Vector(source),
1✔
221
                TimeShiftParams::Absolute { time_interval },
1✔
222
            ) => {
1✔
223
                let shift = AbsoluteShift { time_interval };
1✔
224

1✔
225
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
226

1✔
227
                Ok(Box::new(InitializedVectorTimeShift {
1✔
228
                    name,
1✔
229
                    source,
1✔
230
                    result_descriptor,
1✔
231
                    shift,
1✔
232
                }))
1✔
233
            }
234
            (InitializedSingleRasterOrVectorOperator::Raster(_), _) => {
235
                Err(TimeShiftError::UnmatchedOutput.into())
×
236
            }
237
        }
238
    }
4✔
239

240
    span_fn!(TimeShift);
×
241
}
242

243
#[typetag::serde]
2✔
244
#[async_trait]
245
impl RasterOperator for TimeShift {
246
    async fn _initialize(
7✔
247
        self: Box<Self>,
7✔
248
        path: WorkflowOperatorPath,
7✔
249
        context: &dyn ExecutionContext,
7✔
250
    ) -> Result<Box<dyn InitializedRasterOperator>> {
7✔
251
        let name = CanonicOperatorName::from(&self);
7✔
252

253
        let init_sources = self.sources.initialize_sources(path, context).await?;
7✔
254

255
        match (init_sources.source, self.params) {
7✔
256
            (
257
                InitializedSingleRasterOrVectorOperator::Raster(source),
4✔
258
                TimeShiftParams::Relative { granularity, value },
4✔
259
            ) if value.is_positive() => {
5✔
260
                let shift = RelativeForwardShift {
4✔
261
                    step: TimeStep {
4✔
262
                        granularity,
4✔
263
                        step: value.unsigned_abs(),
4✔
264
                    },
4✔
265
                };
4✔
266

4✔
267
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
4✔
268

4✔
269
                // TODO: implement multi-band functionality and remove this check
4✔
270
                ensure!(
4✔
271
                    result_descriptor.bands.len() == 1,
4✔
NEW
272
                    crate::error::OperatorDoesNotSupportMultiBandsSourcesYet {
×
NEW
273
                        operator: TimeShift::TYPE_NAME
×
NEW
274
                    }
×
275
                );
276

277
                Ok(Box::new(InitializedRasterTimeShift {
4✔
278
                    name,
4✔
279
                    source,
4✔
280
                    result_descriptor,
4✔
281
                    shift,
4✔
282
                }))
4✔
283
            }
284
            (
285
                InitializedSingleRasterOrVectorOperator::Raster(source),
1✔
286
                TimeShiftParams::Relative { granularity, value },
1✔
287
            ) => {
1✔
288
                let shift = RelativeBackwardShift {
1✔
289
                    step: TimeStep {
1✔
290
                        granularity,
1✔
291
                        step: value.unsigned_abs(),
1✔
292
                    },
1✔
293
                };
1✔
294

1✔
295
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
296

1✔
297
                // TODO: implement multi-band functionality and remove this check
1✔
298
                ensure!(
1✔
299
                    result_descriptor.bands.len() == 1,
1✔
NEW
300
                    crate::error::OperatorDoesNotSupportMultiBandsSourcesYet {
×
NEW
301
                        operator: TimeShift::TYPE_NAME
×
NEW
302
                    }
×
303
                );
304

305
                Ok(Box::new(InitializedRasterTimeShift {
1✔
306
                    name,
1✔
307
                    source,
1✔
308
                    result_descriptor,
1✔
309
                    shift,
1✔
310
                }))
1✔
311
            }
312
            (
313
                InitializedSingleRasterOrVectorOperator::Raster(source),
2✔
314
                TimeShiftParams::Absolute { time_interval },
2✔
315
            ) => {
2✔
316
                let shift = AbsoluteShift { time_interval };
2✔
317

2✔
318
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
2✔
319

2✔
320
                // TODO: implement multi-band functionality and remove this check
2✔
321
                ensure!(
2✔
322
                    result_descriptor.bands.len() == 1,
2✔
NEW
323
                    crate::error::OperatorDoesNotSupportMultiBandsSourcesYet {
×
NEW
324
                        operator: TimeShift::TYPE_NAME
×
NEW
325
                    }
×
326
                );
327

328
                Ok(Box::new(InitializedRasterTimeShift {
2✔
329
                    name,
2✔
330
                    source,
2✔
331
                    result_descriptor,
2✔
332
                    shift,
2✔
333
                }))
2✔
334
            }
335
            (InitializedSingleRasterOrVectorOperator::Vector(_), _) => {
336
                Err(TimeShiftError::UnmatchedOutput.into())
×
337
            }
338
        }
339
    }
14✔
340

341
    span_fn!(TimeShift);
×
342
}
343

344
fn shift_result_descriptor<R: ResultDescriptor, S: TimeShiftOperation>(
9✔
345
    result_descriptor: &R,
9✔
346
    shift: S,
9✔
347
) -> R {
9✔
348
    result_descriptor.map_time(|time| {
9✔
349
        if let Some(time) = time {
9✔
350
            shift.shift(*time).map(|r| r.0).ok()
5✔
351
        } else {
352
            None
4✔
353
        }
354
    })
9✔
355
}
9✔
356

357
pub struct InitializedVectorTimeShift<Shift: TimeShiftOperation> {
358
    name: CanonicOperatorName,
359
    source: Box<dyn InitializedVectorOperator>,
360
    result_descriptor: VectorResultDescriptor,
361
    shift: Shift,
362
}
363

364
pub struct InitializedRasterTimeShift<Shift: TimeShiftOperation> {
365
    name: CanonicOperatorName,
366
    source: Box<dyn InitializedRasterOperator>,
367
    result_descriptor: RasterResultDescriptor,
368
    shift: Shift,
369
}
370

371
impl<Shift: TimeShiftOperation + 'static> InitializedVectorOperator
372
    for InitializedVectorTimeShift<Shift>
373
{
374
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
375
        &self.result_descriptor
×
376
    }
×
377

378
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
2✔
379
        let source_processor = self.source.query_processor()?;
2✔
380

381
        Ok(
382
            call_on_generic_vector_processor!(source_processor, processor => VectorTimeShiftProcessor {
2✔
383
                processor,
2✔
384
                shift: self.shift,
2✔
385
            }.boxed().into()),
2✔
386
        )
387
    }
2✔
388

389
    fn canonic_name(&self) -> CanonicOperatorName {
×
390
        self.name.clone()
×
391
    }
×
392
}
393

394
impl<Shift: TimeShiftOperation + 'static> InitializedRasterOperator
395
    for InitializedRasterTimeShift<Shift>
396
{
397
    fn result_descriptor(&self) -> &RasterResultDescriptor {
3✔
398
        &self.result_descriptor
3✔
399
    }
3✔
400

401
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
6✔
402
        let source_processor = self.source.query_processor()?;
6✔
403

404
        Ok(
405
            call_on_generic_raster_processor!(source_processor, processor => RasterTimeShiftProcessor {
6✔
406
                processor,
6✔
407
                shift: self.shift,
6✔
408
            }.boxed().into()),
6✔
409
        )
410
    }
6✔
411

412
    fn canonic_name(&self) -> CanonicOperatorName {
×
413
        self.name.clone()
×
414
    }
×
415
}
416

417
pub struct RasterTimeShiftProcessor<Q, P, Shift: TimeShiftOperation>
418
where
419
    Q: RasterQueryProcessor<RasterType = P>,
420
{
421
    processor: Q,
422
    shift: Shift,
423
}
424

425
pub struct VectorTimeShiftProcessor<Q, G, Shift: TimeShiftOperation>
426
where
427
    G: Geometry,
428
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
429
{
430
    processor: Q,
431
    shift: Shift,
432
}
433

434
#[async_trait]
435
impl<Q, G, Shift> VectorQueryProcessor for VectorTimeShiftProcessor<Q, G, Shift>
436
where
437
    G: Geometry + ArrowTyped + 'static,
438
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
439
    Shift: TimeShiftOperation + 'static,
440
{
441
    type VectorType = FeatureCollection<G>;
442

443
    async fn vector_query<'a>(
2✔
444
        &'a self,
2✔
445
        query: VectorQueryRectangle,
2✔
446
        ctx: &'a dyn QueryContext,
2✔
447
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
2✔
448
        let (time_interval, state) = self.shift.shift(query.time_interval)?;
2✔
449

450
        let query = VectorQueryRectangle {
2✔
451
            spatial_bounds: query.spatial_bounds,
2✔
452
            time_interval,
2✔
453
            spatial_resolution: query.spatial_resolution,
2✔
454
            attributes: ColumnSelection::all(),
2✔
455
        };
2✔
456
        let stream = self.processor.vector_query(query, ctx).await?;
2✔
457

458
        let stream = stream.then(move |collection| async move {
2✔
459
            let collection = collection?;
2✔
460
            let shift = self.shift;
2✔
461

2✔
462
            crate::util::spawn_blocking(move || {
2✔
463
                let time_intervals = collection
2✔
464
                    .time_intervals()
2✔
465
                    .iter()
2✔
466
                    .map(move |time| shift.reverse_shift(*time, state))
3✔
467
                    .collect::<Result<Vec<TimeInterval>, TimeShiftError>>()?;
2✔
468

469
                collection
2✔
470
                    .replace_time(&time_intervals)
2✔
471
                    .boxed_context(error::FeatureCollectionTimeModification)
2✔
472
                    .map_err(Into::into)
2✔
473
            })
2✔
474
            .await?
2✔
475
        });
2✔
476

2✔
477
        Ok(stream.boxed())
2✔
478
    }
4✔
479
}
480

481
#[async_trait]
482
impl<Q, P, Shift> RasterQueryProcessor for RasterTimeShiftProcessor<Q, P, Shift>
483
where
484
    Q: RasterQueryProcessor<RasterType = P>,
485
    P: Pixel,
486
    Shift: TimeShiftOperation,
487
{
488
    type RasterType = P;
489

490
    async fn raster_query<'a>(
6✔
491
        &'a self,
6✔
492
        query: RasterQueryRectangle,
6✔
493
        ctx: &'a dyn QueryContext,
6✔
494
    ) -> Result<BoxStream<'a, Result<RasterTile2D<Self::RasterType>>>> {
6✔
495
        let (time_interval, state) = self.shift.shift(query.time_interval)?;
6✔
496
        let query = RasterQueryRectangle {
6✔
497
            spatial_bounds: query.spatial_bounds,
6✔
498
            time_interval,
6✔
499
            spatial_resolution: query.spatial_resolution,
6✔
500
            attributes: BandSelection::first(),
6✔
501
        };
6✔
502
        let stream = self.processor.raster_query(query, ctx).await?;
6✔
503

504
        let stream = stream.map(move |raster| {
16✔
505
            // reverse time shift for results
506
            let mut raster = raster?;
16✔
507

508
            raster.time = self.shift.reverse_shift(raster.time, state)?;
16✔
509

510
            Ok(raster)
16✔
511
        });
16✔
512

6✔
513
        Ok(Box::pin(stream))
6✔
514
    }
12✔
515
}
516

517
#[cfg(test)]
518
mod tests {
519
    use super::*;
520

521
    use crate::{
522
        engine::{MockExecutionContext, MockQueryContext, RasterBandDescriptors},
523
        mock::{MockFeatureCollectionSource, MockRasterSource, MockRasterSourceParams},
524
        processing::{Expression, ExpressionParams, ExpressionSources},
525
        source::{GdalSource, GdalSourceParameters},
526
        util::{gdal::add_ndvi_dataset, input::RasterOrVectorOperator},
527
    };
528
    use futures::StreamExt;
529
    use geoengine_datatypes::{
530
        collections::{ChunksEqualIgnoringCacheHint, MultiPointCollection},
531
        dataset::NamedData,
532
        primitives::{
533
            BoundingBox2D, CacheHint, DateTime, MultiPoint, SpatialPartition2D, SpatialResolution,
534
            TimeGranularity,
535
        },
536
        raster::{EmptyGrid2D, GridOrEmpty, RasterDataType, TileInformation, TilingSpecification},
537
        spatial_reference::SpatialReference,
538
        util::test::TestDefault,
539
    };
540

541
    #[test]
1✔
542
    fn test_ser_de_absolute() {
1✔
543
        let time_shift = TimeShift {
1✔
544
            sources: SingleRasterOrVectorSource {
1✔
545
                source: RasterOrVectorOperator::Raster(
1✔
546
                    GdalSource {
1✔
547
                        params: GdalSourceParameters {
1✔
548
                            data: NamedData::with_system_name("test-raster"),
1✔
549
                        },
1✔
550
                    }
1✔
551
                    .boxed(),
1✔
552
                ),
1✔
553
            },
1✔
554
            params: TimeShiftParams::Absolute {
1✔
555
                time_interval: TimeInterval::new_unchecked(
1✔
556
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
557
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
558
                ),
1✔
559
            },
1✔
560
        };
1✔
561

1✔
562
        let serialized = serde_json::to_value(&time_shift).unwrap();
1✔
563

1✔
564
        assert_eq!(
1✔
565
            serialized,
1✔
566
            serde_json::json!({
1✔
567
                "params": {
1✔
568
                    "type": "absolute",
1✔
569
                    "timeInterval": {
1✔
570
                        "start": 1_293_840_000_000_i64,
1✔
571
                        "end": 1_325_376_000_000_i64
1✔
572
                    }
1✔
573
                },
1✔
574
                "sources": {
1✔
575
                    "source": {
1✔
576
                        "type": "GdalSource",
1✔
577
                        "params": {
1✔
578
                            "data": "test-raster"
1✔
579
                        }
1✔
580
                    }
1✔
581
                }
1✔
582
            })
1✔
583
        );
1✔
584

585
        let deserialized: TimeShift = serde_json::from_value(serialized).unwrap();
1✔
586

1✔
587
        assert_eq!(time_shift.params, deserialized.params);
1✔
588
    }
1✔
589

590
    #[test]
1✔
591
    fn test_ser_de_relative() {
1✔
592
        let time_shift = TimeShift {
1✔
593
            sources: SingleRasterOrVectorSource {
1✔
594
                source: RasterOrVectorOperator::Raster(
1✔
595
                    GdalSource {
1✔
596
                        params: GdalSourceParameters {
1✔
597
                            data: NamedData::with_system_name("test-raster"),
1✔
598
                        },
1✔
599
                    }
1✔
600
                    .boxed(),
1✔
601
                ),
1✔
602
            },
1✔
603
            params: TimeShiftParams::Relative {
1✔
604
                granularity: TimeGranularity::Years,
1✔
605
                value: 1,
1✔
606
            },
1✔
607
        };
1✔
608

1✔
609
        let serialized = serde_json::to_value(&time_shift).unwrap();
1✔
610

1✔
611
        assert_eq!(
1✔
612
            serialized,
1✔
613
            serde_json::json!({
1✔
614
                "params": {
1✔
615
                    "type": "relative",
1✔
616
                    "granularity": "years",
1✔
617
                    "value": 1
1✔
618
                },
1✔
619
                "sources": {
1✔
620
                    "source": {
1✔
621
                        "type": "GdalSource",
1✔
622
                        "params": {
1✔
623
                            "data": "test-raster"
1✔
624
                        }
1✔
625
                    }
1✔
626
                }
1✔
627
            })
1✔
628
        );
1✔
629

630
        let deserialized: TimeShift = serde_json::from_value(serialized).unwrap();
1✔
631

1✔
632
        assert_eq!(time_shift.params, deserialized.params);
1✔
633
    }
1✔
634

635
    #[tokio::test]
1✔
636
    async fn test_absolute_vector_shift() {
1✔
637
        let execution_context = MockExecutionContext::test_default();
1✔
638
        let query_context = MockQueryContext::test_default();
1✔
639

1✔
640
        let source = MockFeatureCollectionSource::single(
1✔
641
            MultiPointCollection::from_data(
1✔
642
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
643
                vec![
1✔
644
                    TimeInterval::new(
1✔
645
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
646
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
647
                    )
1✔
648
                    .unwrap(),
1✔
649
                    TimeInterval::new(
1✔
650
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
651
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
652
                    )
1✔
653
                    .unwrap(),
1✔
654
                    TimeInterval::new(
1✔
655
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
656
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
657
                    )
1✔
658
                    .unwrap(),
1✔
659
                ],
1✔
660
                Default::default(),
1✔
661
                CacheHint::default(),
1✔
662
            )
1✔
663
            .unwrap(),
1✔
664
        );
1✔
665

1✔
666
        let time_shift = TimeShift {
1✔
667
            sources: SingleRasterOrVectorSource {
1✔
668
                source: RasterOrVectorOperator::Vector(source.boxed()),
1✔
669
            },
1✔
670
            params: TimeShiftParams::Absolute {
1✔
671
                time_interval: TimeInterval::new(
1✔
672
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
673
                    DateTime::new_utc(2009, 6, 1, 0, 0, 0),
1✔
674
                )
1✔
675
                .unwrap(),
1✔
676
            },
1✔
677
        };
1✔
678

679
        let query_processor = VectorOperator::boxed(time_shift)
1✔
680
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
681
            .await
×
682
            .unwrap()
1✔
683
            .query_processor()
1✔
684
            .unwrap()
1✔
685
            .multi_point()
1✔
686
            .unwrap();
1✔
687

688
        let mut stream = query_processor
1✔
689
            .vector_query(
1✔
690
                VectorQueryRectangle {
1✔
691
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
692
                    time_interval: TimeInterval::new(
1✔
693
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
694
                        DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
695
                    )
1✔
696
                    .unwrap(),
1✔
697
                    spatial_resolution: SpatialResolution::one(),
1✔
698
                    attributes: ColumnSelection::all(),
1✔
699
                },
1✔
700
                &query_context,
1✔
701
            )
1✔
702
            .await
×
703
            .unwrap();
1✔
704

1✔
705
        let mut result = Vec::new();
1✔
706
        while let Some(collection) = stream.next().await {
2✔
707
            result.push(collection.unwrap());
1✔
708
        }
1✔
709

710
        assert_eq!(result.len(), 1);
1✔
711

712
        let expected = MultiPointCollection::from_data(
1✔
713
            MultiPoint::many(vec![(0., 0.)]).unwrap(),
1✔
714
            vec![TimeInterval::new(
1✔
715
                DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
716
                DateTime::new_utc_with_millis(2013, 8, 1, 23, 59, 59, 999),
1✔
717
            )
1✔
718
            .unwrap()],
1✔
719
            Default::default(),
1✔
720
            CacheHint::default(),
1✔
721
        )
1✔
722
        .unwrap();
1✔
723

1✔
724
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
725
    }
726

727
    #[tokio::test]
1✔
728
    async fn test_relative_vector_shift() {
1✔
729
        let execution_context = MockExecutionContext::test_default();
1✔
730
        let query_context = MockQueryContext::test_default();
1✔
731

1✔
732
        let source = MockFeatureCollectionSource::single(
1✔
733
            MultiPointCollection::from_data(
1✔
734
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
735
                vec![
1✔
736
                    TimeInterval::new(
1✔
737
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
738
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
739
                    )
1✔
740
                    .unwrap(),
1✔
741
                    TimeInterval::new(
1✔
742
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
743
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
744
                    )
1✔
745
                    .unwrap(),
1✔
746
                    TimeInterval::new(
1✔
747
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
748
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
749
                    )
1✔
750
                    .unwrap(),
1✔
751
                ],
1✔
752
                Default::default(),
1✔
753
                CacheHint::default(),
1✔
754
            )
1✔
755
            .unwrap(),
1✔
756
        );
1✔
757

1✔
758
        let time_shift = TimeShift {
1✔
759
            sources: SingleRasterOrVectorSource {
1✔
760
                source: RasterOrVectorOperator::Vector(source.boxed()),
1✔
761
            },
1✔
762
            params: TimeShiftParams::Relative {
1✔
763
                granularity: TimeGranularity::Years,
1✔
764
                value: -1,
1✔
765
            },
1✔
766
        };
1✔
767

768
        let query_processor = VectorOperator::boxed(time_shift)
1✔
769
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
770
            .await
×
771
            .unwrap()
1✔
772
            .query_processor()
1✔
773
            .unwrap()
1✔
774
            .multi_point()
1✔
775
            .unwrap();
1✔
776

777
        let mut stream = query_processor
1✔
778
            .vector_query(
1✔
779
                VectorQueryRectangle {
1✔
780
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
781
                    time_interval: TimeInterval::new(
1✔
782
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
783
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
784
                    )
1✔
785
                    .unwrap(),
1✔
786
                    spatial_resolution: SpatialResolution::one(),
1✔
787
                    attributes: ColumnSelection::all(),
1✔
788
                },
1✔
789
                &query_context,
1✔
790
            )
1✔
791
            .await
×
792
            .unwrap();
1✔
793

1✔
794
        let mut result = Vec::new();
1✔
795
        while let Some(collection) = stream.next().await {
2✔
796
            result.push(collection.unwrap());
1✔
797
        }
1✔
798

799
        assert_eq!(result.len(), 1);
1✔
800

801
        let expected = MultiPointCollection::from_data(
1✔
802
            MultiPoint::many(vec![(0., 0.), (1., 1.)]).unwrap(),
1✔
803
            vec![
1✔
804
                TimeInterval::new(
1✔
805
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
806
                    DateTime::new_utc_with_millis(2011, 12, 31, 23, 59, 59, 999),
1✔
807
                )
1✔
808
                .unwrap(),
1✔
809
                TimeInterval::new(
1✔
810
                    DateTime::new_utc(2010, 6, 3, 0, 0, 0),
1✔
811
                    DateTime::new_utc(2011, 7, 14, 0, 0, 0),
1✔
812
                )
1✔
813
                .unwrap(),
1✔
814
            ],
1✔
815
            Default::default(),
1✔
816
            CacheHint::default(),
1✔
817
        )
1✔
818
        .unwrap();
1✔
819

1✔
820
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
821
    }
822

823
    #[tokio::test]
1✔
824
    #[allow(clippy::too_many_lines)]
825
    async fn test_absolute_raster_shift() {
1✔
826
        let empty_grid = GridOrEmpty::Empty(EmptyGrid2D::<u8>::new([3, 2].into()));
1✔
827
        let raster_tiles = vec![
1✔
828
            RasterTile2D::new_with_tile_info(
1✔
829
                TimeInterval::new_unchecked(
1✔
830
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
831
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
832
                ),
1✔
833
                TileInformation {
1✔
834
                    global_tile_position: [-1, 0].into(),
1✔
835
                    tile_size_in_pixels: [3, 2].into(),
1✔
836
                    global_geo_transform: TestDefault::test_default(),
1✔
837
                },
1✔
838
                0,
1✔
839
                empty_grid.clone(),
1✔
840
                CacheHint::default(),
1✔
841
            ),
1✔
842
            RasterTile2D::new_with_tile_info(
1✔
843
                TimeInterval::new_unchecked(
1✔
844
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
845
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
846
                ),
1✔
847
                TileInformation {
1✔
848
                    global_tile_position: [-1, 1].into(),
1✔
849
                    tile_size_in_pixels: [3, 2].into(),
1✔
850
                    global_geo_transform: TestDefault::test_default(),
1✔
851
                },
1✔
852
                0,
1✔
853
                empty_grid.clone(),
1✔
854
                CacheHint::default(),
1✔
855
            ),
1✔
856
            RasterTile2D::new_with_tile_info(
1✔
857
                TimeInterval::new_unchecked(
1✔
858
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
859
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
860
                ),
1✔
861
                TileInformation {
1✔
862
                    global_tile_position: [-1, 0].into(),
1✔
863
                    tile_size_in_pixels: [3, 2].into(),
1✔
864
                    global_geo_transform: TestDefault::test_default(),
1✔
865
                },
1✔
866
                0,
1✔
867
                empty_grid.clone(),
1✔
868
                CacheHint::default(),
1✔
869
            ),
1✔
870
            RasterTile2D::new_with_tile_info(
1✔
871
                TimeInterval::new_unchecked(
1✔
872
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
873
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
874
                ),
1✔
875
                TileInformation {
1✔
876
                    global_tile_position: [-1, 1].into(),
1✔
877
                    tile_size_in_pixels: [3, 2].into(),
1✔
878
                    global_geo_transform: TestDefault::test_default(),
1✔
879
                },
1✔
880
                0,
1✔
881
                empty_grid.clone(),
1✔
882
                CacheHint::default(),
1✔
883
            ),
1✔
884
            RasterTile2D::new_with_tile_info(
1✔
885
                TimeInterval::new_unchecked(
1✔
886
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
887
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
888
                ),
1✔
889
                TileInformation {
1✔
890
                    global_tile_position: [-1, 0].into(),
1✔
891
                    tile_size_in_pixels: [3, 2].into(),
1✔
892
                    global_geo_transform: TestDefault::test_default(),
1✔
893
                },
1✔
894
                0,
1✔
895
                empty_grid.clone(),
1✔
896
                CacheHint::default(),
1✔
897
            ),
1✔
898
            RasterTile2D::new_with_tile_info(
1✔
899
                TimeInterval::new_unchecked(
1✔
900
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
901
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
902
                ),
1✔
903
                TileInformation {
1✔
904
                    global_tile_position: [-1, 1].into(),
1✔
905
                    tile_size_in_pixels: [3, 2].into(),
1✔
906
                    global_geo_transform: TestDefault::test_default(),
1✔
907
                },
1✔
908
                0,
1✔
909
                empty_grid.clone(),
1✔
910
                CacheHint::default(),
1✔
911
            ),
1✔
912
        ];
1✔
913

1✔
914
        let mrs = MockRasterSource {
1✔
915
            params: MockRasterSourceParams {
1✔
916
                data: raster_tiles,
1✔
917
                result_descriptor: RasterResultDescriptor {
1✔
918
                    data_type: RasterDataType::U8,
1✔
919
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
920
                    time: None,
1✔
921
                    bbox: None,
1✔
922
                    resolution: None,
1✔
923
                    bands: RasterBandDescriptors::new_single_band(),
1✔
924
                },
1✔
925
            },
1✔
926
        }
1✔
927
        .boxed();
1✔
928

1✔
929
        let time_shift = TimeShift {
1✔
930
            sources: SingleRasterOrVectorSource {
1✔
931
                source: RasterOrVectorOperator::Raster(mrs),
1✔
932
            },
1✔
933
            params: TimeShiftParams::Absolute {
1✔
934
                time_interval: TimeInterval::new_unchecked(
1✔
935
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
936
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
937
                ),
1✔
938
            },
1✔
939
        };
1✔
940

1✔
941
        let execution_context = MockExecutionContext::new_with_tiling_spec(
1✔
942
            TilingSpecification::new((0., 0.).into(), [3, 2].into()),
1✔
943
        );
1✔
944
        let query_context = MockQueryContext::test_default();
1✔
945

946
        let query_processor = RasterOperator::boxed(time_shift)
1✔
947
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
948
            .await
×
949
            .unwrap()
1✔
950
            .query_processor()
1✔
951
            .unwrap()
1✔
952
            .get_u8()
1✔
953
            .unwrap();
1✔
954

955
        let mut stream = query_processor
1✔
956
            .raster_query(
1✔
957
                RasterQueryRectangle {
1✔
958
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
959
                        (0., 3.).into(),
1✔
960
                        (4., 0.).into(),
1✔
961
                    ),
1✔
962
                    time_interval: TimeInterval::new(
1✔
963
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
964
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
965
                    )
1✔
966
                    .unwrap(),
1✔
967
                    spatial_resolution: SpatialResolution::one(),
1✔
968
                    attributes: BandSelection::first(),
1✔
969
                },
1✔
970
                &query_context,
1✔
971
            )
1✔
972
            .await
×
973
            .unwrap();
1✔
974

1✔
975
        let mut result = Vec::new();
1✔
976
        while let Some(tile) = stream.next().await {
3✔
977
            result.push(tile.unwrap());
2✔
978
        }
2✔
979

980
        assert_eq!(result.len(), 2);
1✔
981

982
        assert_eq!(
1✔
983
            result[0].time,
1✔
984
            TimeInterval::new_unchecked(
1✔
985
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
986
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
987
            ),
1✔
988
        );
1✔
989
        assert_eq!(
1✔
990
            result[1].time,
1✔
991
            TimeInterval::new_unchecked(
1✔
992
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
993
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
994
            ),
1✔
995
        );
1✔
996
    }
997

998
    #[tokio::test]
1✔
999
    #[allow(clippy::too_many_lines)]
1000
    async fn test_relative_raster_shift() {
1✔
1001
        let empty_grid = GridOrEmpty::Empty(EmptyGrid2D::<u8>::new([3, 2].into()));
1✔
1002
        let raster_tiles = vec![
1✔
1003
            RasterTile2D::new_with_tile_info(
1✔
1004
                TimeInterval::new_unchecked(
1✔
1005
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1006
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1007
                ),
1✔
1008
                TileInformation {
1✔
1009
                    global_tile_position: [-1, 0].into(),
1✔
1010
                    tile_size_in_pixels: [3, 2].into(),
1✔
1011
                    global_geo_transform: TestDefault::test_default(),
1✔
1012
                },
1✔
1013
                0,
1✔
1014
                empty_grid.clone(),
1✔
1015
                CacheHint::default(),
1✔
1016
            ),
1✔
1017
            RasterTile2D::new_with_tile_info(
1✔
1018
                TimeInterval::new_unchecked(
1✔
1019
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1020
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1021
                ),
1✔
1022
                TileInformation {
1✔
1023
                    global_tile_position: [-1, 1].into(),
1✔
1024
                    tile_size_in_pixels: [3, 2].into(),
1✔
1025
                    global_geo_transform: TestDefault::test_default(),
1✔
1026
                },
1✔
1027
                0,
1✔
1028
                empty_grid.clone(),
1✔
1029
                CacheHint::default(),
1✔
1030
            ),
1✔
1031
            RasterTile2D::new_with_tile_info(
1✔
1032
                TimeInterval::new_unchecked(
1✔
1033
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1034
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1035
                ),
1✔
1036
                TileInformation {
1✔
1037
                    global_tile_position: [-1, 0].into(),
1✔
1038
                    tile_size_in_pixels: [3, 2].into(),
1✔
1039
                    global_geo_transform: TestDefault::test_default(),
1✔
1040
                },
1✔
1041
                0,
1✔
1042
                empty_grid.clone(),
1✔
1043
                CacheHint::default(),
1✔
1044
            ),
1✔
1045
            RasterTile2D::new_with_tile_info(
1✔
1046
                TimeInterval::new_unchecked(
1✔
1047
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1048
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1049
                ),
1✔
1050
                TileInformation {
1✔
1051
                    global_tile_position: [-1, 1].into(),
1✔
1052
                    tile_size_in_pixels: [3, 2].into(),
1✔
1053
                    global_geo_transform: TestDefault::test_default(),
1✔
1054
                },
1✔
1055
                0,
1✔
1056
                empty_grid.clone(),
1✔
1057
                CacheHint::default(),
1✔
1058
            ),
1✔
1059
            RasterTile2D::new_with_tile_info(
1✔
1060
                TimeInterval::new_unchecked(
1✔
1061
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1062
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
1063
                ),
1✔
1064
                TileInformation {
1✔
1065
                    global_tile_position: [-1, 0].into(),
1✔
1066
                    tile_size_in_pixels: [3, 2].into(),
1✔
1067
                    global_geo_transform: TestDefault::test_default(),
1✔
1068
                },
1✔
1069
                0,
1✔
1070
                empty_grid.clone(),
1✔
1071
                CacheHint::default(),
1✔
1072
            ),
1✔
1073
            RasterTile2D::new_with_tile_info(
1✔
1074
                TimeInterval::new_unchecked(
1✔
1075
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1076
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
1077
                ),
1✔
1078
                TileInformation {
1✔
1079
                    global_tile_position: [-1, 1].into(),
1✔
1080
                    tile_size_in_pixels: [3, 2].into(),
1✔
1081
                    global_geo_transform: TestDefault::test_default(),
1✔
1082
                },
1✔
1083
                0,
1✔
1084
                empty_grid.clone(),
1✔
1085
                CacheHint::default(),
1✔
1086
            ),
1✔
1087
        ];
1✔
1088

1✔
1089
        let mrs = MockRasterSource {
1✔
1090
            params: MockRasterSourceParams {
1✔
1091
                data: raster_tiles,
1✔
1092
                result_descriptor: RasterResultDescriptor {
1✔
1093
                    data_type: RasterDataType::U8,
1✔
1094
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
1095
                    time: None,
1✔
1096
                    bbox: None,
1✔
1097
                    resolution: None,
1✔
1098
                    bands: RasterBandDescriptors::new_single_band(),
1✔
1099
                },
1✔
1100
            },
1✔
1101
        }
1✔
1102
        .boxed();
1✔
1103

1✔
1104
        let time_shift = TimeShift {
1✔
1105
            sources: SingleRasterOrVectorSource {
1✔
1106
                source: RasterOrVectorOperator::Raster(mrs),
1✔
1107
            },
1✔
1108
            params: TimeShiftParams::Relative {
1✔
1109
                granularity: TimeGranularity::Years,
1✔
1110
                value: 1,
1✔
1111
            },
1✔
1112
        };
1✔
1113

1✔
1114
        let execution_context = MockExecutionContext::new_with_tiling_spec(
1✔
1115
            TilingSpecification::new((0., 0.).into(), [3, 2].into()),
1✔
1116
        );
1✔
1117
        let query_context = MockQueryContext::test_default();
1✔
1118

1119
        let query_processor = RasterOperator::boxed(time_shift)
1✔
1120
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1121
            .await
×
1122
            .unwrap()
1✔
1123
            .query_processor()
1✔
1124
            .unwrap()
1✔
1125
            .get_u8()
1✔
1126
            .unwrap();
1✔
1127

1128
        let mut stream = query_processor
1✔
1129
            .raster_query(
1✔
1130
                RasterQueryRectangle {
1✔
1131
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1132
                        (0., 3.).into(),
1✔
1133
                        (4., 0.).into(),
1✔
1134
                    ),
1✔
1135
                    time_interval: TimeInterval::new(
1✔
1136
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1137
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1138
                    )
1✔
1139
                    .unwrap(),
1✔
1140
                    spatial_resolution: SpatialResolution::one(),
1✔
1141
                    attributes: BandSelection::first(),
1✔
1142
                },
1✔
1143
                &query_context,
1✔
1144
            )
1✔
1145
            .await
×
1146
            .unwrap();
1✔
1147

1✔
1148
        let mut result = Vec::new();
1✔
1149
        while let Some(tile) = stream.next().await {
3✔
1150
            result.push(tile.unwrap());
2✔
1151
        }
2✔
1152

1153
        assert_eq!(result.len(), 2);
1✔
1154

1155
        assert_eq!(
1✔
1156
            result[0].time,
1✔
1157
            TimeInterval::new_unchecked(
1✔
1158
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1159
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1160
            ),
1✔
1161
        );
1✔
1162
        assert_eq!(
1✔
1163
            result[1].time,
1✔
1164
            TimeInterval::new_unchecked(
1✔
1165
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1166
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1167
            ),
1✔
1168
        );
1✔
1169
    }
1170

1171
    #[tokio::test]
1✔
1172
    async fn test_expression_on_shifted_raster() {
1✔
1173
        let mut execution_context = MockExecutionContext::test_default();
1✔
1174

1✔
1175
        let ndvi_source = GdalSource {
1✔
1176
            params: GdalSourceParameters {
1✔
1177
                data: add_ndvi_dataset(&mut execution_context),
1✔
1178
            },
1✔
1179
        }
1✔
1180
        .boxed();
1✔
1181

1✔
1182
        let shifted_ndvi_source = RasterOperator::boxed(TimeShift {
1✔
1183
            params: TimeShiftParams::Relative {
1✔
1184
                granularity: TimeGranularity::Months,
1✔
1185
                value: -1,
1✔
1186
            },
1✔
1187
            sources: SingleRasterOrVectorSource {
1✔
1188
                source: RasterOrVectorOperator::Raster(ndvi_source.clone()),
1✔
1189
            },
1✔
1190
        });
1✔
1191

1✔
1192
        let expression = Expression {
1✔
1193
            params: ExpressionParams {
1✔
1194
                expression: "A - B".to_string(),
1✔
1195
                output_type: RasterDataType::F64,
1✔
1196
                output_measurement: None,
1✔
1197
                map_no_data: false,
1✔
1198
            },
1✔
1199
            sources: ExpressionSources::new_a_b(ndvi_source, shifted_ndvi_source),
1✔
1200
        }
1✔
1201
        .boxed();
1✔
1202

1203
        let query_processor = expression
1✔
1204
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1205
            .await
×
1206
            .unwrap()
1✔
1207
            .query_processor()
1✔
1208
            .unwrap()
1✔
1209
            .get_f64()
1✔
1210
            .unwrap();
1✔
1211

1✔
1212
        let query_context = MockQueryContext::test_default();
1✔
1213

1214
        let mut stream = query_processor
1✔
1215
            .raster_query(
1✔
1216
                RasterQueryRectangle {
1✔
1217
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1218
                        (-180., 90.).into(),
1✔
1219
                        (180., -90.).into(),
1✔
1220
                    ),
1✔
1221
                    time_interval: TimeInterval::new_instant(DateTime::new_utc(
1✔
1222
                        2014, 3, 1, 0, 0, 0,
1✔
1223
                    ))
1✔
1224
                    .unwrap(),
1✔
1225
                    spatial_resolution: SpatialResolution::one(),
1✔
1226
                    attributes: BandSelection::first(),
1✔
1227
                },
1✔
1228
                &query_context,
1✔
1229
            )
1✔
1230
            .await
×
1231
            .unwrap();
1✔
1232

1✔
1233
        let mut result = Vec::new();
1✔
1234
        while let Some(tile) = stream.next().await {
19✔
1235
            result.push(tile.unwrap());
4✔
1236
        }
4✔
1237

1238
        assert_eq!(result.len(), 4);
1✔
1239
        assert_eq!(
1✔
1240
            result[0].time,
1✔
1241
            TimeInterval::new(
1✔
1242
                DateTime::new_utc(2014, 3, 1, 0, 0, 0),
1✔
1243
                DateTime::new_utc(2014, 4, 1, 0, 0, 0)
1✔
1244
            )
1✔
1245
            .unwrap()
1✔
1246
        );
1✔
1247
    }
1248

1249
    #[tokio::test]
1✔
1250
    async fn test_expression_on_absolute_shifted_raster() {
1✔
1251
        let mut execution_context = MockExecutionContext::test_default();
1✔
1252

1✔
1253
        let ndvi_source = GdalSource {
1✔
1254
            params: GdalSourceParameters {
1✔
1255
                data: add_ndvi_dataset(&mut execution_context),
1✔
1256
            },
1✔
1257
        }
1✔
1258
        .boxed();
1✔
1259

1✔
1260
        let shifted_ndvi_source = RasterOperator::boxed(TimeShift {
1✔
1261
            params: TimeShiftParams::Absolute {
1✔
1262
                time_interval: TimeInterval::new_instant(DateTime::new_utc(2014, 5, 1, 0, 0, 0))
1✔
1263
                    .unwrap(),
1✔
1264
            },
1✔
1265
            sources: SingleRasterOrVectorSource {
1✔
1266
                source: RasterOrVectorOperator::Raster(ndvi_source),
1✔
1267
            },
1✔
1268
        });
1✔
1269

1270
        let query_processor = shifted_ndvi_source
1✔
1271
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1272
            .await
×
1273
            .unwrap()
1✔
1274
            .query_processor()
1✔
1275
            .unwrap()
1✔
1276
            .get_u8()
1✔
1277
            .unwrap();
1✔
1278

1✔
1279
        let query_context = MockQueryContext::test_default();
1✔
1280

1281
        let mut stream = query_processor
1✔
1282
            .raster_query(
1✔
1283
                RasterQueryRectangle {
1✔
1284
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1285
                        (-180., 90.).into(),
1✔
1286
                        (180., -90.).into(),
1✔
1287
                    ),
1✔
1288
                    time_interval: TimeInterval::new_instant(DateTime::new_utc(
1✔
1289
                        2014, 3, 1, 0, 0, 0,
1✔
1290
                    ))
1✔
1291
                    .unwrap(),
1✔
1292
                    spatial_resolution: SpatialResolution::one(),
1✔
1293
                    attributes: BandSelection::first(),
1✔
1294
                },
1✔
1295
                &query_context,
1✔
1296
            )
1✔
1297
            .await
×
1298
            .unwrap();
1✔
1299

1✔
1300
        let mut result = Vec::new();
1✔
1301
        while let Some(tile) = stream.next().await {
5✔
1302
            result.push(tile.unwrap());
4✔
1303
        }
4✔
1304

1305
        assert_eq!(result.len(), 4);
1✔
1306
        assert_eq!(
1✔
1307
            result[0].time,
1✔
1308
            TimeInterval::new(
1✔
1309
                DateTime::new_utc(2014, 3, 1, 0, 0, 0),
1✔
1310
                DateTime::new_utc(2014, 4, 1, 0, 0, 0)
1✔
1311
            )
1✔
1312
            .unwrap()
1✔
1313
        );
1✔
1314
    }
1315

1316
    #[test]
1✔
1317
    fn shift_eternal() {
1✔
1318
        let eternal = TimeInterval::default();
1✔
1319

1✔
1320
        let f_shift = RelativeForwardShift {
1✔
1321
            step: TimeStep {
1✔
1322
                granularity: TimeGranularity::Seconds,
1✔
1323
                step: 1,
1✔
1324
            },
1✔
1325
        };
1✔
1326

1✔
1327
        let (shifted, state) = f_shift.shift(eternal).unwrap();
1✔
1328
        assert_eq!(shifted, eternal);
1✔
1329
        let reverse_shifted = f_shift.reverse_shift(eternal, state).unwrap();
1✔
1330
        assert_eq!(reverse_shifted, eternal);
1✔
1331

1332
        let b_shift = RelativeBackwardShift {
1✔
1333
            step: TimeStep {
1✔
1334
                granularity: TimeGranularity::Seconds,
1✔
1335
                step: 1,
1✔
1336
            },
1✔
1337
        };
1✔
1338

1✔
1339
        let (shifted, state) = b_shift.shift(eternal).unwrap();
1✔
1340
        assert_eq!(shifted, eternal);
1✔
1341

1342
        let reverse_shifted = b_shift.reverse_shift(eternal, state).unwrap();
1✔
1343
        assert_eq!(reverse_shifted, eternal);
1✔
1344
    }
1✔
1345

1346
    #[test]
1✔
1347
    fn shift_begin_of_time() {
1✔
1348
        let time = TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START);
1✔
1349

1✔
1350
        let f_shift = RelativeForwardShift {
1✔
1351
            step: TimeStep {
1✔
1352
                granularity: TimeGranularity::Seconds,
1✔
1353
                step: 1,
1✔
1354
            },
1✔
1355
        };
1✔
1356

1✔
1357
        let expected_shift =
1✔
1358
            TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START + 1_000);
1✔
1359

1✔
1360
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1361
        assert_eq!(shifted, expected_shift);
1✔
1362

1363
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1364
        assert_eq!(reverse_shifted, time);
1✔
1365

1366
        let f_shift = RelativeBackwardShift {
1✔
1367
            step: TimeStep {
1✔
1368
                granularity: TimeGranularity::Seconds,
1✔
1369
                step: 1,
1✔
1370
            },
1✔
1371
        };
1✔
1372

1✔
1373
        let expected_shift =
1✔
1374
            TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START - 1_000);
1✔
1375

1✔
1376
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1377
        assert_eq!(shifted, expected_shift);
1✔
1378

1379
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1380
        assert_eq!(reverse_shifted, time);
1✔
1381
    }
1✔
1382

1383
    #[test]
1✔
1384
    fn shift_end_of_time() {
1✔
1385
        let time = TimeInterval::new_unchecked(TimeInstance::EPOCH_START, TimeInstance::MAX);
1✔
1386

1✔
1387
        let f_shift = RelativeForwardShift {
1✔
1388
            step: TimeStep {
1✔
1389
                granularity: TimeGranularity::Seconds,
1✔
1390
                step: 1,
1✔
1391
            },
1✔
1392
        };
1✔
1393

1✔
1394
        let expected_shift =
1✔
1395
            TimeInterval::new_unchecked(TimeInstance::EPOCH_START + 1_000, TimeInstance::MAX);
1✔
1396

1✔
1397
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1398
        assert_eq!(shifted, expected_shift);
1✔
1399

1400
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1401
        assert_eq!(reverse_shifted, time);
1✔
1402

1403
        let f_shift = RelativeBackwardShift {
1✔
1404
            step: TimeStep {
1✔
1405
                granularity: TimeGranularity::Seconds,
1✔
1406
                step: 1,
1✔
1407
            },
1✔
1408
        };
1✔
1409

1✔
1410
        let expected_shift =
1✔
1411
            TimeInterval::new_unchecked(TimeInstance::EPOCH_START - 1_000, TimeInstance::MAX);
1✔
1412

1✔
1413
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1414
        assert_eq!(shifted, expected_shift);
1✔
1415

1416
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1417
        assert_eq!(reverse_shifted, time);
1✔
1418
    }
1✔
1419
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc