• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.86
/operators/src/processing/time_shift.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator,
3
    InitializedSingleRasterOrVectorOperator, InitializedSources, InitializedVectorOperator,
4
    Operator, OperatorName, QueryContext, RasterOperator, RasterQueryProcessor,
5
    RasterResultDescriptor, ResultDescriptor, SingleRasterOrVectorSource,
6
    TypedRasterQueryProcessor, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
7
    VectorResultDescriptor, WorkflowOperatorPath,
8
};
9
use crate::util::Result;
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::StreamExt;
13
use geoengine_datatypes::collections::{
14
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
15
};
16
use geoengine_datatypes::error::{BoxedResultExt, ErrorSource};
17
use geoengine_datatypes::primitives::{
18
    ColumnSelection, Duration, Geometry, RasterQueryRectangle, TimeGranularity, TimeInstance,
19
    TimeInterval,
20
};
21
use geoengine_datatypes::primitives::{TimeStep, VectorQueryRectangle};
22
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
23
use geoengine_datatypes::util::arrow::ArrowTyped;
24
use serde::{Deserialize, Serialize};
25
use snafu::Snafu;
26

27
/// Project the query rectangle to a new time interval.
28
pub type TimeShift = Operator<TimeShiftParams, SingleRasterOrVectorSource>;
29

30
impl OperatorName for TimeShift {
31
    const TYPE_NAME: &'static str = "TimeShift";
32
}
33

34
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35
#[serde(tag = "type", rename_all = "camelCase")]
36
pub enum TimeShiftParams {
37
    /// Shift the query rectangle relative with a time step
38
    #[serde(rename_all = "camelCase")]
39
    Relative {
40
        granularity: TimeGranularity,
41
        value: i32,
42
    },
43
    /// Set the time interval to a fixed value
44
    #[serde(rename_all = "camelCase")]
45
    Absolute { time_interval: TimeInterval },
46
}
47

48
pub trait TimeShiftOperation: Send + Sync + Copy {
49
    type State: Send + Sync + Copy;
50

51
    fn shift(
52
        &self,
53
        time_interval: TimeInterval,
54
    ) -> Result<(TimeInterval, Self::State), TimeShiftError>;
55

56
    fn reverse_shift(
57
        &self,
58
        time_interval: TimeInterval,
59
        state: Self::State,
60
    ) -> Result<TimeInterval, TimeShiftError>;
61
}
62

63
#[derive(Debug, Clone, Copy)]
64
pub struct RelativeForwardShift {
65
    step: TimeStep,
66
}
67

68
impl TimeShiftOperation for RelativeForwardShift {
69
    type State = ();
70

71
    fn shift(
9✔
72
        &self,
9✔
73
        time_interval: TimeInterval,
9✔
74
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
9✔
75
        let time_interval = time_interval + self.step;
9✔
76
        let time_interval = time_interval.boxed_context(error::TimeOverflow)?;
9✔
77
        Ok((time_interval, ()))
9✔
78
    }
9✔
79

80
    fn reverse_shift(
13✔
81
        &self,
13✔
82
        time_interval: TimeInterval,
13✔
83
        _state: Self::State,
13✔
84
    ) -> Result<TimeInterval, TimeShiftError> {
13✔
85
        let reversed_time_interval = time_interval - self.step;
13✔
86
        reversed_time_interval.boxed_context(error::TimeOverflow)
13✔
87
    }
13✔
88
}
89

90
#[derive(Debug, Clone, Copy)]
91
pub struct RelativeBackwardShift {
92
    step: TimeStep,
93
}
94

95
impl TimeShiftOperation for RelativeBackwardShift {
96
    type State = ();
97

98
    fn shift(
6✔
99
        &self,
6✔
100
        time_interval: TimeInterval,
6✔
101
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
6✔
102
        let time_interval = time_interval - self.step;
6✔
103
        let time_interval = time_interval.boxed_context(error::TimeOverflow)?;
6✔
104
        Ok((time_interval, ()))
6✔
105
    }
6✔
106

107
    fn reverse_shift(
9✔
108
        &self,
9✔
109
        time_interval: TimeInterval,
9✔
110
        _state: Self::State,
9✔
111
    ) -> Result<TimeInterval, TimeShiftError> {
9✔
112
        let reversed_time_interval = time_interval + self.step;
9✔
113
        reversed_time_interval.boxed_context(error::TimeOverflow)
9✔
114
    }
9✔
115
}
116

117
#[derive(Debug, Clone, Copy)]
118
pub struct AbsoluteShift {
119
    time_interval: TimeInterval,
120
}
121

122
impl TimeShiftOperation for AbsoluteShift {
123
    type State = (Duration, Duration);
124

125
    fn shift(
4✔
126
        &self,
4✔
127
        time_interval: TimeInterval,
4✔
128
    ) -> Result<(TimeInterval, Self::State), TimeShiftError> {
4✔
129
        let time_start_difference = time_interval.start() - self.time_interval.start();
4✔
130
        let time_end_difference = time_interval.end() - self.time_interval.end();
4✔
131

4✔
132
        Ok((
4✔
133
            self.time_interval,
4✔
134
            (time_start_difference, time_end_difference),
4✔
135
        ))
4✔
136
    }
4✔
137

138
    fn reverse_shift(
7✔
139
        &self,
7✔
140
        time_interval: TimeInterval,
7✔
141
        (time_start_difference, time_end_difference): Self::State,
7✔
142
    ) -> Result<TimeInterval, TimeShiftError> {
7✔
143
        let t1 = time_interval.start() + time_start_difference.num_milliseconds();
7✔
144
        let t2 = time_interval.end() + time_end_difference.num_milliseconds();
7✔
145
        TimeInterval::new(t1, t2).boxed_context(error::FaultyTimeInterval { t1, t2 })
7✔
146
    }
7✔
147
}
148

149
#[derive(Debug, Snafu)]
×
150
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
151
pub enum TimeShiftError {
152
    #[snafu(display("Output type must match the type of the source"))]
153
    UnmatchedOutput,
154
    #[snafu(display("Shifting the time led to an overflowing time interval"))]
155
    TimeOverflow { source: Box<dyn ErrorSource> },
156
    #[snafu(display("Shifting the time to a faulty time interval: {t1} / {t2}"))]
157
    FaultyTimeInterval {
158
        source: Box<dyn ErrorSource>,
159
        t1: TimeInstance,
160
        t2: TimeInstance,
161
    },
162
    #[snafu(display("Modifying the timestamps of the feature collection failed"))]
163
    FeatureCollectionTimeModification { source: Box<dyn ErrorSource> },
164
}
165

166
#[typetag::serde]
×
167
#[async_trait]
168
impl VectorOperator for TimeShift {
169
    async fn _initialize(
170
        self: Box<Self>,
171
        path: WorkflowOperatorPath,
172
        context: &dyn ExecutionContext,
173
    ) -> Result<Box<dyn InitializedVectorOperator>> {
2✔
174
        let name = CanonicOperatorName::from(&self);
2✔
175

176
        let init_sources = self
2✔
177
            .sources
2✔
178
            .initialize_sources(path.clone(), context)
2✔
179
            .await?;
2✔
180

181
        match (init_sources.source, self.params) {
2✔
182
            (
183
                InitializedSingleRasterOrVectorOperator::Vector(source),
×
184
                TimeShiftParams::Relative { granularity, value },
×
185
            ) if value.is_positive() => {
1✔
186
                let shift = RelativeForwardShift {
×
187
                    step: TimeStep {
×
188
                        granularity,
×
189
                        step: value.unsigned_abs(),
×
190
                    },
×
191
                };
×
192

×
193
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
×
194

×
195
                Ok(Box::new(InitializedVectorTimeShift {
×
196
                    name,
×
NEW
197
                    path,
×
198
                    source,
×
199
                    result_descriptor,
×
200
                    shift,
×
201
                }))
×
202
            }
203
            (
204
                InitializedSingleRasterOrVectorOperator::Vector(source),
1✔
205
                TimeShiftParams::Relative { granularity, value },
1✔
206
            ) => {
1✔
207
                let shift = RelativeBackwardShift {
1✔
208
                    step: TimeStep {
1✔
209
                        granularity,
1✔
210
                        step: value.unsigned_abs(),
1✔
211
                    },
1✔
212
                };
1✔
213

1✔
214
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
215

1✔
216
                Ok(Box::new(InitializedVectorTimeShift {
1✔
217
                    name,
1✔
218
                    path,
1✔
219
                    source,
1✔
220
                    result_descriptor,
1✔
221
                    shift,
1✔
222
                }))
1✔
223
            }
224
            (
225
                InitializedSingleRasterOrVectorOperator::Vector(source),
1✔
226
                TimeShiftParams::Absolute { time_interval },
1✔
227
            ) => {
1✔
228
                let shift = AbsoluteShift { time_interval };
1✔
229

1✔
230
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
231

1✔
232
                Ok(Box::new(InitializedVectorTimeShift {
1✔
233
                    name,
1✔
234
                    path,
1✔
235
                    source,
1✔
236
                    result_descriptor,
1✔
237
                    shift,
1✔
238
                }))
1✔
239
            }
240
            (InitializedSingleRasterOrVectorOperator::Raster(_), _) => {
241
                Err(TimeShiftError::UnmatchedOutput.into())
×
242
            }
243
        }
244
    }
4✔
245

246
    span_fn!(TimeShift);
247
}
248

249
#[typetag::serde]
2✔
250
#[async_trait]
251
impl RasterOperator for TimeShift {
252
    async fn _initialize(
253
        self: Box<Self>,
254
        path: WorkflowOperatorPath,
255
        context: &dyn ExecutionContext,
256
    ) -> Result<Box<dyn InitializedRasterOperator>> {
7✔
257
        let name = CanonicOperatorName::from(&self);
7✔
258

259
        let init_sources = self
7✔
260
            .sources
7✔
261
            .initialize_sources(path.clone(), context)
7✔
262
            .await?;
7✔
263

264
        match (init_sources.source, self.params) {
7✔
265
            (
266
                InitializedSingleRasterOrVectorOperator::Raster(source),
4✔
267
                TimeShiftParams::Relative { granularity, value },
4✔
268
            ) if value.is_positive() => {
5✔
269
                let shift = RelativeForwardShift {
4✔
270
                    step: TimeStep {
4✔
271
                        granularity,
4✔
272
                        step: value.unsigned_abs(),
4✔
273
                    },
4✔
274
                };
4✔
275

4✔
276
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
4✔
277

4✔
278
                Ok(Box::new(InitializedRasterTimeShift {
4✔
279
                    name,
4✔
280
                    path,
4✔
281
                    source,
4✔
282
                    result_descriptor,
4✔
283
                    shift,
4✔
284
                }))
4✔
285
            }
286
            (
287
                InitializedSingleRasterOrVectorOperator::Raster(source),
1✔
288
                TimeShiftParams::Relative { granularity, value },
1✔
289
            ) => {
1✔
290
                let shift = RelativeBackwardShift {
1✔
291
                    step: TimeStep {
1✔
292
                        granularity,
1✔
293
                        step: value.unsigned_abs(),
1✔
294
                    },
1✔
295
                };
1✔
296

1✔
297
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
1✔
298

1✔
299
                Ok(Box::new(InitializedRasterTimeShift {
1✔
300
                    name,
1✔
301
                    path,
1✔
302
                    source,
1✔
303
                    result_descriptor,
1✔
304
                    shift,
1✔
305
                }))
1✔
306
            }
307
            (
308
                InitializedSingleRasterOrVectorOperator::Raster(source),
2✔
309
                TimeShiftParams::Absolute { time_interval },
2✔
310
            ) => {
2✔
311
                let shift = AbsoluteShift { time_interval };
2✔
312

2✔
313
                let result_descriptor = shift_result_descriptor(source.result_descriptor(), shift);
2✔
314

2✔
315
                Ok(Box::new(InitializedRasterTimeShift {
2✔
316
                    name,
2✔
317
                    path,
2✔
318
                    source,
2✔
319
                    result_descriptor,
2✔
320
                    shift,
2✔
321
                }))
2✔
322
            }
323
            (InitializedSingleRasterOrVectorOperator::Vector(_), _) => {
324
                Err(TimeShiftError::UnmatchedOutput.into())
×
325
            }
326
        }
327
    }
14✔
328

329
    span_fn!(TimeShift);
330
}
331

332
fn shift_result_descriptor<R: ResultDescriptor, S: TimeShiftOperation>(
9✔
333
    result_descriptor: &R,
9✔
334
    shift: S,
9✔
335
) -> R {
9✔
336
    result_descriptor.map_time(|time| {
9✔
337
        if let Some(time) = time {
9✔
338
            shift.shift(*time).map(|r| r.0).ok()
5✔
339
        } else {
340
            None
4✔
341
        }
342
    })
9✔
343
}
9✔
344

345
pub struct InitializedVectorTimeShift<Shift: TimeShiftOperation> {
346
    name: CanonicOperatorName,
347
    path: WorkflowOperatorPath,
348
    source: Box<dyn InitializedVectorOperator>,
349
    result_descriptor: VectorResultDescriptor,
350
    shift: Shift,
351
}
352

353
pub struct InitializedRasterTimeShift<Shift: TimeShiftOperation> {
354
    name: CanonicOperatorName,
355
    path: WorkflowOperatorPath,
356
    source: Box<dyn InitializedRasterOperator>,
357
    result_descriptor: RasterResultDescriptor,
358
    shift: Shift,
359
}
360

361
impl<Shift: TimeShiftOperation + 'static> InitializedVectorOperator
362
    for InitializedVectorTimeShift<Shift>
363
{
364
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
365
        &self.result_descriptor
×
366
    }
×
367

368
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
2✔
369
        let source_processor = self.source.query_processor()?;
2✔
370

371
        Ok(
372
            call_on_generic_vector_processor!(source_processor, processor => VectorTimeShiftProcessor {
2✔
373
                processor,
×
374
                result_descriptor: self.result_descriptor.clone(),
×
375
                shift: self.shift,
×
376
            }.boxed().into()),
×
377
        )
378
    }
2✔
379

380
    fn canonic_name(&self) -> CanonicOperatorName {
×
381
        self.name.clone()
×
382
    }
×
383

NEW
384
    fn name(&self) -> &'static str {
×
NEW
385
        TimeShift::TYPE_NAME
×
NEW
386
    }
×
387

NEW
388
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
389
        self.path.clone()
×
NEW
390
    }
×
391
}
392

393
impl<Shift: TimeShiftOperation + 'static> InitializedRasterOperator
394
    for InitializedRasterTimeShift<Shift>
395
{
396
    fn result_descriptor(&self) -> &RasterResultDescriptor {
3✔
397
        &self.result_descriptor
3✔
398
    }
3✔
399

400
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
6✔
401
        let source_processor = self.source.query_processor()?;
6✔
402

403
        Ok(
404
            call_on_generic_raster_processor!(source_processor, processor => RasterTimeShiftProcessor {
6✔
405
                processor,
6✔
406
                result_descriptor: self.result_descriptor.clone(),
6✔
407
                shift: self.shift,
6✔
408
            }.boxed().into()),
6✔
409
        )
410
    }
6✔
411

412
    fn canonic_name(&self) -> CanonicOperatorName {
×
413
        self.name.clone()
×
414
    }
×
415

NEW
416
    fn name(&self) -> &'static str {
×
NEW
417
        TimeShift::TYPE_NAME
×
NEW
418
    }
×
419

NEW
420
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
421
        self.path.clone()
×
NEW
422
    }
×
423
}
424

425
pub struct RasterTimeShiftProcessor<Q, P, Shift: TimeShiftOperation>
426
where
427
    Q: RasterQueryProcessor<RasterType = P>,
428
{
429
    processor: Q,
430
    result_descriptor: RasterResultDescriptor,
431
    shift: Shift,
432
}
433

434
pub struct VectorTimeShiftProcessor<Q, G, Shift: TimeShiftOperation>
435
where
436
    G: Geometry,
437
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
438
{
439
    processor: Q,
440
    result_descriptor: VectorResultDescriptor,
441
    shift: Shift,
442
}
443

444
#[async_trait]
445
impl<Q, G, Shift> VectorQueryProcessor for VectorTimeShiftProcessor<Q, G, Shift>
446
where
447
    G: Geometry + ArrowTyped + 'static,
448
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
449
    Shift: TimeShiftOperation + 'static,
450
{
451
    type VectorType = FeatureCollection<G>;
452

453
    async fn vector_query<'a>(
454
        &'a self,
455
        query: VectorQueryRectangle,
456
        ctx: &'a dyn QueryContext,
457
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
2✔
458
        let (time_interval, state) = self.shift.shift(query.time_interval)?;
2✔
459

460
        let query = VectorQueryRectangle {
2✔
461
            spatial_bounds: query.spatial_bounds,
2✔
462
            time_interval,
2✔
463
            spatial_resolution: query.spatial_resolution,
2✔
464
            attributes: ColumnSelection::all(),
2✔
465
        };
2✔
466
        let stream = self.processor.vector_query(query, ctx).await?;
2✔
467

468
        let stream = stream.then(move |collection| async move {
2✔
469
            let collection = collection?;
2✔
470
            let shift = self.shift;
2✔
471

2✔
472
            crate::util::spawn_blocking(move || {
2✔
473
                let time_intervals = collection
2✔
474
                    .time_intervals()
2✔
475
                    .iter()
2✔
476
                    .map(move |time| shift.reverse_shift(*time, state))
3✔
477
                    .collect::<Result<Vec<TimeInterval>, TimeShiftError>>()?;
2✔
478

479
                collection
2✔
480
                    .replace_time(&time_intervals)
2✔
481
                    .boxed_context(error::FeatureCollectionTimeModification)
2✔
482
                    .map_err(Into::into)
2✔
483
            })
2✔
484
            .await?
2✔
485
        });
4✔
486

2✔
487
        Ok(stream.boxed())
2✔
488
    }
4✔
489

490
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
2✔
491
        &self.result_descriptor
2✔
492
    }
2✔
493
}
494

495
#[async_trait]
496
impl<Q, P, Shift> RasterQueryProcessor for RasterTimeShiftProcessor<Q, P, Shift>
497
where
498
    Q: RasterQueryProcessor<RasterType = P>,
499
    P: Pixel,
500
    Shift: TimeShiftOperation,
501
{
502
    type RasterType = P;
503

504
    async fn raster_query<'a>(
505
        &'a self,
506
        query: RasterQueryRectangle,
507
        ctx: &'a dyn QueryContext,
508
    ) -> Result<BoxStream<'a, Result<RasterTile2D<Self::RasterType>>>> {
6✔
509
        let (time_interval, state) = self.shift.shift(query.time_interval)?;
6✔
510
        let query = RasterQueryRectangle {
6✔
511
            spatial_bounds: query.spatial_bounds,
6✔
512
            time_interval,
6✔
513
            spatial_resolution: query.spatial_resolution,
6✔
514
            attributes: query.attributes,
6✔
515
        };
6✔
516
        let stream = self.processor.raster_query(query, ctx).await?;
6✔
517

518
        let stream = stream.map(move |raster| {
20✔
519
            // reverse time shift for results
520
            let mut raster = raster?;
20✔
521

522
            raster.time = self.shift.reverse_shift(raster.time, state)?;
20✔
523

524
            Ok(raster)
20✔
525
        });
20✔
526

6✔
527
        Ok(Box::pin(stream))
6✔
528
    }
12✔
529

530
    fn raster_result_descriptor(&self) -> &RasterResultDescriptor {
6✔
531
        &self.result_descriptor
6✔
532
    }
6✔
533
}
534

535
#[cfg(test)]
536
mod tests {
537
    use super::*;
538

539
    use crate::{
540
        engine::{
541
            MockExecutionContext, MockQueryContext, MultipleRasterSources, RasterBandDescriptors,
542
            SingleRasterSource,
543
        },
544
        mock::{MockFeatureCollectionSource, MockRasterSource, MockRasterSourceParams},
545
        processing::{Expression, ExpressionParams, RasterStacker, RasterStackerParams},
546
        source::{GdalSource, GdalSourceParameters},
547
        util::{gdal::add_ndvi_dataset, input::RasterOrVectorOperator},
548
    };
549
    use futures::StreamExt;
550
    use geoengine_datatypes::{
551
        collections::{ChunksEqualIgnoringCacheHint, MultiPointCollection},
552
        dataset::NamedData,
553
        primitives::{
554
            BandSelection, BoundingBox2D, CacheHint, DateTime, MultiPoint, SpatialPartition2D,
555
            SpatialResolution, TimeGranularity,
556
        },
557
        raster::{
558
            EmptyGrid2D, GridOrEmpty, RasterDataType, RenameBands, TileInformation,
559
            TilingSpecification,
560
        },
561
        spatial_reference::SpatialReference,
562
        util::test::TestDefault,
563
    };
564

565
    #[test]
566
    fn test_ser_de_absolute() {
1✔
567
        let time_shift = TimeShift {
1✔
568
            sources: SingleRasterOrVectorSource {
1✔
569
                source: RasterOrVectorOperator::Raster(
1✔
570
                    GdalSource {
1✔
571
                        params: GdalSourceParameters {
1✔
572
                            data: NamedData::with_system_name("test-raster"),
1✔
573
                        },
1✔
574
                    }
1✔
575
                    .boxed(),
1✔
576
                ),
1✔
577
            },
1✔
578
            params: TimeShiftParams::Absolute {
1✔
579
                time_interval: TimeInterval::new_unchecked(
1✔
580
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
581
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
582
                ),
1✔
583
            },
1✔
584
        };
1✔
585

1✔
586
        let serialized = serde_json::to_value(&time_shift).unwrap();
1✔
587

1✔
588
        assert_eq!(
1✔
589
            serialized,
1✔
590
            serde_json::json!({
1✔
591
                "params": {
1✔
592
                    "type": "absolute",
1✔
593
                    "timeInterval": {
1✔
594
                        "start": 1_293_840_000_000_i64,
1✔
595
                        "end": 1_325_376_000_000_i64
1✔
596
                    }
1✔
597
                },
1✔
598
                "sources": {
1✔
599
                    "source": {
1✔
600
                        "type": "GdalSource",
1✔
601
                        "params": {
1✔
602
                            "data": "test-raster"
1✔
603
                        }
1✔
604
                    }
1✔
605
                }
1✔
606
            })
1✔
607
        );
1✔
608

609
        let deserialized: TimeShift = serde_json::from_value(serialized).unwrap();
1✔
610

1✔
611
        assert_eq!(time_shift.params, deserialized.params);
1✔
612
    }
1✔
613

614
    #[test]
615
    fn test_ser_de_relative() {
1✔
616
        let time_shift = TimeShift {
1✔
617
            sources: SingleRasterOrVectorSource {
1✔
618
                source: RasterOrVectorOperator::Raster(
1✔
619
                    GdalSource {
1✔
620
                        params: GdalSourceParameters {
1✔
621
                            data: NamedData::with_system_name("test-raster"),
1✔
622
                        },
1✔
623
                    }
1✔
624
                    .boxed(),
1✔
625
                ),
1✔
626
            },
1✔
627
            params: TimeShiftParams::Relative {
1✔
628
                granularity: TimeGranularity::Years,
1✔
629
                value: 1,
1✔
630
            },
1✔
631
        };
1✔
632

1✔
633
        let serialized = serde_json::to_value(&time_shift).unwrap();
1✔
634

1✔
635
        assert_eq!(
1✔
636
            serialized,
1✔
637
            serde_json::json!({
1✔
638
                "params": {
1✔
639
                    "type": "relative",
1✔
640
                    "granularity": "years",
1✔
641
                    "value": 1
1✔
642
                },
1✔
643
                "sources": {
1✔
644
                    "source": {
1✔
645
                        "type": "GdalSource",
1✔
646
                        "params": {
1✔
647
                            "data": "test-raster"
1✔
648
                        }
1✔
649
                    }
1✔
650
                }
1✔
651
            })
1✔
652
        );
1✔
653

654
        let deserialized: TimeShift = serde_json::from_value(serialized).unwrap();
1✔
655

1✔
656
        assert_eq!(time_shift.params, deserialized.params);
1✔
657
    }
1✔
658

659
    #[tokio::test]
660
    async fn test_absolute_vector_shift() {
1✔
661
        let execution_context = MockExecutionContext::test_default();
1✔
662
        let query_context = MockQueryContext::test_default();
1✔
663

1✔
664
        let source = MockFeatureCollectionSource::single(
1✔
665
            MultiPointCollection::from_data(
1✔
666
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
667
                vec![
1✔
668
                    TimeInterval::new(
1✔
669
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
670
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
671
                    )
1✔
672
                    .unwrap(),
1✔
673
                    TimeInterval::new(
1✔
674
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
675
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
676
                    )
1✔
677
                    .unwrap(),
1✔
678
                    TimeInterval::new(
1✔
679
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
680
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
681
                    )
1✔
682
                    .unwrap(),
1✔
683
                ],
1✔
684
                Default::default(),
1✔
685
                CacheHint::default(),
1✔
686
            )
1✔
687
            .unwrap(),
1✔
688
        );
1✔
689

1✔
690
        let time_shift = TimeShift {
1✔
691
            sources: SingleRasterOrVectorSource {
1✔
692
                source: RasterOrVectorOperator::Vector(source.boxed()),
1✔
693
            },
1✔
694
            params: TimeShiftParams::Absolute {
1✔
695
                time_interval: TimeInterval::new(
1✔
696
                    DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
697
                    DateTime::new_utc(2009, 6, 1, 0, 0, 0),
1✔
698
                )
1✔
699
                .unwrap(),
1✔
700
            },
1✔
701
        };
1✔
702

1✔
703
        let query_processor = VectorOperator::boxed(time_shift)
1✔
704
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
705
            .await
1✔
706
            .unwrap()
1✔
707
            .query_processor()
1✔
708
            .unwrap()
1✔
709
            .multi_point()
1✔
710
            .unwrap();
1✔
711

1✔
712
        let mut stream = query_processor
1✔
713
            .vector_query(
1✔
714
                VectorQueryRectangle {
1✔
715
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
716
                    time_interval: TimeInterval::new(
1✔
717
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
718
                        DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
719
                    )
1✔
720
                    .unwrap(),
1✔
721
                    spatial_resolution: SpatialResolution::one(),
1✔
722
                    attributes: ColumnSelection::all(),
1✔
723
                },
1✔
724
                &query_context,
1✔
725
            )
1✔
726
            .await
1✔
727
            .unwrap();
1✔
728

1✔
729
        let mut result = Vec::new();
1✔
730
        while let Some(collection) = stream.next().await {
2✔
731
            result.push(collection.unwrap());
1✔
732
        }
1✔
733

1✔
734
        assert_eq!(result.len(), 1);
1✔
735

1✔
736
        let expected = MultiPointCollection::from_data(
1✔
737
            MultiPoint::many(vec![(0., 0.)]).unwrap(),
1✔
738
            vec![TimeInterval::new(
1✔
739
                DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
740
                DateTime::new_utc_with_millis(2013, 8, 1, 23, 59, 59, 999),
1✔
741
            )
1✔
742
            .unwrap()],
1✔
743
            Default::default(),
1✔
744
            CacheHint::default(),
1✔
745
        )
1✔
746
        .unwrap();
1✔
747

1✔
748
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
749
    }
1✔
750

751
    #[tokio::test]
752
    async fn test_relative_vector_shift() {
1✔
753
        let execution_context = MockExecutionContext::test_default();
1✔
754
        let query_context = MockQueryContext::test_default();
1✔
755

1✔
756
        let source = MockFeatureCollectionSource::single(
1✔
757
            MultiPointCollection::from_data(
1✔
758
                MultiPoint::many(vec![(0., 0.), (1., 1.), (2., 2.)]).unwrap(),
1✔
759
                vec![
1✔
760
                    TimeInterval::new(
1✔
761
                        DateTime::new_utc(2009, 1, 1, 0, 0, 0),
1✔
762
                        DateTime::new_utc_with_millis(2010, 12, 31, 23, 59, 59, 999),
1✔
763
                    )
1✔
764
                    .unwrap(),
1✔
765
                    TimeInterval::new(
1✔
766
                        DateTime::new_utc(2009, 6, 3, 0, 0, 0),
1✔
767
                        DateTime::new_utc(2010, 7, 14, 0, 0, 0),
1✔
768
                    )
1✔
769
                    .unwrap(),
1✔
770
                    TimeInterval::new(
1✔
771
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
772
                        DateTime::new_utc_with_millis(2011, 3, 31, 23, 59, 59, 999),
1✔
773
                    )
1✔
774
                    .unwrap(),
1✔
775
                ],
1✔
776
                Default::default(),
1✔
777
                CacheHint::default(),
1✔
778
            )
1✔
779
            .unwrap(),
1✔
780
        );
1✔
781

1✔
782
        let time_shift = TimeShift {
1✔
783
            sources: SingleRasterOrVectorSource {
1✔
784
                source: RasterOrVectorOperator::Vector(source.boxed()),
1✔
785
            },
1✔
786
            params: TimeShiftParams::Relative {
1✔
787
                granularity: TimeGranularity::Years,
1✔
788
                value: -1,
1✔
789
            },
1✔
790
        };
1✔
791

1✔
792
        let query_processor = VectorOperator::boxed(time_shift)
1✔
793
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
794
            .await
1✔
795
            .unwrap()
1✔
796
            .query_processor()
1✔
797
            .unwrap()
1✔
798
            .multi_point()
1✔
799
            .unwrap();
1✔
800

1✔
801
        let mut stream = query_processor
1✔
802
            .vector_query(
1✔
803
                VectorQueryRectangle {
1✔
804
                    spatial_bounds: BoundingBox2D::new((0., 0.).into(), (2., 2.).into()).unwrap(),
1✔
805
                    time_interval: TimeInterval::new(
1✔
806
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
807
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
808
                    )
1✔
809
                    .unwrap(),
1✔
810
                    spatial_resolution: SpatialResolution::one(),
1✔
811
                    attributes: ColumnSelection::all(),
1✔
812
                },
1✔
813
                &query_context,
1✔
814
            )
1✔
815
            .await
1✔
816
            .unwrap();
1✔
817

1✔
818
        let mut result = Vec::new();
1✔
819
        while let Some(collection) = stream.next().await {
2✔
820
            result.push(collection.unwrap());
1✔
821
        }
1✔
822

1✔
823
        assert_eq!(result.len(), 1);
1✔
824

1✔
825
        let expected = MultiPointCollection::from_data(
1✔
826
            MultiPoint::many(vec![(0., 0.), (1., 1.)]).unwrap(),
1✔
827
            vec![
1✔
828
                TimeInterval::new(
1✔
829
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
830
                    DateTime::new_utc_with_millis(2011, 12, 31, 23, 59, 59, 999),
1✔
831
                )
1✔
832
                .unwrap(),
1✔
833
                TimeInterval::new(
1✔
834
                    DateTime::new_utc(2010, 6, 3, 0, 0, 0),
1✔
835
                    DateTime::new_utc(2011, 7, 14, 0, 0, 0),
1✔
836
                )
1✔
837
                .unwrap(),
1✔
838
            ],
1✔
839
            Default::default(),
1✔
840
            CacheHint::default(),
1✔
841
        )
1✔
842
        .unwrap();
1✔
843

1✔
844
        assert!(result[0].chunks_equal_ignoring_cache_hint(&expected));
1✔
845
    }
1✔
846

847
    #[tokio::test]
848
    #[allow(clippy::too_many_lines)]
849
    async fn test_absolute_raster_shift() {
1✔
850
        let empty_grid = GridOrEmpty::Empty(EmptyGrid2D::<u8>::new([3, 2].into()));
1✔
851
        let raster_tiles = vec![
1✔
852
            RasterTile2D::new_with_tile_info(
1✔
853
                TimeInterval::new_unchecked(
1✔
854
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
855
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
856
                ),
1✔
857
                TileInformation {
1✔
858
                    global_tile_position: [-1, 0].into(),
1✔
859
                    tile_size_in_pixels: [3, 2].into(),
1✔
860
                    global_geo_transform: TestDefault::test_default(),
1✔
861
                },
1✔
862
                0,
1✔
863
                empty_grid.clone(),
1✔
864
                CacheHint::default(),
1✔
865
            ),
1✔
866
            RasterTile2D::new_with_tile_info(
1✔
867
                TimeInterval::new_unchecked(
1✔
868
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
869
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
870
                ),
1✔
871
                TileInformation {
1✔
872
                    global_tile_position: [-1, 1].into(),
1✔
873
                    tile_size_in_pixels: [3, 2].into(),
1✔
874
                    global_geo_transform: TestDefault::test_default(),
1✔
875
                },
1✔
876
                0,
1✔
877
                empty_grid.clone(),
1✔
878
                CacheHint::default(),
1✔
879
            ),
1✔
880
            RasterTile2D::new_with_tile_info(
1✔
881
                TimeInterval::new_unchecked(
1✔
882
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
883
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
884
                ),
1✔
885
                TileInformation {
1✔
886
                    global_tile_position: [-1, 0].into(),
1✔
887
                    tile_size_in_pixels: [3, 2].into(),
1✔
888
                    global_geo_transform: TestDefault::test_default(),
1✔
889
                },
1✔
890
                0,
1✔
891
                empty_grid.clone(),
1✔
892
                CacheHint::default(),
1✔
893
            ),
1✔
894
            RasterTile2D::new_with_tile_info(
1✔
895
                TimeInterval::new_unchecked(
1✔
896
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
897
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
898
                ),
1✔
899
                TileInformation {
1✔
900
                    global_tile_position: [-1, 1].into(),
1✔
901
                    tile_size_in_pixels: [3, 2].into(),
1✔
902
                    global_geo_transform: TestDefault::test_default(),
1✔
903
                },
1✔
904
                0,
1✔
905
                empty_grid.clone(),
1✔
906
                CacheHint::default(),
1✔
907
            ),
1✔
908
            RasterTile2D::new_with_tile_info(
1✔
909
                TimeInterval::new_unchecked(
1✔
910
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
911
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
912
                ),
1✔
913
                TileInformation {
1✔
914
                    global_tile_position: [-1, 0].into(),
1✔
915
                    tile_size_in_pixels: [3, 2].into(),
1✔
916
                    global_geo_transform: TestDefault::test_default(),
1✔
917
                },
1✔
918
                0,
1✔
919
                empty_grid.clone(),
1✔
920
                CacheHint::default(),
1✔
921
            ),
1✔
922
            RasterTile2D::new_with_tile_info(
1✔
923
                TimeInterval::new_unchecked(
1✔
924
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
925
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
926
                ),
1✔
927
                TileInformation {
1✔
928
                    global_tile_position: [-1, 1].into(),
1✔
929
                    tile_size_in_pixels: [3, 2].into(),
1✔
930
                    global_geo_transform: TestDefault::test_default(),
1✔
931
                },
1✔
932
                0,
1✔
933
                empty_grid.clone(),
1✔
934
                CacheHint::default(),
1✔
935
            ),
1✔
936
        ];
1✔
937

1✔
938
        let mrs = MockRasterSource {
1✔
939
            params: MockRasterSourceParams {
1✔
940
                data: raster_tiles,
1✔
941
                result_descriptor: RasterResultDescriptor {
1✔
942
                    data_type: RasterDataType::U8,
1✔
943
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
944
                    time: None,
1✔
945
                    bbox: None,
1✔
946
                    resolution: None,
1✔
947
                    bands: RasterBandDescriptors::new_single_band(),
1✔
948
                },
1✔
949
            },
1✔
950
        }
1✔
951
        .boxed();
1✔
952

1✔
953
        let time_shift = TimeShift {
1✔
954
            sources: SingleRasterOrVectorSource {
1✔
955
                source: RasterOrVectorOperator::Raster(mrs),
1✔
956
            },
1✔
957
            params: TimeShiftParams::Absolute {
1✔
958
                time_interval: TimeInterval::new_unchecked(
1✔
959
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
960
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
961
                ),
1✔
962
            },
1✔
963
        };
1✔
964

1✔
965
        let execution_context = MockExecutionContext::new_with_tiling_spec(
1✔
966
            TilingSpecification::new((0., 0.).into(), [3, 2].into()),
1✔
967
        );
1✔
968
        let query_context = MockQueryContext::test_default();
1✔
969

1✔
970
        let query_processor = RasterOperator::boxed(time_shift)
1✔
971
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
972
            .await
1✔
973
            .unwrap()
1✔
974
            .query_processor()
1✔
975
            .unwrap()
1✔
976
            .get_u8()
1✔
977
            .unwrap();
1✔
978

1✔
979
        let mut stream = query_processor
1✔
980
            .raster_query(
1✔
981
                RasterQueryRectangle {
1✔
982
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
983
                        (0., 3.).into(),
1✔
984
                        (4., 0.).into(),
1✔
985
                    ),
1✔
986
                    time_interval: TimeInterval::new(
1✔
987
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
988
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
989
                    )
1✔
990
                    .unwrap(),
1✔
991
                    spatial_resolution: SpatialResolution::one(),
1✔
992
                    attributes: BandSelection::first(),
1✔
993
                },
1✔
994
                &query_context,
1✔
995
            )
1✔
996
            .await
1✔
997
            .unwrap();
1✔
998

1✔
999
        let mut result = Vec::new();
1✔
1000
        while let Some(tile) = stream.next().await {
3✔
1001
            result.push(tile.unwrap());
2✔
1002
        }
2✔
1003

1✔
1004
        assert_eq!(result.len(), 2);
1✔
1005

1✔
1006
        assert_eq!(
1✔
1007
            result[0].time,
1✔
1008
            TimeInterval::new_unchecked(
1✔
1009
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1010
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1011
            ),
1✔
1012
        );
1✔
1013
        assert_eq!(
1✔
1014
            result[1].time,
1✔
1015
            TimeInterval::new_unchecked(
1✔
1016
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1017
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1018
            ),
1✔
1019
        );
1✔
1020
    }
1✔
1021

1022
    #[tokio::test]
1023
    #[allow(clippy::too_many_lines)]
1024
    async fn test_relative_raster_shift() {
1✔
1025
        let empty_grid = GridOrEmpty::Empty(EmptyGrid2D::<u8>::new([3, 2].into()));
1✔
1026
        let raster_tiles = vec![
1✔
1027
            RasterTile2D::new_with_tile_info(
1✔
1028
                TimeInterval::new_unchecked(
1✔
1029
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1030
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1031
                ),
1✔
1032
                TileInformation {
1✔
1033
                    global_tile_position: [-1, 0].into(),
1✔
1034
                    tile_size_in_pixels: [3, 2].into(),
1✔
1035
                    global_geo_transform: TestDefault::test_default(),
1✔
1036
                },
1✔
1037
                0,
1✔
1038
                empty_grid.clone(),
1✔
1039
                CacheHint::default(),
1✔
1040
            ),
1✔
1041
            RasterTile2D::new_with_tile_info(
1✔
1042
                TimeInterval::new_unchecked(
1✔
1043
                    DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1044
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1045
                ),
1✔
1046
                TileInformation {
1✔
1047
                    global_tile_position: [-1, 1].into(),
1✔
1048
                    tile_size_in_pixels: [3, 2].into(),
1✔
1049
                    global_geo_transform: TestDefault::test_default(),
1✔
1050
                },
1✔
1051
                0,
1✔
1052
                empty_grid.clone(),
1✔
1053
                CacheHint::default(),
1✔
1054
            ),
1✔
1055
            RasterTile2D::new_with_tile_info(
1✔
1056
                TimeInterval::new_unchecked(
1✔
1057
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1058
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1059
                ),
1✔
1060
                TileInformation {
1✔
1061
                    global_tile_position: [-1, 0].into(),
1✔
1062
                    tile_size_in_pixels: [3, 2].into(),
1✔
1063
                    global_geo_transform: TestDefault::test_default(),
1✔
1064
                },
1✔
1065
                0,
1✔
1066
                empty_grid.clone(),
1✔
1067
                CacheHint::default(),
1✔
1068
            ),
1✔
1069
            RasterTile2D::new_with_tile_info(
1✔
1070
                TimeInterval::new_unchecked(
1✔
1071
                    DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1072
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1073
                ),
1✔
1074
                TileInformation {
1✔
1075
                    global_tile_position: [-1, 1].into(),
1✔
1076
                    tile_size_in_pixels: [3, 2].into(),
1✔
1077
                    global_geo_transform: TestDefault::test_default(),
1✔
1078
                },
1✔
1079
                0,
1✔
1080
                empty_grid.clone(),
1✔
1081
                CacheHint::default(),
1✔
1082
            ),
1✔
1083
            RasterTile2D::new_with_tile_info(
1✔
1084
                TimeInterval::new_unchecked(
1✔
1085
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1086
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
1087
                ),
1✔
1088
                TileInformation {
1✔
1089
                    global_tile_position: [-1, 0].into(),
1✔
1090
                    tile_size_in_pixels: [3, 2].into(),
1✔
1091
                    global_geo_transform: TestDefault::test_default(),
1✔
1092
                },
1✔
1093
                0,
1✔
1094
                empty_grid.clone(),
1✔
1095
                CacheHint::default(),
1✔
1096
            ),
1✔
1097
            RasterTile2D::new_with_tile_info(
1✔
1098
                TimeInterval::new_unchecked(
1✔
1099
                    DateTime::new_utc(2012, 1, 1, 0, 0, 0),
1✔
1100
                    DateTime::new_utc(2013, 1, 1, 0, 0, 0),
1✔
1101
                ),
1✔
1102
                TileInformation {
1✔
1103
                    global_tile_position: [-1, 1].into(),
1✔
1104
                    tile_size_in_pixels: [3, 2].into(),
1✔
1105
                    global_geo_transform: TestDefault::test_default(),
1✔
1106
                },
1✔
1107
                0,
1✔
1108
                empty_grid.clone(),
1✔
1109
                CacheHint::default(),
1✔
1110
            ),
1✔
1111
        ];
1✔
1112

1✔
1113
        let mrs = MockRasterSource {
1✔
1114
            params: MockRasterSourceParams {
1✔
1115
                data: raster_tiles,
1✔
1116
                result_descriptor: RasterResultDescriptor {
1✔
1117
                    data_type: RasterDataType::U8,
1✔
1118
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
1119
                    time: None,
1✔
1120
                    bbox: None,
1✔
1121
                    resolution: None,
1✔
1122
                    bands: RasterBandDescriptors::new_single_band(),
1✔
1123
                },
1✔
1124
            },
1✔
1125
        }
1✔
1126
        .boxed();
1✔
1127

1✔
1128
        let time_shift = TimeShift {
1✔
1129
            sources: SingleRasterOrVectorSource {
1✔
1130
                source: RasterOrVectorOperator::Raster(mrs),
1✔
1131
            },
1✔
1132
            params: TimeShiftParams::Relative {
1✔
1133
                granularity: TimeGranularity::Years,
1✔
1134
                value: 1,
1✔
1135
            },
1✔
1136
        };
1✔
1137

1✔
1138
        let execution_context = MockExecutionContext::new_with_tiling_spec(
1✔
1139
            TilingSpecification::new((0., 0.).into(), [3, 2].into()),
1✔
1140
        );
1✔
1141
        let query_context = MockQueryContext::test_default();
1✔
1142

1✔
1143
        let query_processor = RasterOperator::boxed(time_shift)
1✔
1144
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1145
            .await
1✔
1146
            .unwrap()
1✔
1147
            .query_processor()
1✔
1148
            .unwrap()
1✔
1149
            .get_u8()
1✔
1150
            .unwrap();
1✔
1151

1✔
1152
        let mut stream = query_processor
1✔
1153
            .raster_query(
1✔
1154
                RasterQueryRectangle {
1✔
1155
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1156
                        (0., 3.).into(),
1✔
1157
                        (4., 0.).into(),
1✔
1158
                    ),
1✔
1159
                    time_interval: TimeInterval::new(
1✔
1160
                        DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1161
                        DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1162
                    )
1✔
1163
                    .unwrap(),
1✔
1164
                    spatial_resolution: SpatialResolution::one(),
1✔
1165
                    attributes: BandSelection::first(),
1✔
1166
                },
1✔
1167
                &query_context,
1✔
1168
            )
1✔
1169
            .await
1✔
1170
            .unwrap();
1✔
1171

1✔
1172
        let mut result = Vec::new();
1✔
1173
        while let Some(tile) = stream.next().await {
3✔
1174
            result.push(tile.unwrap());
2✔
1175
        }
2✔
1176

1✔
1177
        assert_eq!(result.len(), 2);
1✔
1178

1✔
1179
        assert_eq!(
1✔
1180
            result[0].time,
1✔
1181
            TimeInterval::new_unchecked(
1✔
1182
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1183
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1184
            ),
1✔
1185
        );
1✔
1186
        assert_eq!(
1✔
1187
            result[1].time,
1✔
1188
            TimeInterval::new_unchecked(
1✔
1189
                DateTime::new_utc(2010, 1, 1, 0, 0, 0),
1✔
1190
                DateTime::new_utc(2011, 1, 1, 0, 0, 0),
1✔
1191
            ),
1✔
1192
        );
1✔
1193
    }
1✔
1194

1195
    #[tokio::test]
1196
    async fn test_expression_on_shifted_raster() {
1✔
1197
        let mut execution_context = MockExecutionContext::test_default();
1✔
1198

1✔
1199
        let ndvi_source = GdalSource {
1✔
1200
            params: GdalSourceParameters {
1✔
1201
                data: add_ndvi_dataset(&mut execution_context),
1✔
1202
            },
1✔
1203
        }
1✔
1204
        .boxed();
1✔
1205

1✔
1206
        let shifted_ndvi_source = RasterOperator::boxed(TimeShift {
1✔
1207
            params: TimeShiftParams::Relative {
1✔
1208
                granularity: TimeGranularity::Months,
1✔
1209
                value: -1,
1✔
1210
            },
1✔
1211
            sources: SingleRasterOrVectorSource {
1✔
1212
                source: RasterOrVectorOperator::Raster(ndvi_source.clone()),
1✔
1213
            },
1✔
1214
        });
1✔
1215

1✔
1216
        let expression = Expression {
1✔
1217
            params: ExpressionParams {
1✔
1218
                expression: "A - B".to_string(),
1✔
1219
                output_type: RasterDataType::F64,
1✔
1220
                output_band: None,
1✔
1221
                map_no_data: false,
1✔
1222
            },
1✔
1223
            sources: SingleRasterSource {
1✔
1224
                raster: RasterStacker {
1✔
1225
                    params: RasterStackerParams {
1✔
1226
                        rename_bands: RenameBands::Default,
1✔
1227
                    },
1✔
1228
                    sources: MultipleRasterSources {
1✔
1229
                        rasters: vec![ndvi_source, shifted_ndvi_source],
1✔
1230
                    },
1✔
1231
                }
1✔
1232
                .boxed(),
1✔
1233
            },
1✔
1234
        }
1✔
1235
        .boxed();
1✔
1236

1✔
1237
        let query_processor = expression
1✔
1238
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1239
            .await
1✔
1240
            .unwrap()
1✔
1241
            .query_processor()
1✔
1242
            .unwrap()
1✔
1243
            .get_f64()
1✔
1244
            .unwrap();
1✔
1245

1✔
1246
        let query_context = MockQueryContext::test_default();
1✔
1247

1✔
1248
        let mut stream = query_processor
1✔
1249
            .raster_query(
1✔
1250
                RasterQueryRectangle {
1✔
1251
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1252
                        (-180., 90.).into(),
1✔
1253
                        (180., -90.).into(),
1✔
1254
                    ),
1✔
1255
                    time_interval: TimeInterval::new_instant(DateTime::new_utc(
1✔
1256
                        2014, 3, 1, 0, 0, 0,
1✔
1257
                    ))
1✔
1258
                    .unwrap(),
1✔
1259
                    spatial_resolution: SpatialResolution::one(),
1✔
1260
                    attributes: BandSelection::first(),
1✔
1261
                },
1✔
1262
                &query_context,
1✔
1263
            )
1✔
1264
            .await
1✔
1265
            .unwrap();
1✔
1266

1✔
1267
        let mut result = Vec::new();
1✔
1268
        while let Some(tile) = stream.next().await {
5✔
1269
            result.push(tile.unwrap());
4✔
1270
        }
4✔
1271

1✔
1272
        assert_eq!(result.len(), 4);
1✔
1273
        assert_eq!(
1✔
1274
            result[0].time,
1✔
1275
            TimeInterval::new(
1✔
1276
                DateTime::new_utc(2014, 3, 1, 0, 0, 0),
1✔
1277
                DateTime::new_utc(2014, 4, 1, 0, 0, 0)
1✔
1278
            )
1✔
1279
            .unwrap()
1✔
1280
        );
1✔
1281
    }
1✔
1282

1283
    #[tokio::test]
1284
    async fn test_expression_on_absolute_shifted_raster() {
1✔
1285
        let mut execution_context = MockExecutionContext::test_default();
1✔
1286

1✔
1287
        let ndvi_source = GdalSource {
1✔
1288
            params: GdalSourceParameters {
1✔
1289
                data: add_ndvi_dataset(&mut execution_context),
1✔
1290
            },
1✔
1291
        }
1✔
1292
        .boxed();
1✔
1293

1✔
1294
        let shifted_ndvi_source = RasterOperator::boxed(TimeShift {
1✔
1295
            params: TimeShiftParams::Absolute {
1✔
1296
                time_interval: TimeInterval::new_instant(DateTime::new_utc(2014, 5, 1, 0, 0, 0))
1✔
1297
                    .unwrap(),
1✔
1298
            },
1✔
1299
            sources: SingleRasterOrVectorSource {
1✔
1300
                source: RasterOrVectorOperator::Raster(ndvi_source),
1✔
1301
            },
1✔
1302
        });
1✔
1303

1✔
1304
        let query_processor = shifted_ndvi_source
1✔
1305
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
1306
            .await
1✔
1307
            .unwrap()
1✔
1308
            .query_processor()
1✔
1309
            .unwrap()
1✔
1310
            .get_u8()
1✔
1311
            .unwrap();
1✔
1312

1✔
1313
        let query_context = MockQueryContext::test_default();
1✔
1314

1✔
1315
        let mut stream = query_processor
1✔
1316
            .raster_query(
1✔
1317
                RasterQueryRectangle {
1✔
1318
                    spatial_bounds: SpatialPartition2D::new_unchecked(
1✔
1319
                        (-180., 90.).into(),
1✔
1320
                        (180., -90.).into(),
1✔
1321
                    ),
1✔
1322
                    time_interval: TimeInterval::new_instant(DateTime::new_utc(
1✔
1323
                        2014, 3, 1, 0, 0, 0,
1✔
1324
                    ))
1✔
1325
                    .unwrap(),
1✔
1326
                    spatial_resolution: SpatialResolution::one(),
1✔
1327
                    attributes: BandSelection::first(),
1✔
1328
                },
1✔
1329
                &query_context,
1✔
1330
            )
1✔
1331
            .await
1✔
1332
            .unwrap();
1✔
1333

1✔
1334
        let mut result = Vec::new();
1✔
1335
        while let Some(tile) = stream.next().await {
5✔
1336
            result.push(tile.unwrap());
4✔
1337
        }
4✔
1338

1✔
1339
        assert_eq!(result.len(), 4);
1✔
1340
        assert_eq!(
1✔
1341
            result[0].time,
1✔
1342
            TimeInterval::new(
1✔
1343
                DateTime::new_utc(2014, 3, 1, 0, 0, 0),
1✔
1344
                DateTime::new_utc(2014, 4, 1, 0, 0, 0)
1✔
1345
            )
1✔
1346
            .unwrap()
1✔
1347
        );
1✔
1348
    }
1✔
1349

1350
    #[test]
1351
    fn shift_eternal() {
1✔
1352
        let eternal = TimeInterval::default();
1✔
1353

1✔
1354
        let f_shift = RelativeForwardShift {
1✔
1355
            step: TimeStep {
1✔
1356
                granularity: TimeGranularity::Seconds,
1✔
1357
                step: 1,
1✔
1358
            },
1✔
1359
        };
1✔
1360

1✔
1361
        let (shifted, state) = f_shift.shift(eternal).unwrap();
1✔
1362
        assert_eq!(shifted, eternal);
1✔
1363
        let reverse_shifted = f_shift.reverse_shift(eternal, state).unwrap();
1✔
1364
        assert_eq!(reverse_shifted, eternal);
1✔
1365

1366
        let b_shift = RelativeBackwardShift {
1✔
1367
            step: TimeStep {
1✔
1368
                granularity: TimeGranularity::Seconds,
1✔
1369
                step: 1,
1✔
1370
            },
1✔
1371
        };
1✔
1372

1✔
1373
        let (shifted, state) = b_shift.shift(eternal).unwrap();
1✔
1374
        assert_eq!(shifted, eternal);
1✔
1375

1376
        let reverse_shifted = b_shift.reverse_shift(eternal, state).unwrap();
1✔
1377
        assert_eq!(reverse_shifted, eternal);
1✔
1378
    }
1✔
1379

1380
    #[test]
1381
    fn shift_begin_of_time() {
1✔
1382
        let time = TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START);
1✔
1383

1✔
1384
        let f_shift = RelativeForwardShift {
1✔
1385
            step: TimeStep {
1✔
1386
                granularity: TimeGranularity::Seconds,
1✔
1387
                step: 1,
1✔
1388
            },
1✔
1389
        };
1✔
1390

1✔
1391
        let expected_shift =
1✔
1392
            TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START + 1_000);
1✔
1393

1✔
1394
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1395
        assert_eq!(shifted, expected_shift);
1✔
1396

1397
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1398
        assert_eq!(reverse_shifted, time);
1✔
1399

1400
        let f_shift = RelativeBackwardShift {
1✔
1401
            step: TimeStep {
1✔
1402
                granularity: TimeGranularity::Seconds,
1✔
1403
                step: 1,
1✔
1404
            },
1✔
1405
        };
1✔
1406

1✔
1407
        let expected_shift =
1✔
1408
            TimeInterval::new_unchecked(TimeInstance::MIN, TimeInstance::EPOCH_START - 1_000);
1✔
1409

1✔
1410
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1411
        assert_eq!(shifted, expected_shift);
1✔
1412

1413
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1414
        assert_eq!(reverse_shifted, time);
1✔
1415
    }
1✔
1416

1417
    #[test]
1418
    fn shift_end_of_time() {
1✔
1419
        let time = TimeInterval::new_unchecked(TimeInstance::EPOCH_START, TimeInstance::MAX);
1✔
1420

1✔
1421
        let f_shift = RelativeForwardShift {
1✔
1422
            step: TimeStep {
1✔
1423
                granularity: TimeGranularity::Seconds,
1✔
1424
                step: 1,
1✔
1425
            },
1✔
1426
        };
1✔
1427

1✔
1428
        let expected_shift =
1✔
1429
            TimeInterval::new_unchecked(TimeInstance::EPOCH_START + 1_000, TimeInstance::MAX);
1✔
1430

1✔
1431
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1432
        assert_eq!(shifted, expected_shift);
1✔
1433

1434
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1435
        assert_eq!(reverse_shifted, time);
1✔
1436

1437
        let f_shift = RelativeBackwardShift {
1✔
1438
            step: TimeStep {
1✔
1439
                granularity: TimeGranularity::Seconds,
1✔
1440
                step: 1,
1✔
1441
            },
1✔
1442
        };
1✔
1443

1✔
1444
        let expected_shift =
1✔
1445
            TimeInterval::new_unchecked(TimeInstance::EPOCH_START - 1_000, TimeInstance::MAX);
1✔
1446

1✔
1447
        let (shifted, state) = f_shift.shift(time).unwrap();
1✔
1448
        assert_eq!(shifted, expected_shift);
1✔
1449

1450
        let reverse_shifted = f_shift.reverse_shift(expected_shift, state).unwrap();
1✔
1451
        assert_eq!(reverse_shifted, time);
1✔
1452
    }
1✔
1453
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc