• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11553615378

28 Oct 2024 11:42AM UTC coverage: 91.032% (+0.01%) from 91.021%
11553615378

push

github

web-flow
Merge pull request #988 from geo-engine/ogr-dataset

adjust ogr time type to json format

105 of 108 new or added lines in 7 files covered. (97.22%)

14 existing lines in 8 files now uncovered.

133813 of 146996 relevant lines covered (91.03%)

52474.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.62
/services/src/datasets/create_from_workflow.rs
1
use crate::api::model::datatypes::RasterQueryRectangle;
2
use crate::contexts::SessionContext;
3
use crate::datasets::listing::DatasetProvider;
4
use crate::datasets::storage::{DatasetDefinition, DatasetStore, MetaDataDefinition};
5
use crate::datasets::upload::{UploadId, UploadRootPath};
6
use crate::datasets::AddDataset;
7
use crate::error;
8
use crate::tasks::{Task, TaskId, TaskManager, TaskStatusInfo};
9
use crate::workflows::workflow::Workflow;
10
use geoengine_datatypes::error::ErrorSource;
11
use geoengine_datatypes::primitives::TimeInterval;
12
use geoengine_datatypes::spatial_reference::SpatialReference;
13
use geoengine_datatypes::util::Identifier;
14
use geoengine_operators::call_on_generic_raster_processor_gdal_types;
15
use geoengine_operators::engine::{
16
    ExecutionContext, InitializedRasterOperator, RasterResultDescriptor, WorkflowOperatorPath,
17
};
18
use geoengine_operators::source::{
19
    GdalLoadingInfoTemporalSlice, GdalMetaDataList, GdalMetaDataStatic,
20
};
21
use geoengine_operators::util::raster_stream_to_geotiff::{
22
    raster_stream_to_geotiff, GdalCompressionNumThreads, GdalGeoTiffDatasetMetadata,
23
    GdalGeoTiffOptions,
24
};
25
use serde::{Deserialize, Serialize};
26
use snafu::{ensure, ResultExt};
27
use std::path::PathBuf;
28
use std::sync::Arc;
29
use tokio::fs;
30
use utoipa::ToSchema;
31

32
use super::{DatasetIdAndName, DatasetName};
33

34
/// parameter for the dataset from workflow handler (body)
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
#[schema(example = json!({"name": "foo", "displayName": "a new dataset", "description": null, "query": {"spatialBounds": {"upperLeftCoordinate": {"x": -10.0, "y": 80.0}, "lowerRightCoordinate": {"x": 50.0, "y": 20.0}}, "timeInterval": {"start": 1_388_534_400_000_i64, "end": 1_388_534_401_000_i64}, "spatialResolution": {"x": 0.1, "y": 0.1}}}))]
#[serde(rename_all = "camelCase")]
pub struct RasterDatasetFromWorkflow {
    /// Optional name for the new dataset. If given, it must not collide with
    /// an existing dataset name (checked before the task is scheduled).
    pub name: Option<DatasetName>,
    /// Human-readable display name for the new dataset.
    pub display_name: String,
    /// Optional description; an empty string is stored when omitted.
    pub description: Option<String>,
    /// Query rectangle (spatial bounds, time interval, resolution) that is
    /// evaluated on the workflow to produce the raster data.
    pub query: RasterQueryRectangle,
    /// Whether to write a cloud-optimized GeoTiff; defaults to `true` when
    /// the field is missing from the request body.
    #[schema(default = default_as_cog)]
    #[serde(default = "default_as_cog")]
    pub as_cog: bool,
}
47

48
/// By default, we set [`RasterDatasetFromWorkflow::as_cog`] to true to produce cloud-optimized `GeoTiff`s.
#[inline]
const fn default_as_cog() -> bool {
    true
}
3✔
53

54
/// response of the dataset from workflow handler
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct RasterDatasetFromWorkflowResult {
    /// Name of the newly created dataset.
    pub dataset: DatasetName,
    /// Id of the upload whose directory holds the generated GeoTiff file(s).
    pub upload: UploadId,
}

// Marker impl so the result can be reported as the task's final status.
impl TaskStatusInfo for RasterDatasetFromWorkflowResult {}
62

63
/// Background task that evaluates a raster workflow, writes the result as
/// GeoTiff file(s), and registers the output as a new dataset.
pub struct RasterDatasetFromWorkflowTask<C: SessionContext> {
    /// Name of the source; used in the task description.
    pub source_name: String,
    /// The workflow whose raster operator is evaluated.
    pub workflow: Workflow,
    /// Session context providing execution/query contexts, the db, and the task manager.
    pub ctx: Arc<C>,
    /// User-supplied parameters (dataset name, query rectangle, COG flag, …).
    pub info: RasterDatasetFromWorkflow,
    /// Upload id for the output directory; also serves as the task's unique id.
    pub upload: UploadId,
    /// Directory the GeoTiff files are written to (removed again on error cleanup).
    pub file_path: PathBuf,
    /// Number of threads GDAL may use for GeoTiff compression.
    pub compression_num_threads: GdalCompressionNumThreads,
}
72

73
impl<C: SessionContext> RasterDatasetFromWorkflowTask<C> {
    /// Evaluates the raster workflow over `self.info.query`, streams the
    /// result into GeoTiff file(s) under `self.file_path`, and registers the
    /// written slices as a new dataset.
    ///
    /// # Errors
    /// Fails if the workflow is not a raster workflow, if the result has no
    /// known spatial reference, if writing the GeoTiff fails, or if the
    /// dataset cannot be created.
    async fn process(&self) -> error::Result<RasterDatasetFromWorkflowResult> {
        let operator = self.workflow.operator.clone();

        // this handler only supports raster workflows
        let operator = operator.get_raster()?;

        let execution_context = self.ctx.execution_context()?;

        let workflow_operator_path_root = WorkflowOperatorPath::initialize_root();

        let initialized = operator
            .initialize(workflow_operator_path_root, &execution_context)
            .await?;

        let result_descriptor = initialized.result_descriptor();

        let processor = initialized.query_processor()?;

        let query_rect = self.info.query;
        let query_ctx = self.ctx.query_context()?;
        // a GeoTiff must be georeferenced, so an unknown spatial reference is an error
        let request_spatial_ref =
            Option::<SpatialReference>::from(result_descriptor.spatial_reference)
                .ok_or(crate::error::Error::MissingSpatialReference)?;
        let tile_limit = None; // TODO: set a reasonable limit or make configurable?

        // build the geotiff; the macro dispatches on the concrete pixel type
        // that GDAL supports for the given query processor
        let res =
            call_on_generic_raster_processor_gdal_types!(processor, p => raster_stream_to_geotiff(
            &self.file_path,
            p,
            query_rect.into(),
            query_ctx,
            GdalGeoTiffDatasetMetadata {
                no_data_value: Default::default(), // TODO: decide how to handle the no data here
                spatial_reference: request_spatial_ref,
            },
            GdalGeoTiffOptions {
                compression_num_threads: self.compression_num_threads,
                as_cog: self.info.as_cog,
                force_big_tiff: false,
            },
            tile_limit,
            Box::pin(futures::future::pending()), // datasets shall continue to be built in the background and not cancelled
            execution_context.tiling_specification(),
        ).await)?
            .map_err(crate::error::Error::from)?;

        // create the dataset from the written temporal slices
        let dataset = create_dataset(
            self.info.clone(),
            res,
            result_descriptor,
            query_rect,
            self.ctx.as_ref(),
        )
        .await?;

        Ok(RasterDatasetFromWorkflowResult {
            dataset: dataset.name,
            upload: self.upload,
        })
    }
}
136

137
#[async_trait::async_trait]
138
impl<C: SessionContext> Task<C::TaskContext> for RasterDatasetFromWorkflowTask<C> {
139
    async fn run(
140
        &self,
141
        _ctx: C::TaskContext,
142
    ) -> error::Result<Box<dyn crate::tasks::TaskStatusInfo>, Box<dyn ErrorSource>> {
3✔
143
        let response = self.process().await;
293✔
144

3✔
145
        response
3✔
146
            .map(TaskStatusInfo::boxed)
3✔
147
            .map_err(ErrorSource::boxed)
3✔
148
    }
3✔
149

150
    async fn cleanup_on_error(
151
        &self,
152
        _ctx: C::TaskContext,
153
    ) -> error::Result<(), Box<dyn ErrorSource>> {
×
154
        fs::remove_dir_all(&self.file_path)
×
155
            .await
×
156
            .context(crate::error::Io)
×
157
            .map_err(ErrorSource::boxed)?;
×
158

×
159
        //TODO: Dataset might already be in the database, if task was already close to finishing.
×
160

×
161
        Ok(())
×
162
    }
×
163

164
    fn task_type(&self) -> &'static str {
6✔
165
        "create-dataset"
6✔
166
    }
6✔
167

168
    fn task_unique_id(&self) -> Option<String> {
3✔
169
        Some(self.upload.to_string())
3✔
170
    }
3✔
171

172
    fn task_description(&self) -> String {
3✔
173
        format!(
3✔
174
            "Creating dataset {} from {}",
3✔
175
            self.info.display_name, self.source_name
3✔
176
        )
3✔
177
    }
3✔
178
}
179

180
pub async fn schedule_raster_dataset_from_workflow_task<C: SessionContext>(
3✔
181
    source_name: String,
3✔
182
    workflow: Workflow,
3✔
183
    ctx: Arc<C>,
3✔
184
    info: RasterDatasetFromWorkflow,
3✔
185
    compression_num_threads: GdalCompressionNumThreads,
3✔
186
) -> error::Result<TaskId> {
3✔
187
    if let Some(dataset_name) = &info.name {
3✔
188
        let db = ctx.db();
×
189

190
        // try to resolve the dataset name to an id
191
        let potential_id_result = db.resolve_dataset_name_to_id(dataset_name).await?;
×
192

193
        // handle the case where the dataset name is already taken
194
        if let Some(dataset_id) = potential_id_result {
×
195
            return Err(error::Error::DatasetNameAlreadyExists {
×
196
                dataset_name: dataset_name.to_string(),
×
197
                dataset_id: dataset_id.into(),
×
198
            });
×
199
        }
×
200
    }
3✔
201

202
    let upload = UploadId::new();
3✔
203
    let upload_path = upload.root_path()?;
3✔
204
    fs::create_dir_all(&upload_path)
3✔
UNCOV
205
        .await
×
206
        .context(crate::error::Io)?;
3✔
207
    let file_path = upload_path.clone();
3✔
208

3✔
209
    let task = RasterDatasetFromWorkflowTask {
3✔
210
        source_name,
3✔
211
        workflow,
3✔
212
        ctx: ctx.clone(),
3✔
213
        info,
3✔
214
        upload,
3✔
215
        file_path,
3✔
216
        compression_num_threads,
3✔
217
    }
3✔
218
    .boxed();
3✔
219

220
    let task_id = ctx.tasks().schedule_task(task, None).await?;
3✔
221

222
    Ok(task_id)
3✔
223
}
3✔
224

225
async fn create_dataset<C: SessionContext>(
3✔
226
    info: RasterDatasetFromWorkflow,
3✔
227
    mut slice_info: Vec<GdalLoadingInfoTemporalSlice>,
3✔
228
    origin_result_descriptor: &RasterResultDescriptor,
3✔
229
    query_rectangle: RasterQueryRectangle,
3✔
230
    ctx: &C,
3✔
231
) -> error::Result<DatasetIdAndName> {
3✔
232
    ensure!(!slice_info.is_empty(), error::EmptyDatasetCannotBeImported);
3✔
233

234
    let first_start = slice_info
3✔
235
        .first()
3✔
236
        .expect("slice_info should have at least one element")
3✔
237
        .time
3✔
238
        .start();
3✔
239
    let last_end = slice_info
3✔
240
        .last()
3✔
241
        .expect("slice_info should have at least one element")
3✔
242
        .time
3✔
243
        .end();
3✔
244
    let result_time_interval = TimeInterval::new(first_start, last_end)?;
3✔
245

246
    let result_descriptor = RasterResultDescriptor {
3✔
247
        data_type: origin_result_descriptor.data_type,
3✔
248
        spatial_reference: origin_result_descriptor.spatial_reference,
3✔
249
        time: Some(result_time_interval),
3✔
250
        bbox: Some(query_rectangle.spatial_bounds.into()),
3✔
251
        resolution: Some(query_rectangle.spatial_resolution.into()),
3✔
252
        bands: origin_result_descriptor.bands.clone(),
3✔
253
    };
3✔
254
    //TODO: Recognize MetaDataDefinition::GdalMetaDataRegular
255
    let meta_data = if slice_info.len() == 1 {
3✔
256
        let loading_info_slice = slice_info.pop().expect("slice_info has len one");
1✔
257
        let time = Some(loading_info_slice.time);
1✔
258
        let params = loading_info_slice
1✔
259
            .params
1✔
260
            .expect("datasets with exactly one timestep should have data");
1✔
261
        let cache_ttl = loading_info_slice.cache_ttl;
1✔
262
        MetaDataDefinition::GdalStatic(GdalMetaDataStatic {
1✔
263
            time,
1✔
264
            params,
1✔
265
            result_descriptor,
1✔
266
            cache_ttl,
1✔
267
        })
1✔
268
    } else {
269
        MetaDataDefinition::GdalMetaDataList(GdalMetaDataList {
2✔
270
            result_descriptor,
2✔
271
            params: slice_info,
2✔
272
        })
2✔
273
    };
274

275
    let dataset_definition = DatasetDefinition {
3✔
276
        properties: AddDataset {
3✔
277
            name: info.name,
3✔
278
            display_name: info.display_name,
3✔
279
            description: info.description.unwrap_or_default(),
3✔
280
            source_operator: "GdalSource".to_owned(),
3✔
281
            symbology: None,  // TODO add symbology?
3✔
282
            provenance: None, // TODO add provenance that references the workflow
3✔
283
            tags: Some(vec!["workflow".to_owned()]),
3✔
284
        },
3✔
285
        meta_data,
3✔
286
    };
3✔
287

3✔
288
    let db = ctx.db();
3✔
289
    let result = db
3✔
290
        .add_dataset(dataset_definition.properties, dataset_definition.meta_data)
3✔
291
        .await?;
259✔
292

293
    Ok(result)
3✔
294
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc