• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.99
/operators/src/processing/interpolation/mod.rs
1
use std::marker::PhantomData;
2
use std::sync::Arc;
3

4
use crate::adapters::{
5
    FoldTileAccu, FoldTileAccuMut, RasterSubQueryAdapter, SubQueryTileAggregator,
6
};
7
use crate::engine::{
8
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
9
    OperatorName, QueryContext, QueryProcessor, RasterOperator, RasterQueryProcessor,
10
    RasterResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor, WorkflowOperatorPath,
11
};
12
use crate::util::Result;
13
use async_trait::async_trait;
14
use futures::future::BoxFuture;
15
use futures::stream::BoxStream;
16
use futures::{Future, FutureExt, TryFuture, TryFutureExt};
17
use geoengine_datatypes::primitives::{
18
    AxisAlignedRectangle, Coordinate2D, RasterQueryRectangle, SpatialPartition2D,
19
    SpatialPartitioned, SpatialResolution, TimeInstance, TimeInterval,
20
};
21
use geoengine_datatypes::primitives::{BandSelection, CacheHint};
22
use geoengine_datatypes::raster::{
23
    Bilinear, Blit, EmptyGrid2D, GeoTransform, GridOrEmpty, GridSize, InterpolationAlgorithm,
24
    NearestNeighbor, Pixel, RasterTile2D, TileInformation, TilingSpecification,
25
};
26
use rayon::ThreadPool;
27
use serde::{Deserialize, Serialize};
28
use snafu::{ensure, Snafu};
29

30
#[derive(Debug, Serialize, Deserialize, Clone)]
31
#[serde(rename_all = "camelCase")]
32
pub struct InterpolationParams {
33
    pub interpolation: InterpolationMethod,
34
    pub input_resolution: InputResolution,
35
}
36

37
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
38
#[serde(rename_all = "camelCase", tag = "type")]
39
pub enum InputResolution {
40
    Value(SpatialResolution),
41
    Source,
42
}
43

44
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
45
#[serde(rename_all = "camelCase")]
46
pub enum InterpolationMethod {
47
    NearestNeighbor,
48
    BiLinear,
49
}
50

51
#[derive(Debug, Snafu)]
×
52
#[snafu(visibility(pub(crate)), context(suffix(false)), module(error))]
53
pub enum InterpolationError {
54
    #[snafu(display(
55
        "The input resolution was defined as `source` but the source resolution is unknown.",
56
    ))]
57
    UnknownInputResolution,
58
}
59

60
pub type Interpolation = Operator<InterpolationParams, SingleRasterSource>;
61

62
impl OperatorName for Interpolation {
63
    const TYPE_NAME: &'static str = "Interpolation";
64
}
65

66
#[typetag::serde]
×
67
#[async_trait]
68
impl RasterOperator for Interpolation {
69
    async fn _initialize(
70
        self: Box<Self>,
71
        path: WorkflowOperatorPath,
72
        context: &dyn ExecutionContext,
73
    ) -> Result<Box<dyn InitializedRasterOperator>> {
3✔
74
        let name = CanonicOperatorName::from(&self);
3✔
75

76
        let initialized_sources = self
3✔
77
            .sources
3✔
78
            .initialize_sources(path.clone(), context)
3✔
79
            .await?;
3✔
80
        let raster_source = initialized_sources.raster;
3✔
81
        let in_descriptor = raster_source.result_descriptor();
3✔
82

3✔
83
        ensure!(
3✔
84
            matches!(self.params.input_resolution, InputResolution::Value(_))
3✔
85
                || in_descriptor.resolution.is_some(),
×
86
            error::UnknownInputResolution
×
87
        );
88

89
        let input_resolution = if let InputResolution::Value(res) = self.params.input_resolution {
3✔
90
            res
3✔
91
        } else {
92
            in_descriptor.resolution.expect("checked in ensure")
×
93
        };
94

95
        let out_descriptor = RasterResultDescriptor {
3✔
96
            spatial_reference: in_descriptor.spatial_reference,
3✔
97
            data_type: in_descriptor.data_type,
3✔
98
            bbox: in_descriptor.bbox,
3✔
99
            time: in_descriptor.time,
3✔
100
            resolution: None, // after interpolation the resolution is uncapped
3✔
101
            bands: in_descriptor.bands.clone(),
3✔
102
        };
3✔
103

3✔
104
        let initialized_operator = InitializedInterpolation {
3✔
105
            name,
3✔
106
            path,
3✔
107
            result_descriptor: out_descriptor,
3✔
108
            raster_source,
3✔
109
            interpolation_method: self.params.interpolation,
3✔
110
            input_resolution,
3✔
111
            tiling_specification: context.tiling_specification(),
3✔
112
        };
3✔
113

3✔
114
        Ok(initialized_operator.boxed())
3✔
115
    }
6✔
116

117
    span_fn!(Interpolation);
118
}
119

120
pub struct InitializedInterpolation {
121
    name: CanonicOperatorName,
122
    path: WorkflowOperatorPath,
123
    result_descriptor: RasterResultDescriptor,
124
    raster_source: Box<dyn InitializedRasterOperator>,
125
    interpolation_method: InterpolationMethod,
126
    input_resolution: SpatialResolution,
127
    tiling_specification: TilingSpecification,
128
}
129

130
impl InitializedRasterOperator for InitializedInterpolation {
131
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
3✔
132
        let source_processor = self.raster_source.query_processor()?;
3✔
133

134
        let res = call_on_generic_raster_processor!(
3✔
135
            source_processor, p => match self.interpolation_method  {
3✔
136
                InterpolationMethod::NearestNeighbor => InterploationProcessor::<_,_, NearestNeighbor>::new(
×
137
                        p,
×
138
                        self.result_descriptor.clone(),
×
139
                        self.input_resolution,
×
140
                        self.tiling_specification,
×
141
                    ).boxed()
×
142
                    .into(),
×
143
                InterpolationMethod::BiLinear =>InterploationProcessor::<_,_, Bilinear>::new(
×
144
                        p,
×
145
                        self.result_descriptor.clone(),
×
146
                        self.input_resolution,
×
147
                        self.tiling_specification,
×
148
                    ).boxed()
×
149
                    .into(),
×
150
            }
151
        );
152

153
        Ok(res)
3✔
154
    }
3✔
155

156
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
157
        &self.result_descriptor
×
158
    }
×
159

160
    fn canonic_name(&self) -> CanonicOperatorName {
×
161
        self.name.clone()
×
162
    }
×
163

NEW
164
    fn name(&self) -> &'static str {
×
NEW
165
        Interpolation::TYPE_NAME
×
NEW
166
    }
×
167

NEW
168
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
169
        self.path.clone()
×
NEW
170
    }
×
171
}
172

173
pub struct InterploationProcessor<Q, P, I>
174
where
175
    Q: RasterQueryProcessor<RasterType = P>,
176
    P: Pixel,
177
    I: InterpolationAlgorithm<P>,
178
{
179
    source: Q,
180
    result_descriptor: RasterResultDescriptor,
181
    input_resolution: SpatialResolution,
182
    tiling_specification: TilingSpecification,
183
    interpolation: PhantomData<I>,
184
}
185

186
impl<Q, P, I> InterploationProcessor<Q, P, I>
187
where
188
    Q: RasterQueryProcessor<RasterType = P>,
189
    P: Pixel,
190
    I: InterpolationAlgorithm<P>,
191
{
192
    pub fn new(
3✔
193
        source: Q,
3✔
194
        result_descriptor: RasterResultDescriptor,
3✔
195
        input_resolution: SpatialResolution,
3✔
196
        tiling_specification: TilingSpecification,
3✔
197
    ) -> Self {
3✔
198
        Self {
3✔
199
            source,
3✔
200
            result_descriptor,
3✔
201
            input_resolution,
3✔
202
            tiling_specification,
3✔
203
            interpolation: PhantomData,
3✔
204
        }
3✔
205
    }
3✔
206
}
207

208
#[async_trait]
209
impl<Q, P, I> QueryProcessor for InterploationProcessor<Q, P, I>
210
where
211
    Q: QueryProcessor<
212
        Output = RasterTile2D<P>,
213
        SpatialBounds = SpatialPartition2D,
214
        Selection = BandSelection,
215
        ResultDescription = RasterResultDescriptor,
216
    >,
217
    P: Pixel,
218
    I: InterpolationAlgorithm<P>,
219
{
220
    type Output = RasterTile2D<P>;
221
    type SpatialBounds = SpatialPartition2D;
222
    type Selection = BandSelection;
223
    type ResultDescription = RasterResultDescriptor;
224

225
    async fn _query<'a>(
226
        &'a self,
227
        query: RasterQueryRectangle,
228
        ctx: &'a dyn QueryContext,
229
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
3✔
230
        // do not interpolate if the source resolution is already fine enough
231
        if query.spatial_resolution.x >= self.input_resolution.x
3✔
232
            && query.spatial_resolution.y >= self.input_resolution.y
×
233
        {
234
            // TODO: should we use the query or the input resolution here?
235
            return self.source.query(query, ctx).await;
×
236
        }
3✔
237

3✔
238
        let sub_query = InterpolationSubQuery::<_, P, I> {
3✔
239
            input_resolution: self.input_resolution,
3✔
240
            fold_fn: fold_future,
3✔
241
            tiling_specification: self.tiling_specification,
3✔
242
            phantom: PhantomData,
3✔
243
            _phantom_pixel_type: PhantomData,
3✔
244
        };
3✔
245

3✔
246
        Ok(RasterSubQueryAdapter::<'a, P, _, _>::new(
3✔
247
            &self.source,
3✔
248
            query,
3✔
249
            self.tiling_specification,
3✔
250
            ctx,
3✔
251
            sub_query,
3✔
252
        )
3✔
253
        .filter_and_fill(
3✔
254
            crate::adapters::FillerTileCacheExpirationStrategy::DerivedFromSurroundingTiles,
3✔
255
        ))
3✔
256
    }
6✔
257

258
    fn result_descriptor(&self) -> &RasterResultDescriptor {
6✔
259
        &self.result_descriptor
6✔
260
    }
6✔
261
}
262

263
#[derive(Debug, Clone)]
264
pub struct InterpolationSubQuery<F, T, I> {
265
    input_resolution: SpatialResolution,
266
    fold_fn: F,
267
    tiling_specification: TilingSpecification,
268
    phantom: PhantomData<I>,
269
    _phantom_pixel_type: PhantomData<T>,
270
}
271

272
impl<'a, T, FoldM, FoldF, I> SubQueryTileAggregator<'a, T> for InterpolationSubQuery<FoldM, T, I>
273
where
274
    T: Pixel,
275
    FoldM: Send + Sync + 'a + Clone + Fn(InterpolationAccu<T, I>, RasterTile2D<T>) -> FoldF,
276
    FoldF: Send + TryFuture<Ok = InterpolationAccu<T, I>, Error = crate::error::Error>,
277
    I: InterpolationAlgorithm<T>,
278
{
279
    type FoldFuture = FoldF;
280

281
    type FoldMethod = FoldM;
282

283
    type TileAccu = InterpolationAccu<T, I>;
284
    type TileAccuFuture = BoxFuture<'a, Result<Self::TileAccu>>;
285

286
    fn new_fold_accu(
64✔
287
        &self,
64✔
288
        tile_info: TileInformation,
64✔
289
        query_rect: RasterQueryRectangle,
64✔
290
        pool: &Arc<ThreadPool>,
64✔
291
    ) -> Self::TileAccuFuture {
64✔
292
        create_accu(
64✔
293
            tile_info,
64✔
294
            &query_rect,
64✔
295
            pool.clone(),
64✔
296
            self.tiling_specification,
64✔
297
        )
64✔
298
        .boxed()
64✔
299
    }
64✔
300

301
    fn tile_query_rectangle(
64✔
302
        &self,
64✔
303
        tile_info: TileInformation,
64✔
304
        _query_rect: RasterQueryRectangle,
64✔
305
        start_time: TimeInstance,
64✔
306
        band_idx: u32,
64✔
307
    ) -> Result<Option<RasterQueryRectangle>> {
64✔
308
        // enlarge the spatial bounds in order to have the neighbor pixels for the interpolation
64✔
309
        let spatial_bounds = tile_info.spatial_partition();
64✔
310
        let enlarge: Coordinate2D = (self.input_resolution.x, -self.input_resolution.y).into();
64✔
311
        let spatial_bounds = SpatialPartition2D::new(
64✔
312
            spatial_bounds.upper_left(),
64✔
313
            spatial_bounds.lower_right() + enlarge,
64✔
314
        )?;
64✔
315

316
        Ok(Some(RasterQueryRectangle {
317
            spatial_bounds,
64✔
318
            time_interval: TimeInterval::new_instant(start_time)?,
64✔
319
            spatial_resolution: self.input_resolution,
64✔
320
            attributes: band_idx.into(),
64✔
321
        }))
322
    }
64✔
323

324
    fn fold_method(&self) -> Self::FoldMethod {
64✔
325
        self.fold_fn.clone()
64✔
326
    }
64✔
327
}
328

329
#[derive(Clone, Debug)]
330
pub struct InterpolationAccu<T: Pixel, I: InterpolationAlgorithm<T>> {
331
    pub output_info: TileInformation,
332
    pub input_tile: RasterTile2D<T>,
333
    pub pool: Arc<ThreadPool>,
334
    phantom: PhantomData<I>,
335
}
336

337
impl<T: Pixel, I: InterpolationAlgorithm<T>> InterpolationAccu<T, I> {
338
    pub fn new(
208✔
339
        input_tile: RasterTile2D<T>,
208✔
340
        output_info: TileInformation,
208✔
341
        pool: Arc<ThreadPool>,
208✔
342
    ) -> Self {
208✔
343
        InterpolationAccu {
208✔
344
            input_tile,
208✔
345
            output_info,
208✔
346
            pool,
208✔
347
            phantom: Default::default(),
208✔
348
        }
208✔
349
    }
208✔
350
}
351

352
#[async_trait]
353
impl<T: Pixel, I: InterpolationAlgorithm<T>> FoldTileAccu for InterpolationAccu<T, I> {
354
    type RasterType = T;
355

356
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
64✔
357
        // now that we collected all the input tile pixels we perform the actual interpolation
358

359
        let output_tile = crate::util::spawn_blocking_with_thread_pool(self.pool, move || {
64✔
360
            I::interpolate(&self.input_tile, &self.output_info)
64✔
361
        })
64✔
362
        .await??;
64✔
363

364
        Ok(output_tile)
64✔
365
    }
128✔
366

367
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
368
        &self.pool
×
369
    }
×
370
}
371

372
impl<T: Pixel, I: InterpolationAlgorithm<T>> FoldTileAccuMut for InterpolationAccu<T, I> {
373
    fn tile_mut(&mut self) -> &mut RasterTile2D<T> {
×
374
        &mut self.input_tile
×
375
    }
×
376
}
377

378
pub fn create_accu<T: Pixel, I: InterpolationAlgorithm<T>>(
64✔
379
    tile_info: TileInformation,
64✔
380
    query_rect: &RasterQueryRectangle,
64✔
381
    pool: Arc<ThreadPool>,
64✔
382
    tiling_specification: TilingSpecification,
64✔
383
) -> impl Future<Output = Result<InterpolationAccu<T, I>>> {
64✔
384
    // create an accumulator as a single tile that fits all the input tiles
64✔
385
    let spatial_bounds = query_rect.spatial_bounds;
64✔
386
    let spatial_resolution = query_rect.spatial_resolution;
64✔
387
    let time_interval = query_rect.time_interval;
64✔
388

64✔
389
    crate::util::spawn_blocking(move || {
64✔
390
        let tiling = tiling_specification.strategy(spatial_resolution.x, -spatial_resolution.y);
64✔
391

64✔
392
        let origin_coordinate = tiling
64✔
393
            .tile_information_iterator(spatial_bounds)
64✔
394
            .next()
64✔
395
            .expect("a query contains at least one tile")
64✔
396
            .spatial_partition()
64✔
397
            .upper_left();
64✔
398

64✔
399
        let geo_transform = GeoTransform::new(
64✔
400
            origin_coordinate,
64✔
401
            spatial_resolution.x,
64✔
402
            -spatial_resolution.y,
64✔
403
        );
64✔
404

64✔
405
        let bbox = tiling.tile_grid_box(spatial_bounds);
64✔
406

64✔
407
        let shape = [
64✔
408
            bbox.axis_size_y() * tiling.tile_size_in_pixels.axis_size_y(),
64✔
409
            bbox.axis_size_x() * tiling.tile_size_in_pixels.axis_size_x(),
64✔
410
        ];
64✔
411

64✔
412
        // create a non-aligned (w.r.t. the tiling specification) grid by setting the origin to the top-left of the tile and the tile-index to [0, 0]
64✔
413
        let grid = EmptyGrid2D::new(shape.into());
64✔
414

64✔
415
        let input_tile = RasterTile2D::new(
64✔
416
            time_interval,
64✔
417
            [0, 0].into(),
64✔
418
            0,
64✔
419
            geo_transform,
64✔
420
            GridOrEmpty::from(grid),
64✔
421
            CacheHint::max_duration(),
64✔
422
        );
64✔
423

64✔
424
        InterpolationAccu::new(input_tile, tile_info, pool)
64✔
425
    })
64✔
426
    .map_err(From::from)
64✔
427
}
64✔
428

429
pub fn fold_future<T, I>(
144✔
430
    accu: InterpolationAccu<T, I>,
144✔
431
    tile: RasterTile2D<T>,
144✔
432
) -> impl Future<Output = Result<InterpolationAccu<T, I>>>
144✔
433
where
144✔
434
    T: Pixel,
144✔
435
    I: InterpolationAlgorithm<T>,
144✔
436
{
144✔
437
    crate::util::spawn_blocking(|| fold_impl(accu, tile)).then(|x| async move {
144✔
438
        match x {
144✔
439
            Ok(r) => r,
144✔
440
            Err(e) => Err(e.into()),
×
441
        }
442
    })
288✔
443
}
144✔
444

445
pub fn fold_impl<T, I>(
144✔
446
    mut accu: InterpolationAccu<T, I>,
144✔
447
    tile: RasterTile2D<T>,
144✔
448
) -> Result<InterpolationAccu<T, I>>
144✔
449
where
144✔
450
    T: Pixel,
144✔
451
    I: InterpolationAlgorithm<T>,
144✔
452
{
144✔
453
    // get the time now because it is not known when the accu was created
144✔
454
    accu.input_tile.time = tile.time;
144✔
455

144✔
456
    // TODO: add a skip if both tiles are empty?
144✔
457

144✔
458
    // copy all input tiles into the accu to have all data for interpolation
144✔
459
    let mut accu_input_tile = accu.input_tile.into_materialized_tile();
144✔
460
    accu_input_tile.blit(tile)?;
144✔
461

462
    Ok(InterpolationAccu::new(
144✔
463
        accu_input_tile.into(),
144✔
464
        accu.output_info,
144✔
465
        accu.pool,
144✔
466
    ))
144✔
467
}
144✔
468

469
#[cfg(test)]
470
mod tests {
471
    use super::*;
472
    use futures::StreamExt;
473
    use geoengine_datatypes::{
474
        primitives::{RasterQueryRectangle, SpatialPartition2D, SpatialResolution, TimeInterval},
475
        raster::{
476
            Grid2D, GridOrEmpty, RasterDataType, RasterTile2D, RenameBands, TileInformation,
477
            TilingSpecification,
478
        },
479
        spatial_reference::SpatialReference,
480
        util::test::TestDefault,
481
    };
482

483
    use crate::{
484
        engine::{
485
            MockExecutionContext, MockQueryContext, MultipleRasterSources, RasterBandDescriptors,
486
            RasterOperator, RasterResultDescriptor,
487
        },
488
        mock::{MockRasterSource, MockRasterSourceParams},
489
        processing::{RasterStacker, RasterStackerParams},
490
    };
491

492
    #[tokio::test]
493
    async fn nearest_neighbor_operator() -> Result<()> {
1✔
494
        let exe_ctx = MockExecutionContext::new_with_tiling_spec(TilingSpecification::new(
1✔
495
            (0., 0.).into(),
1✔
496
            [2, 2].into(),
1✔
497
        ));
1✔
498

1✔
499
        let raster = make_raster(CacheHint::max_duration());
1✔
500

1✔
501
        let operator = Interpolation {
1✔
502
            params: InterpolationParams {
1✔
503
                interpolation: InterpolationMethod::NearestNeighbor,
1✔
504
                input_resolution: InputResolution::Value(SpatialResolution::one()),
1✔
505
            },
1✔
506
            sources: SingleRasterSource { raster },
1✔
507
        }
1✔
508
        .boxed()
1✔
509
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
1✔
510
        .await?;
1✔
511

1✔
512
        let processor = operator.query_processor()?.get_i8().unwrap();
1✔
513

1✔
514
        let query_rect = RasterQueryRectangle {
1✔
515
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 2.).into(), (4., 0.).into()),
1✔
516
            time_interval: TimeInterval::new_unchecked(0, 20),
1✔
517
            spatial_resolution: SpatialResolution::zero_point_five(),
1✔
518
            attributes: BandSelection::first(),
1✔
519
        };
1✔
520
        let query_ctx = MockQueryContext::test_default();
1✔
521

1✔
522
        let result_stream = processor.query(query_rect, &query_ctx).await?;
1✔
523

1✔
524
        let result: Vec<Result<RasterTile2D<i8>>> = result_stream.collect().await;
1✔
525
        let result = result.into_iter().collect::<Result<Vec<_>>>()?;
1✔
526

1✔
527
        let mut times: Vec<TimeInterval> = vec![TimeInterval::new_unchecked(0, 10); 8];
1✔
528
        times.append(&mut vec![TimeInterval::new_unchecked(10, 20); 8]);
1✔
529

1✔
530
        let data = vec![
1✔
531
            vec![1, 2, 5, 6],
1✔
532
            vec![2, 3, 6, 7],
1✔
533
            vec![3, 4, 7, 8],
1✔
534
            vec![4, 0, 8, 0],
1✔
535
            vec![5, 6, 0, 0],
1✔
536
            vec![6, 7, 0, 0],
1✔
537
            vec![7, 8, 0, 0],
1✔
538
            vec![8, 0, 0, 0],
1✔
539
            vec![8, 7, 4, 3],
1✔
540
            vec![7, 6, 3, 2],
1✔
541
            vec![6, 5, 2, 1],
1✔
542
            vec![5, 0, 1, 0],
1✔
543
            vec![4, 3, 0, 0],
1✔
544
            vec![3, 2, 0, 0],
1✔
545
            vec![2, 1, 0, 0],
1✔
546
            vec![1, 0, 0, 0],
1✔
547
        ];
1✔
548

1✔
549
        let valid = vec![
1✔
550
            vec![true; 4],
1✔
551
            vec![true; 4],
1✔
552
            vec![true; 4],
1✔
553
            vec![true, false, true, false],
1✔
554
            vec![true, true, false, false],
1✔
555
            vec![true, true, false, false],
1✔
556
            vec![true, true, false, false],
1✔
557
            vec![true, false, false, false],
1✔
558
            vec![true; 4],
1✔
559
            vec![true; 4],
1✔
560
            vec![true; 4],
1✔
561
            vec![true, false, true, false],
1✔
562
            vec![true, true, false, false],
1✔
563
            vec![true, true, false, false],
1✔
564
            vec![true, true, false, false],
1✔
565
            vec![true, false, false, false],
1✔
566
        ];
1✔
567

1✔
568
        for (i, tile) in result.into_iter().enumerate() {
16✔
569
            let tile = tile.into_materialized_tile();
16✔
570
            assert_eq!(tile.time, times[i]);
16✔
571
            assert_eq!(tile.grid_array.inner_grid.data, data[i]);
16✔
572
            assert_eq!(tile.grid_array.validity_mask.data, valid[i]);
16✔
573
        }
1✔
574

1✔
575
        Ok(())
1✔
576
    }
1✔
577

578
    fn make_raster(cache_hint: CacheHint) -> Box<dyn RasterOperator> {
4✔
579
        // test raster:
4✔
580
        // [0, 10)
4✔
581
        // || 1 | 2 || 3 | 4 ||
4✔
582
        // || 5 | 6 || 7 | 8 ||
4✔
583
        //
4✔
584
        // [10, 20)
4✔
585
        // || 8 | 7 || 6 | 5 ||
4✔
586
        // || 4 | 3 || 2 | 1 ||
4✔
587
        let raster_tiles = vec![
4✔
588
            RasterTile2D::<i8>::new_with_tile_info(
4✔
589
                TimeInterval::new_unchecked(0, 10),
4✔
590
                TileInformation {
4✔
591
                    global_tile_position: [-1, 0].into(),
4✔
592
                    tile_size_in_pixels: [2, 2].into(),
4✔
593
                    global_geo_transform: TestDefault::test_default(),
4✔
594
                },
4✔
595
                0,
4✔
596
                GridOrEmpty::from(Grid2D::new([2, 2].into(), vec![1, 2, 5, 6]).unwrap()),
4✔
597
                cache_hint,
4✔
598
            ),
4✔
599
            RasterTile2D::new_with_tile_info(
4✔
600
                TimeInterval::new_unchecked(0, 10),
4✔
601
                TileInformation {
4✔
602
                    global_tile_position: [-1, 1].into(),
4✔
603
                    tile_size_in_pixels: [2, 2].into(),
4✔
604
                    global_geo_transform: TestDefault::test_default(),
4✔
605
                },
4✔
606
                0,
4✔
607
                GridOrEmpty::from(Grid2D::new([2, 2].into(), vec![3, 4, 7, 8]).unwrap()),
4✔
608
                cache_hint,
4✔
609
            ),
4✔
610
            RasterTile2D::new_with_tile_info(
4✔
611
                TimeInterval::new_unchecked(10, 20),
4✔
612
                TileInformation {
4✔
613
                    global_tile_position: [-1, 0].into(),
4✔
614
                    tile_size_in_pixels: [2, 2].into(),
4✔
615
                    global_geo_transform: TestDefault::test_default(),
4✔
616
                },
4✔
617
                0,
4✔
618
                GridOrEmpty::from(Grid2D::new([2, 2].into(), vec![8, 7, 4, 3]).unwrap()),
4✔
619
                cache_hint,
4✔
620
            ),
4✔
621
            RasterTile2D::new_with_tile_info(
4✔
622
                TimeInterval::new_unchecked(10, 20),
4✔
623
                TileInformation {
4✔
624
                    global_tile_position: [-1, 1].into(),
4✔
625
                    tile_size_in_pixels: [2, 2].into(),
4✔
626
                    global_geo_transform: TestDefault::test_default(),
4✔
627
                },
4✔
628
                0,
4✔
629
                GridOrEmpty::from(Grid2D::new([2, 2].into(), vec![6, 5, 2, 1]).unwrap()),
4✔
630
                cache_hint,
4✔
631
            ),
4✔
632
        ];
4✔
633

4✔
634
        MockRasterSource {
4✔
635
            params: MockRasterSourceParams {
4✔
636
                data: raster_tiles,
4✔
637
                result_descriptor: RasterResultDescriptor {
4✔
638
                    data_type: RasterDataType::I8,
4✔
639
                    spatial_reference: SpatialReference::epsg_4326().into(),
4✔
640
                    time: None,
4✔
641
                    bbox: None,
4✔
642
                    resolution: None,
4✔
643
                    bands: RasterBandDescriptors::new_single_band(),
4✔
644
                },
4✔
645
            },
4✔
646
        }
4✔
647
        .boxed()
4✔
648
    }
4✔
649

650
    #[tokio::test]
651
    async fn it_attaches_cache_hint() -> Result<()> {
1✔
652
        let exe_ctx = MockExecutionContext::new_with_tiling_spec(TilingSpecification::new(
1✔
653
            (0., 0.).into(),
1✔
654
            [2, 2].into(),
1✔
655
        ));
1✔
656

1✔
657
        let cache_hint = CacheHint::seconds(1234);
1✔
658
        let raster = make_raster(cache_hint);
1✔
659

1✔
660
        let operator = Interpolation {
1✔
661
            params: InterpolationParams {
1✔
662
                interpolation: InterpolationMethod::NearestNeighbor,
1✔
663
                input_resolution: InputResolution::Value(SpatialResolution::one()),
1✔
664
            },
1✔
665
            sources: SingleRasterSource { raster },
1✔
666
        }
1✔
667
        .boxed()
1✔
668
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
1✔
669
        .await?;
1✔
670

1✔
671
        let processor = operator.query_processor()?.get_i8().unwrap();
1✔
672

1✔
673
        let query_rect = RasterQueryRectangle {
1✔
674
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 2.).into(), (4., 0.).into()),
1✔
675
            time_interval: TimeInterval::new_unchecked(0, 20),
1✔
676
            spatial_resolution: SpatialResolution::zero_point_five(),
1✔
677
            attributes: BandSelection::first(),
1✔
678
        };
1✔
679
        let query_ctx = MockQueryContext::test_default();
1✔
680

1✔
681
        let result_stream = processor.query(query_rect, &query_ctx).await?;
1✔
682

1✔
683
        let result: Vec<Result<RasterTile2D<i8>>> = result_stream.collect().await;
1✔
684
        let result = result.into_iter().collect::<Result<Vec<_>>>()?;
1✔
685

1✔
686
        for tile in result {
17✔
687
            // dbg!(tile.time, tile.grid_array);
1✔
688
            assert_eq!(tile.cache_hint.expires(), cache_hint.expires());
16✔
689
        }
1✔
690

1✔
691
        Ok(())
1✔
692
    }
1✔
693

694
    #[tokio::test]
695
    #[allow(clippy::too_many_lines)]
696
    async fn it_interpolates_multiple_bands() -> Result<()> {
1✔
697
        let exe_ctx = MockExecutionContext::new_with_tiling_spec(TilingSpecification::new(
1✔
698
            (0., 0.).into(),
1✔
699
            [2, 2].into(),
1✔
700
        ));
1✔
701

1✔
702
        let operator = Interpolation {
1✔
703
            params: InterpolationParams {
1✔
704
                interpolation: InterpolationMethod::NearestNeighbor,
1✔
705
                input_resolution: InputResolution::Value(SpatialResolution::one()),
1✔
706
            },
1✔
707
            sources: SingleRasterSource {
1✔
708
                raster: RasterStacker {
1✔
709
                    params: RasterStackerParams {
1✔
710
                        rename_bands: RenameBands::Default,
1✔
711
                    },
1✔
712
                    sources: MultipleRasterSources {
1✔
713
                        rasters: vec![
1✔
714
                            make_raster(CacheHint::max_duration()),
1✔
715
                            make_raster(CacheHint::max_duration()),
1✔
716
                        ],
1✔
717
                    },
1✔
718
                }
1✔
719
                .boxed(),
1✔
720
            },
1✔
721
        }
1✔
722
        .boxed()
1✔
723
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
1✔
724
        .await?;
1✔
725

1✔
726
        let processor = operator.query_processor()?.get_i8().unwrap();
1✔
727

1✔
728
        let query_rect = RasterQueryRectangle {
1✔
729
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 2.).into(), (4., 0.).into()),
1✔
730
            time_interval: TimeInterval::new_unchecked(0, 20),
1✔
731
            spatial_resolution: SpatialResolution::zero_point_five(),
1✔
732
            attributes: [0, 1].try_into().unwrap(),
1✔
733
        };
1✔
734
        let query_ctx = MockQueryContext::test_default();
1✔
735

1✔
736
        let result_stream = processor.query(query_rect, &query_ctx).await?;
1✔
737

1✔
738
        let result: Vec<Result<RasterTile2D<i8>>> = result_stream.collect().await;
1✔
739
        let result = result.into_iter().collect::<Result<Vec<_>>>()?;
1✔
740

1✔
741
        let mut times: Vec<TimeInterval> = vec![TimeInterval::new_unchecked(0, 10); 8];
1✔
742
        times.append(&mut vec![TimeInterval::new_unchecked(10, 20); 8]);
1✔
743

1✔
744
        let times = times
1✔
745
            .clone()
1✔
746
            .into_iter()
1✔
747
            .zip(times)
1✔
748
            .flat_map(|(a, b)| vec![a, b])
16✔
749
            .collect::<Vec<_>>();
1✔
750

1✔
751
        let data = vec![
1✔
752
            vec![1, 2, 5, 6],
1✔
753
            vec![2, 3, 6, 7],
1✔
754
            vec![3, 4, 7, 8],
1✔
755
            vec![4, 0, 8, 0],
1✔
756
            vec![5, 6, 0, 0],
1✔
757
            vec![6, 7, 0, 0],
1✔
758
            vec![7, 8, 0, 0],
1✔
759
            vec![8, 0, 0, 0],
1✔
760
            vec![8, 7, 4, 3],
1✔
761
            vec![7, 6, 3, 2],
1✔
762
            vec![6, 5, 2, 1],
1✔
763
            vec![5, 0, 1, 0],
1✔
764
            vec![4, 3, 0, 0],
1✔
765
            vec![3, 2, 0, 0],
1✔
766
            vec![2, 1, 0, 0],
1✔
767
            vec![1, 0, 0, 0],
1✔
768
        ];
1✔
769
        let data = data
1✔
770
            .clone()
1✔
771
            .into_iter()
1✔
772
            .zip(data)
1✔
773
            .flat_map(|(a, b)| vec![a, b])
16✔
774
            .collect::<Vec<_>>();
1✔
775

1✔
776
        let valid = vec![
1✔
777
            vec![true; 4],
1✔
778
            vec![true; 4],
1✔
779
            vec![true; 4],
1✔
780
            vec![true, false, true, false],
1✔
781
            vec![true, true, false, false],
1✔
782
            vec![true, true, false, false],
1✔
783
            vec![true, true, false, false],
1✔
784
            vec![true, false, false, false],
1✔
785
            vec![true; 4],
1✔
786
            vec![true; 4],
1✔
787
            vec![true; 4],
1✔
788
            vec![true, false, true, false],
1✔
789
            vec![true, true, false, false],
1✔
790
            vec![true, true, false, false],
1✔
791
            vec![true, true, false, false],
1✔
792
            vec![true, false, false, false],
1✔
793
        ];
1✔
794
        let valid = valid
1✔
795
            .clone()
1✔
796
            .into_iter()
1✔
797
            .zip(valid)
1✔
798
            .flat_map(|(a, b)| vec![a, b])
16✔
799
            .collect::<Vec<_>>();
1✔
800

1✔
801
        for (i, tile) in result.into_iter().enumerate() {
32✔
802
            let tile = tile.into_materialized_tile();
32✔
803
            assert_eq!(tile.time, times[i]);
32✔
804
            assert_eq!(tile.grid_array.inner_grid.data, data[i]);
32✔
805
            assert_eq!(tile.grid_array.validity_mask.data, valid[i]);
32✔
806
        }
1✔
807

1✔
808
        Ok(())
1✔
809
    }
1✔
810
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc