• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10178074589

31 Jul 2024 09:34AM UTC coverage: 91.068% (+0.4%) from 90.682%
10178074589

push

github

web-flow
Merge pull request #973 from geo-engine/remove-XGB-update-toolchain

Remove-XGB-update-toolchain

81 of 88 new or added lines in 29 files covered. (92.05%)

456 existing lines in 119 files now uncovered.

131088 of 143945 relevant lines covered (91.07%)

53581.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.63
/operators/src/processing/interpolation/mod.rs
1
use std::marker::PhantomData;
2
use std::sync::Arc;
3

4
use crate::adapters::{
5
    FoldTileAccu, FoldTileAccuMut, RasterSubQueryAdapter, SubQueryTileAggregator,
6
};
7
use crate::engine::{
8
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
9
    OperatorName, QueryContext, QueryProcessor, RasterOperator, RasterQueryProcessor,
10
    RasterResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor, WorkflowOperatorPath,
11
};
12
use crate::util::Result;
13
use async_trait::async_trait;
14
use futures::future::BoxFuture;
15
use futures::stream::BoxStream;
16
use futures::{Future, FutureExt, TryFuture, TryFutureExt};
17
use geoengine_datatypes::primitives::{
18
    AxisAlignedRectangle, Coordinate2D, RasterQueryRectangle, SpatialPartition2D,
19
    SpatialPartitioned, SpatialResolution, TimeInstance, TimeInterval,
20
};
21
use geoengine_datatypes::primitives::{BandSelection, CacheHint};
22
use geoengine_datatypes::raster::{
23
    Bilinear, Blit, EmptyGrid2D, GeoTransform, GridOrEmpty, GridSize, InterpolationAlgorithm,
24
    NearestNeighbor, Pixel, RasterTile2D, TileInformation, TilingSpecification,
25
};
26
use rayon::ThreadPool;
27
use serde::{Deserialize, Serialize};
28
use snafu::{ensure, Snafu};
29

UNCOV
30
#[derive(Debug, Serialize, Deserialize, Clone)]
×
31
#[serde(rename_all = "camelCase")]
32
pub struct InterpolationParams {
33
    pub interpolation: InterpolationMethod,
34
    pub input_resolution: InputResolution,
35
}
36

UNCOV
37
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
×
38
#[serde(rename_all = "camelCase", tag = "type")]
39
pub enum InputResolution {
40
    Value(SpatialResolution),
41
    Source,
42
}
43

UNCOV
44
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
×
45
#[serde(rename_all = "camelCase")]
46
pub enum InterpolationMethod {
47
    NearestNeighbor,
48
    BiLinear,
49
}
50

51
#[derive(Debug, Snafu)]
×
52
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
53
pub enum InterpolationError {
54
    #[snafu(display(
55
        "The input resolution was defined as `source` but the source resolution is unknown.",
56
    ))]
57
    UnknownInputResolution,
58
}
59

60
pub type Interpolation = Operator<InterpolationParams, SingleRasterSource>;
61

62
impl OperatorName for Interpolation {
63
    const TYPE_NAME: &'static str = "Interpolation";
64
}
65

66
#[typetag::serde]
×
67
#[async_trait]
68
impl RasterOperator for Interpolation {
69
    async fn _initialize(
70
        self: Box<Self>,
71
        path: WorkflowOperatorPath,
72
        context: &dyn ExecutionContext,
73
    ) -> Result<Box<dyn InitializedRasterOperator>> {
3✔
74
        let name = CanonicOperatorName::from(&self);
3✔
75

3✔
76
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
3✔
77
        let raster_source = initialized_sources.raster;
3✔
78
        let in_descriptor = raster_source.result_descriptor();
3✔
79

3✔
80
        ensure!(
3✔
81
            matches!(self.params.input_resolution, InputResolution::Value(_))
3✔
82
                || in_descriptor.resolution.is_some(),
3✔
83
            error::UnknownInputResolution
3✔
84
        );
3✔
85

3✔
86
        let input_resolution = if let InputResolution::Value(res) = self.params.input_resolution {
3✔
87
            res
3✔
88
        } else {
3✔
89
            in_descriptor.resolution.expect("checked in ensure")
3✔
90
        };
3✔
91

3✔
92
        let out_descriptor = RasterResultDescriptor {
3✔
93
            spatial_reference: in_descriptor.spatial_reference,
3✔
94
            data_type: in_descriptor.data_type,
3✔
95
            bbox: in_descriptor.bbox,
3✔
96
            time: in_descriptor.time,
3✔
97
            resolution: None, // after interpolation the resolution is uncapped
3✔
98
            bands: in_descriptor.bands.clone(),
3✔
99
        };
3✔
100

3✔
101
        let initialized_operator = InitializedInterpolation {
3✔
102
            name,
3✔
103
            result_descriptor: out_descriptor,
3✔
104
            raster_source,
3✔
105
            interpolation_method: self.params.interpolation,
3✔
106
            input_resolution,
3✔
107
            tiling_specification: context.tiling_specification(),
3✔
108
        };
3✔
109

3✔
110
        Ok(initialized_operator.boxed())
3✔
111
    }
3✔
112

113
    span_fn!(Interpolation);
114
}
115

116
pub struct InitializedInterpolation {
117
    name: CanonicOperatorName,
118
    result_descriptor: RasterResultDescriptor,
119
    raster_source: Box<dyn InitializedRasterOperator>,
120
    interpolation_method: InterpolationMethod,
121
    input_resolution: SpatialResolution,
122
    tiling_specification: TilingSpecification,
123
}
124

125
impl InitializedRasterOperator for InitializedInterpolation {
126
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
3✔
127
        let source_processor = self.raster_source.query_processor()?;
3✔
128

129
        let res = call_on_generic_raster_processor!(
3✔
130
            source_processor, p => match self.interpolation_method  {
3✔
UNCOV
131
                InterpolationMethod::NearestNeighbor => InterploationProcessor::<_,_, NearestNeighbor>::new(
×
UNCOV
132
                        p,
×
UNCOV
133
                        self.result_descriptor.clone(),
×
UNCOV
134
                        self.input_resolution,
×
UNCOV
135
                        self.tiling_specification,
×
UNCOV
136
                    ).boxed()
×
UNCOV
137
                    .into(),
×
UNCOV
138
                InterpolationMethod::BiLinear =>InterploationProcessor::<_,_, Bilinear>::new(
×
139
                        p,
×
140
                        self.result_descriptor.clone(),
×
141
                        self.input_resolution,
×
142
                        self.tiling_specification,
×
143
                    ).boxed()
×
144
                    .into(),
×
145
            }
146
        );
147

148
        Ok(res)
3✔
149
    }
3✔
150

151
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
152
        &self.result_descriptor
×
153
    }
×
154

155
    fn canonic_name(&self) -> CanonicOperatorName {
×
156
        self.name.clone()
×
157
    }
×
158
}
159

160
pub struct InterploationProcessor<Q, P, I>
161
where
162
    Q: RasterQueryProcessor<RasterType = P>,
163
    P: Pixel,
164
    I: InterpolationAlgorithm<P>,
165
{
166
    source: Q,
167
    result_descriptor: RasterResultDescriptor,
168
    input_resolution: SpatialResolution,
169
    tiling_specification: TilingSpecification,
170
    interpolation: PhantomData<I>,
171
}
172

173
impl<Q, P, I> InterploationProcessor<Q, P, I>
174
where
175
    Q: RasterQueryProcessor<RasterType = P>,
176
    P: Pixel,
177
    I: InterpolationAlgorithm<P>,
178
{
179
    pub fn new(
3✔
180
        source: Q,
3✔
181
        result_descriptor: RasterResultDescriptor,
3✔
182
        input_resolution: SpatialResolution,
3✔
183
        tiling_specification: TilingSpecification,
3✔
184
    ) -> Self {
3✔
185
        Self {
3✔
186
            source,
3✔
187
            result_descriptor,
3✔
188
            input_resolution,
3✔
189
            tiling_specification,
3✔
190
            interpolation: PhantomData,
3✔
191
        }
3✔
192
    }
3✔
193
}
194

195
#[async_trait]
196
impl<Q, P, I> QueryProcessor for InterploationProcessor<Q, P, I>
197
where
198
    Q: QueryProcessor<
199
        Output = RasterTile2D<P>,
200
        SpatialBounds = SpatialPartition2D,
201
        Selection = BandSelection,
202
        ResultDescription = RasterResultDescriptor,
203
    >,
204
    P: Pixel,
205
    I: InterpolationAlgorithm<P>,
206
{
207
    type Output = RasterTile2D<P>;
208
    type SpatialBounds = SpatialPartition2D;
209
    type Selection = BandSelection;
210
    type ResultDescription = RasterResultDescriptor;
211

212
    async fn _query<'a>(
213
        &'a self,
214
        query: RasterQueryRectangle,
215
        ctx: &'a dyn QueryContext,
216
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
3✔
217
        // do not interpolate if the source resolution is already fine enough
3✔
218
        if query.spatial_resolution.x >= self.input_resolution.x
3✔
219
            && query.spatial_resolution.y >= self.input_resolution.y
3✔
220
        {
3✔
221
            // TODO: should we use the query or the input resolution here?
3✔
222
            return self.source.query(query, ctx).await;
3✔
223
        }
3✔
224

3✔
225
        let sub_query = InterpolationSubQuery::<_, P, I> {
3✔
226
            input_resolution: self.input_resolution,
3✔
227
            fold_fn: fold_future,
3✔
228
            tiling_specification: self.tiling_specification,
3✔
229
            phantom: PhantomData,
3✔
230
            _phantom_pixel_type: PhantomData,
3✔
231
        };
3✔
232

3✔
233
        Ok(RasterSubQueryAdapter::<'a, P, _, _>::new(
3✔
234
            &self.source,
3✔
235
            query,
3✔
236
            self.tiling_specification,
3✔
237
            ctx,
3✔
238
            sub_query,
3✔
239
        )
3✔
240
        .filter_and_fill(
3✔
241
            crate::adapters::FillerTileCacheExpirationStrategy::DerivedFromSurroundingTiles,
3✔
242
        ))
3✔
243
    }
3✔
244

245
    fn result_descriptor(&self) -> &RasterResultDescriptor {
6✔
246
        &self.result_descriptor
6✔
247
    }
6✔
248
}
249

250
#[derive(Debug, Clone)]
251
pub struct InterpolationSubQuery<F, T, I> {
252
    input_resolution: SpatialResolution,
253
    fold_fn: F,
254
    tiling_specification: TilingSpecification,
255
    phantom: PhantomData<I>,
256
    _phantom_pixel_type: PhantomData<T>,
257
}
258

259
impl<'a, T, FoldM, FoldF, I> SubQueryTileAggregator<'a, T> for InterpolationSubQuery<FoldM, T, I>
260
where
261
    T: Pixel,
262
    FoldM: Send + Sync + 'a + Clone + Fn(InterpolationAccu<T, I>, RasterTile2D<T>) -> FoldF,
263
    FoldF: Send + TryFuture<Ok = InterpolationAccu<T, I>, Error = crate::error::Error>,
264
    I: InterpolationAlgorithm<T>,
265
{
266
    type FoldFuture = FoldF;
267

268
    type FoldMethod = FoldM;
269

270
    type TileAccu = InterpolationAccu<T, I>;
271
    type TileAccuFuture = BoxFuture<'a, Result<Self::TileAccu>>;
272

273
    fn new_fold_accu(
64✔
274
        &self,
64✔
275
        tile_info: TileInformation,
64✔
276
        query_rect: RasterQueryRectangle,
64✔
277
        pool: &Arc<ThreadPool>,
64✔
278
    ) -> Self::TileAccuFuture {
64✔
279
        create_accu(
64✔
280
            tile_info,
64✔
281
            &query_rect,
64✔
282
            pool.clone(),
64✔
283
            self.tiling_specification,
64✔
284
        )
64✔
285
        .boxed()
64✔
286
    }
64✔
287

288
    fn tile_query_rectangle(
64✔
289
        &self,
64✔
290
        tile_info: TileInformation,
64✔
291
        _query_rect: RasterQueryRectangle,
64✔
292
        start_time: TimeInstance,
64✔
293
        band_idx: u32,
64✔
294
    ) -> Result<Option<RasterQueryRectangle>> {
64✔
295
        // enlarge the spatial bounds in order to have the neighbor pixels for the interpolation
64✔
296
        let spatial_bounds = tile_info.spatial_partition();
64✔
297
        let enlarge: Coordinate2D = (self.input_resolution.x, -self.input_resolution.y).into();
64✔
298
        let spatial_bounds = SpatialPartition2D::new(
64✔
299
            spatial_bounds.upper_left(),
64✔
300
            spatial_bounds.lower_right() + enlarge,
64✔
301
        )?;
64✔
302

303
        Ok(Some(RasterQueryRectangle {
304
            spatial_bounds,
64✔
305
            time_interval: TimeInterval::new_instant(start_time)?,
64✔
306
            spatial_resolution: self.input_resolution,
64✔
307
            attributes: band_idx.into(),
64✔
308
        }))
309
    }
64✔
310

311
    fn fold_method(&self) -> Self::FoldMethod {
64✔
312
        self.fold_fn.clone()
64✔
313
    }
64✔
314
}
315

316
#[derive(Clone, Debug)]
317
pub struct InterpolationAccu<T: Pixel, I: InterpolationAlgorithm<T>> {
318
    pub output_info: TileInformation,
319
    pub input_tile: RasterTile2D<T>,
320
    pub pool: Arc<ThreadPool>,
321
    phantom: PhantomData<I>,
322
}
323

324
impl<T: Pixel, I: InterpolationAlgorithm<T>> InterpolationAccu<T, I> {
325
    pub fn new(
208✔
326
        input_tile: RasterTile2D<T>,
208✔
327
        output_info: TileInformation,
208✔
328
        pool: Arc<ThreadPool>,
208✔
329
    ) -> Self {
208✔
330
        InterpolationAccu {
208✔
331
            input_tile,
208✔
332
            output_info,
208✔
333
            pool,
208✔
334
            phantom: Default::default(),
208✔
335
        }
208✔
336
    }
208✔
337
}
338

339
#[async_trait]
340
impl<T: Pixel, I: InterpolationAlgorithm<T>> FoldTileAccu for InterpolationAccu<T, I> {
341
    type RasterType = T;
342

343
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
64✔
344
        // now that we collected all the input tile pixels we perform the actual interpolation
64✔
345

64✔
346
        let output_tile = crate::util::spawn_blocking_with_thread_pool(self.pool, move || {
64✔
347
            I::interpolate(&self.input_tile, &self.output_info)
64✔
348
        })
64✔
349
        .await??;
64✔
350

64✔
351
        Ok(output_tile)
64✔
352
    }
64✔
353

354
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
355
        &self.pool
×
356
    }
×
357
}
358

359
impl<T: Pixel, I: InterpolationAlgorithm<T>> FoldTileAccuMut for InterpolationAccu<T, I> {
360
    fn tile_mut(&mut self) -> &mut RasterTile2D<T> {
×
361
        &mut self.input_tile
×
362
    }
×
363
}
364

365
pub fn create_accu<T: Pixel, I: InterpolationAlgorithm<T>>(
64✔
366
    tile_info: TileInformation,
64✔
367
    query_rect: &RasterQueryRectangle,
64✔
368
    pool: Arc<ThreadPool>,
64✔
369
    tiling_specification: TilingSpecification,
64✔
370
) -> impl Future<Output = Result<InterpolationAccu<T, I>>> {
64✔
371
    // create an accumulator as a single tile that fits all the input tiles
64✔
372
    let spatial_bounds = query_rect.spatial_bounds;
64✔
373
    let spatial_resolution = query_rect.spatial_resolution;
64✔
374
    let time_interval = query_rect.time_interval;
64✔
375

64✔
376
    crate::util::spawn_blocking(move || {
64✔
377
        let tiling = tiling_specification.strategy(spatial_resolution.x, -spatial_resolution.y);
64✔
378

64✔
379
        let origin_coordinate = tiling
64✔
380
            .tile_information_iterator(spatial_bounds)
64✔
381
            .next()
64✔
382
            .expect("a query contains at least one tile")
64✔
383
            .spatial_partition()
64✔
384
            .upper_left();
64✔
385

64✔
386
        let geo_transform = GeoTransform::new(
64✔
387
            origin_coordinate,
64✔
388
            spatial_resolution.x,
64✔
389
            -spatial_resolution.y,
64✔
390
        );
64✔
391

64✔
392
        let bbox = tiling.tile_grid_box(spatial_bounds);
64✔
393

64✔
394
        let shape = [
64✔
395
            bbox.axis_size_y() * tiling.tile_size_in_pixels.axis_size_y(),
64✔
396
            bbox.axis_size_x() * tiling.tile_size_in_pixels.axis_size_x(),
64✔
397
        ];
64✔
398

64✔
399
        // create a non-aligned (w.r.t. the tiling specification) grid by setting the origin to the top-left of the tile and the tile-index to [0, 0]
64✔
400
        let grid = EmptyGrid2D::new(shape.into());
64✔
401

64✔
402
        let input_tile = RasterTile2D::new(
64✔
403
            time_interval,
64✔
404
            [0, 0].into(),
64✔
405
            0,
64✔
406
            geo_transform,
64✔
407
            GridOrEmpty::from(grid),
64✔
408
            CacheHint::max_duration(),
64✔
409
        );
64✔
410

64✔
411
        InterpolationAccu::new(input_tile, tile_info, pool)
64✔
412
    })
64✔
413
    .map_err(From::from)
64✔
414
}
64✔
415

416
pub fn fold_future<T, I>(
144✔
417
    accu: InterpolationAccu<T, I>,
144✔
418
    tile: RasterTile2D<T>,
144✔
419
) -> impl Future<Output = Result<InterpolationAccu<T, I>>>
144✔
420
where
144✔
421
    T: Pixel,
144✔
422
    I: InterpolationAlgorithm<T>,
144✔
423
{
144✔
424
    crate::util::spawn_blocking(|| fold_impl(accu, tile)).then(|x| async move {
144✔
425
        match x {
144✔
426
            Ok(r) => r,
144✔
427
            Err(e) => Err(e.into()),
144✔
428
        }
144✔
429
    })
144✔
430
}
144✔
431

432
pub fn fold_impl<T, I>(
144✔
433
    mut accu: InterpolationAccu<T, I>,
144✔
434
    tile: RasterTile2D<T>,
144✔
435
) -> Result<InterpolationAccu<T, I>>
144✔
436
where
144✔
437
    T: Pixel,
144✔
438
    I: InterpolationAlgorithm<T>,
144✔
439
{
144✔
440
    // get the time now because it is not known when the accu was created
144✔
441
    accu.input_tile.time = tile.time;
144✔
442

144✔
443
    // TODO: add a skip if both tiles are empty?
144✔
444

144✔
445
    // copy all input tiles into the accu to have all data for interpolation
144✔
446
    let mut accu_input_tile = accu.input_tile.into_materialized_tile();
144✔
447
    accu_input_tile.blit(tile)?;
144✔
448

449
    Ok(InterpolationAccu::new(
144✔
450
        accu_input_tile.into(),
144✔
451
        accu.output_info,
144✔
452
        accu.pool,
144✔
453
    ))
144✔
454
}
144✔
455

456
#[cfg(test)]
457
mod tests {
458
    use super::*;
459
    use futures::StreamExt;
460
    use geoengine_datatypes::{
461
        primitives::{RasterQueryRectangle, SpatialPartition2D, SpatialResolution, TimeInterval},
462
        raster::{
463
            Grid2D, GridOrEmpty, RasterDataType, RasterTile2D, RenameBands, TileInformation,
464
            TilingSpecification,
465
        },
466
        spatial_reference::SpatialReference,
467
        util::test::TestDefault,
468
    };
469

470
    use crate::{
471
        engine::{
472
            MockExecutionContext, MockQueryContext, MultipleRasterSources, RasterBandDescriptors,
473
            RasterOperator, RasterResultDescriptor,
474
        },
475
        mock::{MockRasterSource, MockRasterSourceParams},
476
        processing::{RasterStacker, RasterStackerParams},
477
    };
478

479
    #[tokio::test]
480
    async fn nearest_neighbor_operator() -> Result<()> {
1✔
481
        let exe_ctx = MockExecutionContext::new_with_tiling_spec(TilingSpecification::new(
1✔
482
            (0., 0.).into(),
1✔
483
            [2, 2].into(),
1✔
484
        ));
1✔
485

1✔
486
        let raster = make_raster(CacheHint::max_duration());
1✔
487

1✔
488
        let operator = Interpolation {
1✔
489
            params: InterpolationParams {
1✔
490
                interpolation: InterpolationMethod::NearestNeighbor,
1✔
491
                input_resolution: InputResolution::Value(SpatialResolution::one()),
1✔
492
            },
1✔
493
            sources: SingleRasterSource { raster },
1✔
494
        }
1✔
495
        .boxed()
1✔
496
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
1✔
497
        .await?;
1✔
498

1✔
499
        let processor = operator.query_processor()?.get_i8().unwrap();
1✔
500

1✔
501
        let query_rect = RasterQueryRectangle {
1✔
502
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 2.).into(), (4., 0.).into()),
1✔
503
            time_interval: TimeInterval::new_unchecked(0, 20),
1✔
504
            spatial_resolution: SpatialResolution::zero_point_five(),
1✔
505
            attributes: BandSelection::first(),
1✔
506
        };
1✔
507
        let query_ctx = MockQueryContext::test_default();
1✔
508

1✔
509
        let result_stream = processor.query(query_rect, &query_ctx).await?;
1✔
510

1✔
511
        let result: Vec<Result<RasterTile2D<i8>>> = result_stream.collect().await;
64✔
512
        let result = result.into_iter().collect::<Result<Vec<_>>>()?;
1✔
513

1✔
514
        let mut times: Vec<TimeInterval> = vec![TimeInterval::new_unchecked(0, 10); 8];
1✔
515
        times.append(&mut vec![TimeInterval::new_unchecked(10, 20); 8]);
1✔
516

1✔
517
        let data = vec![
1✔
518
            vec![1, 2, 5, 6],
1✔
519
            vec![2, 3, 6, 7],
1✔
520
            vec![3, 4, 7, 8],
1✔
521
            vec![4, 0, 8, 0],
1✔
522
            vec![5, 6, 0, 0],
1✔
523
            vec![6, 7, 0, 0],
1✔
524
            vec![7, 8, 0, 0],
1✔
525
            vec![8, 0, 0, 0],
1✔
526
            vec![8, 7, 4, 3],
1✔
527
            vec![7, 6, 3, 2],
1✔
528
            vec![6, 5, 2, 1],
1✔
529
            vec![5, 0, 1, 0],
1✔
530
            vec![4, 3, 0, 0],
1✔
531
            vec![3, 2, 0, 0],
1✔
532
            vec![2, 1, 0, 0],
1✔
533
            vec![1, 0, 0, 0],
1✔
534
        ];
1✔
535

1✔
536
        let valid = vec![
1✔
537
            vec![true; 4],
1✔
538
            vec![true; 4],
1✔
539
            vec![true; 4],
1✔
540
            vec![true, false, true, false],
1✔
541
            vec![true, true, false, false],
1✔
542
            vec![true, true, false, false],
1✔
543
            vec![true, true, false, false],
1✔
544
            vec![true, false, false, false],
1✔
545
            vec![true; 4],
1✔
546
            vec![true; 4],
1✔
547
            vec![true; 4],
1✔
548
            vec![true, false, true, false],
1✔
549
            vec![true, true, false, false],
1✔
550
            vec![true, true, false, false],
1✔
551
            vec![true, true, false, false],
1✔
552
            vec![true, false, false, false],
1✔
553
        ];
1✔
554

1✔
555
        for (i, tile) in result.into_iter().enumerate() {
16✔
556
            let tile = tile.into_materialized_tile();
16✔
557
            assert_eq!(tile.time, times[i]);
16✔
558
            assert_eq!(tile.grid_array.inner_grid.data, data[i]);
16✔
559
            assert_eq!(tile.grid_array.validity_mask.data, valid[i]);
16✔
560
        }
1✔
561

1✔
562
        Ok(())
1✔
563
    }
1✔
564

565
    fn make_raster(cache_hint: CacheHint) -> Box<dyn RasterOperator> {
4✔
566
        // test raster:
4✔
567
        // [0, 10)
4✔
568
        // || 1 | 2 || 3 | 4 ||
4✔
569
        // || 5 | 6 || 7 | 8 ||
4✔
570
        //
4✔
571
        // [10, 20)
4✔
572
        // || 8 | 7 || 6 | 5 ||
4✔
573
        // || 4 | 3 || 2 | 1 ||
4✔
574
        let raster_tiles = vec![
4✔
575
            RasterTile2D::<i8>::new_with_tile_info(
4✔
576
                TimeInterval::new_unchecked(0, 10),
4✔
577
                TileInformation {
4✔
578
                    global_tile_position: [-1, 0].into(),
4✔
579
                    tile_size_in_pixels: [2, 2].into(),
4✔
580
                    global_geo_transform: TestDefault::test_default(),
4✔
581
                },
4✔
582
                0,
4✔
583
                GridOrEmpty::from(Grid2D::new([2, 2].into(), vec![1, 2, 5, 6]).unwrap()),
4✔
584
                cache_hint,
4✔
585
            ),
4✔
586
            RasterTile2D::new_with_tile_info(
4✔
587
                TimeInterval::new_unchecked(0, 10),
4✔
588
                TileInformation {
4✔
589
                    global_tile_position: [-1, 1].into(),
4✔
590
                    tile_size_in_pixels: [2, 2].into(),
4✔
591
                    global_geo_transform: TestDefault::test_default(),
4✔
592
                },
4✔
593
                0,
4✔
594
                GridOrEmpty::from(Grid2D::new([2, 2].into(), vec![3, 4, 7, 8]).unwrap()),
4✔
595
                cache_hint,
4✔
596
            ),
4✔
597
            RasterTile2D::new_with_tile_info(
4✔
598
                TimeInterval::new_unchecked(10, 20),
4✔
599
                TileInformation {
4✔
600
                    global_tile_position: [-1, 0].into(),
4✔
601
                    tile_size_in_pixels: [2, 2].into(),
4✔
602
                    global_geo_transform: TestDefault::test_default(),
4✔
603
                },
4✔
604
                0,
4✔
605
                GridOrEmpty::from(Grid2D::new([2, 2].into(), vec![8, 7, 4, 3]).unwrap()),
4✔
606
                cache_hint,
4✔
607
            ),
4✔
608
            RasterTile2D::new_with_tile_info(
4✔
609
                TimeInterval::new_unchecked(10, 20),
4✔
610
                TileInformation {
4✔
611
                    global_tile_position: [-1, 1].into(),
4✔
612
                    tile_size_in_pixels: [2, 2].into(),
4✔
613
                    global_geo_transform: TestDefault::test_default(),
4✔
614
                },
4✔
615
                0,
4✔
616
                GridOrEmpty::from(Grid2D::new([2, 2].into(), vec![6, 5, 2, 1]).unwrap()),
4✔
617
                cache_hint,
4✔
618
            ),
4✔
619
        ];
4✔
620

4✔
621
        MockRasterSource {
4✔
622
            params: MockRasterSourceParams {
4✔
623
                data: raster_tiles,
4✔
624
                result_descriptor: RasterResultDescriptor {
4✔
625
                    data_type: RasterDataType::I8,
4✔
626
                    spatial_reference: SpatialReference::epsg_4326().into(),
4✔
627
                    time: None,
4✔
628
                    bbox: None,
4✔
629
                    resolution: None,
4✔
630
                    bands: RasterBandDescriptors::new_single_band(),
4✔
631
                },
4✔
632
            },
4✔
633
        }
4✔
634
        .boxed()
4✔
635
    }
4✔
636

637
    #[tokio::test]
638
    async fn it_attaches_cache_hint() -> Result<()> {
1✔
639
        let exe_ctx = MockExecutionContext::new_with_tiling_spec(TilingSpecification::new(
1✔
640
            (0., 0.).into(),
1✔
641
            [2, 2].into(),
1✔
642
        ));
1✔
643

1✔
644
        let cache_hint = CacheHint::seconds(1234);
1✔
645
        let raster = make_raster(cache_hint);
1✔
646

1✔
647
        let operator = Interpolation {
1✔
648
            params: InterpolationParams {
1✔
649
                interpolation: InterpolationMethod::NearestNeighbor,
1✔
650
                input_resolution: InputResolution::Value(SpatialResolution::one()),
1✔
651
            },
1✔
652
            sources: SingleRasterSource { raster },
1✔
653
        }
1✔
654
        .boxed()
1✔
655
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
1✔
656
        .await?;
1✔
657

1✔
658
        let processor = operator.query_processor()?.get_i8().unwrap();
1✔
659

1✔
660
        let query_rect = RasterQueryRectangle {
1✔
661
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 2.).into(), (4., 0.).into()),
1✔
662
            time_interval: TimeInterval::new_unchecked(0, 20),
1✔
663
            spatial_resolution: SpatialResolution::zero_point_five(),
1✔
664
            attributes: BandSelection::first(),
1✔
665
        };
1✔
666
        let query_ctx = MockQueryContext::test_default();
1✔
667

1✔
668
        let result_stream = processor.query(query_rect, &query_ctx).await?;
1✔
669

1✔
670
        let result: Vec<Result<RasterTile2D<i8>>> = result_stream.collect().await;
68✔
671
        let result = result.into_iter().collect::<Result<Vec<_>>>()?;
1✔
672

1✔
673
        for tile in result {
17✔
674
            // dbg!(tile.time, tile.grid_array);
1✔
675
            assert_eq!(tile.cache_hint.expires(), cache_hint.expires());
16✔
676
        }
1✔
677

1✔
678
        Ok(())
1✔
679
    }
1✔
680

681
    #[tokio::test]
682
    #[allow(clippy::too_many_lines)]
683
    async fn it_interpolates_multiple_bands() -> Result<()> {
1✔
684
        let exe_ctx = MockExecutionContext::new_with_tiling_spec(TilingSpecification::new(
1✔
685
            (0., 0.).into(),
1✔
686
            [2, 2].into(),
1✔
687
        ));
1✔
688

1✔
689
        let operator = Interpolation {
1✔
690
            params: InterpolationParams {
1✔
691
                interpolation: InterpolationMethod::NearestNeighbor,
1✔
692
                input_resolution: InputResolution::Value(SpatialResolution::one()),
1✔
693
            },
1✔
694
            sources: SingleRasterSource {
1✔
695
                raster: RasterStacker {
1✔
696
                    params: RasterStackerParams {
1✔
697
                        rename_bands: RenameBands::Default,
1✔
698
                    },
1✔
699
                    sources: MultipleRasterSources {
1✔
700
                        rasters: vec![
1✔
701
                            make_raster(CacheHint::max_duration()),
1✔
702
                            make_raster(CacheHint::max_duration()),
1✔
703
                        ],
1✔
704
                    },
1✔
705
                }
1✔
706
                .boxed(),
1✔
707
            },
1✔
708
        }
1✔
709
        .boxed()
1✔
710
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
1✔
711
        .await?;
1✔
712

1✔
713
        let processor = operator.query_processor()?.get_i8().unwrap();
1✔
714

1✔
715
        let query_rect = RasterQueryRectangle {
1✔
716
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 2.).into(), (4., 0.).into()),
1✔
717
            time_interval: TimeInterval::new_unchecked(0, 20),
1✔
718
            spatial_resolution: SpatialResolution::zero_point_five(),
1✔
719
            attributes: [0, 1].try_into().unwrap(),
1✔
720
        };
1✔
721
        let query_ctx = MockQueryContext::test_default();
1✔
722

1✔
723
        let result_stream = processor.query(query_rect, &query_ctx).await?;
1✔
724

1✔
725
        let result: Vec<Result<RasterTile2D<i8>>> = result_stream.collect().await;
136✔
726
        let result = result.into_iter().collect::<Result<Vec<_>>>()?;
1✔
727

1✔
728
        let mut times: Vec<TimeInterval> = vec![TimeInterval::new_unchecked(0, 10); 8];
1✔
729
        times.append(&mut vec![TimeInterval::new_unchecked(10, 20); 8]);
1✔
730

1✔
731
        let times = times
1✔
732
            .clone()
1✔
733
            .into_iter()
1✔
734
            .zip(times)
1✔
735
            .flat_map(|(a, b)| vec![a, b])
16✔
736
            .collect::<Vec<_>>();
1✔
737

1✔
738
        let data = vec![
1✔
739
            vec![1, 2, 5, 6],
1✔
740
            vec![2, 3, 6, 7],
1✔
741
            vec![3, 4, 7, 8],
1✔
742
            vec![4, 0, 8, 0],
1✔
743
            vec![5, 6, 0, 0],
1✔
744
            vec![6, 7, 0, 0],
1✔
745
            vec![7, 8, 0, 0],
1✔
746
            vec![8, 0, 0, 0],
1✔
747
            vec![8, 7, 4, 3],
1✔
748
            vec![7, 6, 3, 2],
1✔
749
            vec![6, 5, 2, 1],
1✔
750
            vec![5, 0, 1, 0],
1✔
751
            vec![4, 3, 0, 0],
1✔
752
            vec![3, 2, 0, 0],
1✔
753
            vec![2, 1, 0, 0],
1✔
754
            vec![1, 0, 0, 0],
1✔
755
        ];
1✔
756
        let data = data
1✔
757
            .clone()
1✔
758
            .into_iter()
1✔
759
            .zip(data)
1✔
760
            .flat_map(|(a, b)| vec![a, b])
16✔
761
            .collect::<Vec<_>>();
1✔
762

1✔
763
        let valid = vec![
1✔
764
            vec![true; 4],
1✔
765
            vec![true; 4],
1✔
766
            vec![true; 4],
1✔
767
            vec![true, false, true, false],
1✔
768
            vec![true, true, false, false],
1✔
769
            vec![true, true, false, false],
1✔
770
            vec![true, true, false, false],
1✔
771
            vec![true, false, false, false],
1✔
772
            vec![true; 4],
1✔
773
            vec![true; 4],
1✔
774
            vec![true; 4],
1✔
775
            vec![true, false, true, false],
1✔
776
            vec![true, true, false, false],
1✔
777
            vec![true, true, false, false],
1✔
778
            vec![true, true, false, false],
1✔
779
            vec![true, false, false, false],
1✔
780
        ];
1✔
781
        let valid = valid
1✔
782
            .clone()
1✔
783
            .into_iter()
1✔
784
            .zip(valid)
1✔
785
            .flat_map(|(a, b)| vec![a, b])
16✔
786
            .collect::<Vec<_>>();
1✔
787

1✔
788
        for (i, tile) in result.into_iter().enumerate() {
32✔
789
            let tile = tile.into_materialized_tile();
32✔
790
            assert_eq!(tile.time, times[i]);
32✔
791
            assert_eq!(tile.grid_array.inner_grid.data, data[i]);
32✔
792
            assert_eq!(tile.grid_array.validity_mask.data, valid[i]);
32✔
793
        }
1✔
794

1✔
795
        Ok(())
1✔
796
    }
1✔
797
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc