geo-engine / geoengine / build 9778142714

03 Jul 2024 12:47PM UTC · coverage: 90.741% (+0.05%) from 90.687%

Pull Request #970: FAIR dataset deletion (merge 371bbde3c into 9a33a5b13)

1369 of 1424 new or added lines in 9 files covered (96.14%).
22 existing lines in 12 files now uncovered.
134486 of 148208 relevant lines covered (90.74%).
52199.73 hits per line.

Source File: /services/src/datasets/create_from_workflow.rs (89.62% of lines covered)
use crate::api::model::datatypes::RasterQueryRectangle;
use crate::contexts::SessionContext;
use crate::datasets::listing::DatasetProvider;
use crate::datasets::storage::{DatasetDefinition, DatasetStore, MetaDataDefinition};
use crate::datasets::upload::{UploadId, UploadRootPath};
use crate::datasets::AddDataset;
use crate::error;
use crate::tasks::{Task, TaskId, TaskManager, TaskStatusInfo};
use crate::workflows::workflow::Workflow;
use geoengine_datatypes::error::ErrorSource;
use geoengine_datatypes::primitives::TimeInterval;
use geoengine_datatypes::spatial_reference::SpatialReference;
use geoengine_datatypes::util::Identifier;
use geoengine_operators::call_on_generic_raster_processor_gdal_types;
use geoengine_operators::engine::{
    ExecutionContext, InitializedRasterOperator, RasterResultDescriptor, WorkflowOperatorPath,
};
use geoengine_operators::source::{
    GdalLoadingInfoTemporalSlice, GdalMetaDataList, GdalMetaDataStatic,
};
use geoengine_operators::util::raster_stream_to_geotiff::{
    raster_stream_to_geotiff, GdalCompressionNumThreads, GdalGeoTiffDatasetMetadata,
    GdalGeoTiffOptions,
};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use utoipa::ToSchema;

use super::{DatasetIdAndName, DatasetName};

/// parameter for the dataset from workflow handler (body)
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
#[schema(example = json!({"name": "foo", "displayName": "a new dataset", "description": null, "query": {"spatialBounds": {"upperLeftCoordinate": {"x": -10.0, "y": 80.0}, "lowerRightCoordinate": {"x": 50.0, "y": 20.0}}, "timeInterval": {"start": 1_388_534_400_000_i64, "end": 1_388_534_401_000_i64}, "spatialResolution": {"x": 0.1, "y": 0.1}}}))]
#[serde(rename_all = "camelCase")]
pub struct RasterDatasetFromWorkflow {
    pub name: Option<DatasetName>,
    pub display_name: String,
    pub description: Option<String>,
    pub query: RasterQueryRectangle,
    #[schema(default = default_as_cog)]
    #[serde(default = "default_as_cog")]
    pub as_cog: bool,
}

/// By default, we set [`RasterDatasetFromWorkflow::as_cog`] to true to produce cloud-optimized `GeoTiff`s.
#[inline]
const fn default_as_cog() -> bool {
    true
}
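// Illustration (not part of the original file): because the struct above uses
// `rename_all = "camelCase"` and a serde default for `as_cog`, a request body can
// omit `asCog` (it then defaults to `true`) or disable COG output explicitly.
// A minimal body might look like this; only `displayName` and `query` are required:
//
//     { "displayName": "a new dataset", "query": { /* ... */ }, "asCog": false }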

/// response of the dataset from workflow handler
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct RasterDatasetFromWorkflowResult {
    pub dataset: DatasetName,
    pub upload: UploadId,
}

impl TaskStatusInfo for RasterDatasetFromWorkflowResult {}

pub struct RasterDatasetFromWorkflowTask<C: SessionContext> {
    pub source_name: String,
    pub workflow: Workflow,
    pub ctx: Arc<C>,
    pub info: RasterDatasetFromWorkflow,
    pub upload: UploadId,
    pub file_path: PathBuf,
    pub compression_num_threads: GdalCompressionNumThreads,
}

impl<C: SessionContext> RasterDatasetFromWorkflowTask<C> {
    async fn process(&self) -> error::Result<RasterDatasetFromWorkflowResult> {
        let operator = self.workflow.operator.clone();

        let operator = operator.get_raster().context(crate::error::Operator)?;

        let execution_context = self.ctx.execution_context()?;

        let workflow_operator_path_root = WorkflowOperatorPath::initialize_root();

        let initialized = operator
            .initialize(workflow_operator_path_root, &execution_context)
            .await
            .context(crate::error::Operator)?;

        let result_descriptor = initialized.result_descriptor();

        let processor = initialized
            .query_processor()
            .context(crate::error::Operator)?;

        let query_rect = self.info.query;
        let query_ctx = self.ctx.query_context()?;
        let request_spatial_ref =
            Option::<SpatialReference>::from(result_descriptor.spatial_reference)
                .ok_or(crate::error::Error::MissingSpatialReference)?;
        let tile_limit = None; // TODO: set a reasonable limit or make configurable?

        // build the geotiff
        let res =
            call_on_generic_raster_processor_gdal_types!(processor, p => raster_stream_to_geotiff(
            &self.file_path,
            p,
            query_rect.into(),
            query_ctx,
            GdalGeoTiffDatasetMetadata {
                no_data_value: Default::default(), // TODO: decide how to handle the no data here
                spatial_reference: request_spatial_ref,
            },
            GdalGeoTiffOptions {
                compression_num_threads: self.compression_num_threads,
                as_cog: self.info.as_cog,
                force_big_tiff: false,
            },
            tile_limit,
            Box::pin(futures::future::pending()), // datasets shall continue to be built in the background and not cancelled
            execution_context.tiling_specification(),
        ).await)?
            .map_err(crate::error::Error::from)?;

        // create the dataset
        let dataset = create_dataset(
            self.info.clone(),
            res,
            result_descriptor,
            query_rect,
            self.ctx.as_ref(),
        )
        .await?;

        Ok(RasterDatasetFromWorkflowResult {
            dataset: dataset.name,
            upload: self.upload,
        })
    }
}
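// Illustration (not part of the original file): on success, `process` returns a
// `RasterDatasetFromWorkflowResult`. Since the struct derives `Serialize` without a
// rename attribute, a task-status payload would look roughly like the following;
// the UUID-style `upload` string is an assumption about `UploadId`'s display format:
//
//     { "dataset": "foo", "upload": "4c9f9f5a-..." }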

#[async_trait::async_trait]
impl<C: SessionContext> Task<C::TaskContext> for RasterDatasetFromWorkflowTask<C> {
    async fn run(
        &self,
        _ctx: C::TaskContext,
    ) -> error::Result<Box<dyn crate::tasks::TaskStatusInfo>, Box<dyn ErrorSource>> {
        let response = self.process().await;

        response
            .map(TaskStatusInfo::boxed)
            .map_err(ErrorSource::boxed)
    }

    async fn cleanup_on_error(
        &self,
        _ctx: C::TaskContext,
    ) -> error::Result<(), Box<dyn ErrorSource>> {
        fs::remove_dir_all(&self.file_path)
            .await
            .context(crate::error::Io)
            .map_err(ErrorSource::boxed)?;

        // TODO: Dataset might already be in the database, if task was already close to finishing.

        Ok(())
    }

    fn task_type(&self) -> &'static str {
        "create-dataset"
    }

    fn task_unique_id(&self) -> Option<String> {
        Some(self.upload.to_string())
    }

    fn task_description(&self) -> String {
        format!(
            "Creating dataset {} from {}",
            self.info.display_name, self.source_name
        )
    }
}

pub async fn schedule_raster_dataset_from_workflow_task<C: SessionContext>(
    source_name: String,
    workflow: Workflow,
    ctx: Arc<C>,
    info: RasterDatasetFromWorkflow,
    compression_num_threads: GdalCompressionNumThreads,
) -> error::Result<TaskId> {
    if let Some(dataset_name) = &info.name {
        let db = ctx.db();

        // try to resolve the dataset name to an id
        let potential_id_result = db.resolve_dataset_name_to_id(dataset_name).await?;

        // handle the case where the dataset name is already taken
        if let Some(dataset_id) = potential_id_result {
            return Err(error::Error::DatasetNameAlreadyExists {
                dataset_name: dataset_name.to_string(),
                dataset_id: dataset_id.into(),
            });
        }
    }

    let upload = UploadId::new();
    let upload_path = upload.root_path()?;
    fs::create_dir_all(&upload_path)
        .await
        .context(crate::error::Io)?;
    let file_path = upload_path.clone();

    let task = RasterDatasetFromWorkflowTask {
        source_name,
        workflow,
        ctx: ctx.clone(),
        info,
        upload,
        file_path,
        compression_num_threads,
    }
    .boxed();

    let task_id = ctx.tasks().schedule_task(task, None).await?;

    Ok(task_id)
}
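// Illustration (not part of the original file): a minimal, hypothetical sketch of
// how a handler might schedule this task. `workflow`, `ctx`, and `info` are assumed
// to be available in the caller; the thread count would come from configuration:
//
//     let task_id = schedule_raster_dataset_from_workflow_task(
//         "my_source".to_string(), // hypothetical source name for the description
//         workflow,
//         ctx.clone(),
//         info,
//         compression_num_threads,
//     )
//     .await?;
//     // Report `task_id` to the client; progress is tracked via `ctx.tasks()`.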

async fn create_dataset<C: SessionContext>(
    info: RasterDatasetFromWorkflow,
    mut slice_info: Vec<GdalLoadingInfoTemporalSlice>,
    origin_result_descriptor: &RasterResultDescriptor,
    query_rectangle: RasterQueryRectangle,
    ctx: &C,
) -> error::Result<DatasetIdAndName> {
    ensure!(!slice_info.is_empty(), error::EmptyDatasetCannotBeImported);

    let first_start = slice_info
        .first()
        .expect("slice_info should have at least one element")
        .time
        .start();
    let last_end = slice_info
        .last()
        .expect("slice_info should have at least one element")
        .time
        .end();
    let result_time_interval = TimeInterval::new(first_start, last_end)?;

    let result_descriptor = RasterResultDescriptor {
        data_type: origin_result_descriptor.data_type,
        spatial_reference: origin_result_descriptor.spatial_reference,
        time: Some(result_time_interval),
        bbox: Some(query_rectangle.spatial_bounds.into()),
        resolution: Some(query_rectangle.spatial_resolution.into()),
        bands: origin_result_descriptor.bands.clone(),
    };
    // TODO: Recognize MetaDataDefinition::GdalMetaDataRegular
    let meta_data = if slice_info.len() == 1 {
        let loading_info_slice = slice_info.pop().expect("slice_info has len one");
        let time = Some(loading_info_slice.time);
        let params = loading_info_slice
            .params
            .expect("datasets with exactly one timestep should have data");
        let cache_ttl = loading_info_slice.cache_ttl;
        MetaDataDefinition::GdalStatic(GdalMetaDataStatic {
            time,
            params,
            result_descriptor,
            cache_ttl,
        })
    } else {
        MetaDataDefinition::GdalMetaDataList(GdalMetaDataList {
            result_descriptor,
            params: slice_info,
        })
    };

    let dataset_definition = DatasetDefinition {
        properties: AddDataset {
            name: info.name,
            display_name: info.display_name,
            description: info.description.unwrap_or_default(),
            source_operator: "GdalSource".to_owned(),
            symbology: None,  // TODO add symbology?
            provenance: None, // TODO add provenance that references the workflow
            tags: Some(vec!["workflow".to_owned()]),
        },
        meta_data,
    };

    let db = ctx.db();
    let result = db
        .add_dataset(dataset_definition.properties, dataset_definition.meta_data)
        .await?;

    Ok(result)
}
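// Illustration (not part of the original file): the stored temporal extent above is
// simply the span from the first slice's start to the last slice's end; gaps between
// slices are not inspected. With two hypothetical slices covering
// [2014-01-01, 2014-02-01) and [2014-02-01, 2014-03-01), `result_time_interval`
// becomes [2014-01-01, 2014-03-01), and since `slice_info.len() > 1` the metadata is
// written as `MetaDataDefinition::GdalMetaDataList` rather than `GdalStatic`.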