• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12466060820

23 Dec 2024 11:26AM UTC coverage: 90.353% (-0.2%) from 90.512%
12466060820

Pull #998

github

web-flow
Merge 66ab0655c into 34e12969f
Pull Request #998: Quota and Data usage Logging

834 of 1211 new or added lines in 66 files covered. (68.87%)

222 existing lines in 18 files now uncovered.

133834 of 148123 relevant lines covered (90.35%)

54353.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.54
/operators/src/engine/operator.rs
1
use serde::{Deserialize, Serialize};
2
use tracing::debug;
3

4
use crate::error;
5
use crate::util::Result;
6
use async_trait::async_trait;
7
use geoengine_datatypes::{dataset::NamedData, util::ByteSize};
8

9
use super::{
10
    query_processor::{TypedRasterQueryProcessor, TypedVectorQueryProcessor},
11
    CloneablePlotOperator, CloneableRasterOperator, CloneableVectorOperator, CreateSpan,
12
    ExecutionContext, PlotResultDescriptor, RasterResultDescriptor, TypedPlotQueryProcessor,
13
    VectorResultDescriptor, WorkflowOperatorPath,
14
};
15

16
pub trait OperatorData {
17
    /// Get the ids of all the data involoved in this operator and its sources
18
    fn data_names(&self) -> Vec<NamedData> {
2✔
19
        let mut datasets = vec![];
2✔
20
        self.data_names_collect(&mut datasets);
2✔
21
        datasets
2✔
22
    }
2✔
23

24
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>);
25
}
26

27
/// Common methods for `RasterOperator`s
28
#[typetag::serde(tag = "type")]
136✔
29
#[async_trait]
30
pub trait RasterOperator:
31
    CloneableRasterOperator + OperatorData + Send + Sync + std::fmt::Debug
32
{
33
    /// Internal initialization logic of the operator
34
    async fn _initialize(
35
        self: Box<Self>,
36
        path: WorkflowOperatorPath,
37
        context: &dyn ExecutionContext,
38
    ) -> Result<Box<dyn InitializedRasterOperator>>;
39

40
    /// Initialize the operator
41
    ///
42
    /// This method should not be overriden because it handles wrapping the operator using the
43
    /// execution context. Instead, `_initialize` should be implemented.
44
    async fn initialize(
45
        self: Box<Self>,
46
        path: WorkflowOperatorPath,
47
        context: &dyn ExecutionContext,
48
    ) -> Result<Box<dyn InitializedRasterOperator>> {
384✔
49
        let span = self.span();
384✔
50
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
384✔
51
        let op = self._initialize(path.clone(), context).await?;
1,427✔
52

53
        Ok(context.wrap_initialized_raster_operator(op, span))
369✔
54
    }
768✔
55

56
    /// Wrap a box around a `RasterOperator`
57
    fn boxed(self) -> Box<dyn RasterOperator>
525✔
58
    where
525✔
59
        Self: Sized + 'static,
525✔
60
    {
525✔
61
        Box::new(self)
525✔
62
    }
525✔
63

64
    fn span(&self) -> CreateSpan;
65
}
66

67
/// Common methods for `VectorOperator`s
68
#[typetag::serde(tag = "type")]
100✔
69
#[async_trait]
70
pub trait VectorOperator:
71
    CloneableVectorOperator + OperatorData + Send + Sync + std::fmt::Debug
72
{
73
    async fn _initialize(
74
        self: Box<Self>,
75
        path: WorkflowOperatorPath,
76
        context: &dyn ExecutionContext,
77
    ) -> Result<Box<dyn InitializedVectorOperator>>;
78

79
    async fn initialize(
80
        self: Box<Self>,
81
        path: WorkflowOperatorPath,
82
        context: &dyn ExecutionContext,
83
    ) -> Result<Box<dyn InitializedVectorOperator>> {
194✔
84
        let span = self.span();
194✔
85
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
194✔
86
        let op = self._initialize(path.clone(), context).await?;
194✔
87
        Ok(context.wrap_initialized_vector_operator(op, span))
184✔
88
    }
388✔
89

90
    /// Wrap a box around a `VectorOperator`
91
    fn boxed(self) -> Box<dyn VectorOperator>
246✔
92
    where
246✔
93
        Self: Sized + 'static,
246✔
94
    {
246✔
95
        Box::new(self)
246✔
96
    }
246✔
97

98
    fn span(&self) -> CreateSpan;
99
}
100

101
/// Common methods for `PlotOperator`s
102
#[typetag::serde(tag = "type")]
20✔
103
#[async_trait]
104
pub trait PlotOperator:
105
    CloneablePlotOperator + OperatorData + Send + Sync + std::fmt::Debug
106
{
107
    async fn _initialize(
108
        self: Box<Self>,
109
        path: WorkflowOperatorPath,
110
        context: &dyn ExecutionContext,
111
    ) -> Result<Box<dyn InitializedPlotOperator>>;
112

113
    async fn initialize(
114
        self: Box<Self>,
115
        path: WorkflowOperatorPath,
116
        context: &dyn ExecutionContext,
117
    ) -> Result<Box<dyn InitializedPlotOperator>> {
70✔
118
        let span = self.span();
70✔
119
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
70✔
120
        let op = self._initialize(path.clone(), context).await?;
158✔
121
        Ok(context.wrap_initialized_plot_operator(op, span))
57✔
122
    }
140✔
123

124
    /// Wrap a box around a `PlotOperator`
125
    fn boxed(self) -> Box<dyn PlotOperator>
79✔
126
    where
79✔
127
        Self: Sized + 'static,
79✔
128
    {
79✔
129
        Box::new(self)
79✔
130
    }
79✔
131

132
    fn span(&self) -> CreateSpan;
133
}
134

135
// TODO: implement a derive macro for common fields of operators: name, path, data, result_descriptor and automatically implement common trait functions
136
pub trait InitializedRasterOperator: Send + Sync {
137
    /// Get the result descriptor of the `Operator`
138
    fn result_descriptor(&self) -> &RasterResultDescriptor;
139

140
    /// Instantiate a `TypedVectorQueryProcessor` from a `RasterOperator`
141
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor>;
142

143
    /// Wrap a box around a `RasterOperator`
144
    fn boxed(self) -> Box<dyn InitializedRasterOperator>
333✔
145
    where
333✔
146
        Self: Sized + 'static,
333✔
147
    {
333✔
148
        Box::new(self)
333✔
149
    }
333✔
150

151
    /// Get a canonic representation of the operator and its sources
152
    fn canonic_name(&self) -> CanonicOperatorName;
153

154
    /// Get the unique name of the operator
155
    fn name(&self) -> &'static str;
156

157
    // Get the path of the operator in the workflow
158
    fn path(&self) -> WorkflowOperatorPath;
159

160
    /// Return the name of the data loaded by the operator (if any)
NEW
161
    fn data(&self) -> Option<String> {
×
NEW
162
        None
×
NEW
163
    }
×
164
}
165

166
pub trait InitializedVectorOperator: Send + Sync {
167
    /// Get the result descriptor of the `Operator`
168
    fn result_descriptor(&self) -> &VectorResultDescriptor;
169

170
    /// Instantiate a `TypedVectorQueryProcessor` from a `RasterOperator`
171
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor>;
172

173
    /// Wrap a box around a `RasterOperator`
174
    fn boxed(self) -> Box<dyn InitializedVectorOperator>
177✔
175
    where
177✔
176
        Self: Sized + 'static,
177✔
177
    {
177✔
178
        Box::new(self)
177✔
179
    }
177✔
180

181
    /// Get a canonic representation of the operator and its sources.
182
    /// This only includes *logical* operators, not wrappers
183
    fn canonic_name(&self) -> CanonicOperatorName;
184

185
    /// Get the unique name of the operator
186
    fn name(&self) -> &'static str;
187

188
    // Get the path of the operator in the workflow
189
    fn path(&self) -> WorkflowOperatorPath;
190

191
    /// Return the name of the data loaded by the operator (if any)
NEW
192
    fn data(&self) -> Option<String> {
×
NEW
193
        None
×
NEW
194
    }
×
195
}
196

197
/// A canonic name for an operator and its sources.
198
/// We use a byte representation of the operator json, so the derived equality and hashing
/// compare those serialized bytes.
199
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
200
pub struct CanonicOperatorName(Vec<u8>);
201

202
impl CanonicOperatorName {
203
    pub fn new<T: Serialize>(value: &T) -> Result<Self> {
×
204
        Ok(CanonicOperatorName(serde_json::to_vec(&value)?))
×
205
    }
×
206

207
    ///
208
    /// # Panics
209
    ///
210
    /// if the value cannot be serialized as json
211
    ///
212
    pub fn new_unchecked<T: Serialize>(value: &T) -> Self {
654✔
213
        CanonicOperatorName(serde_json::to_vec(&value).expect("it should be checked by the caller"))
654✔
214
    }
654✔
215
}
216

217
impl ByteSize for CanonicOperatorName {
    /// The heap usage of a canonic name is that of its inner byte vector.
    fn heap_byte_size(&self) -> usize {
        let Self(bytes) = self;
        bytes.heap_byte_size()
    }
}
222

223
impl<T> From<&T> for CanonicOperatorName
224
where
225
    T: Serialize,
226
{
227
    fn from(value: &T) -> Self {
640✔
228
        CanonicOperatorName::new_unchecked(value)
640✔
229
    }
640✔
230
}
231

232
impl std::fmt::Display for CanonicOperatorName {
233
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
234
        let s = String::from_utf8_lossy(&self.0);
×
235
        write!(f, "{s}")
×
236
    }
×
237
}
238

239
pub trait InitializedPlotOperator: Send + Sync {
240
    /// Get the result descriptor of the `Operator`
241
    fn result_descriptor(&self) -> &PlotResultDescriptor;
242

243
    /// Instantiate a `TypedVectorQueryProcessor` from a `RasterOperator`
244
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor>;
245

246
    /// Wrap a box around a `RasterOperator`
247
    fn boxed(self) -> Box<dyn InitializedPlotOperator>
57✔
248
    where
57✔
249
        Self: Sized + 'static,
57✔
250
    {
57✔
251
        Box::new(self)
57✔
252
    }
57✔
253

254
    /// Get a canonic representation of the operator and its sources
255
    fn canonic_name(&self) -> CanonicOperatorName;
256
}
257

258
impl InitializedRasterOperator for Box<dyn InitializedRasterOperator> {
259
    fn result_descriptor(&self) -> &RasterResultDescriptor {
240✔
260
        self.as_ref().result_descriptor()
240✔
261
    }
240✔
262

263
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
260✔
264
        self.as_ref().query_processor()
260✔
265
    }
260✔
266

267
    fn canonic_name(&self) -> CanonicOperatorName {
2✔
268
        self.as_ref().canonic_name()
2✔
269
    }
2✔
270

271
    fn name(&self) -> &'static str {
2✔
272
        self.as_ref().name()
2✔
273
    }
2✔
274

275
    fn path(&self) -> WorkflowOperatorPath {
2✔
276
        self.as_ref().path()
2✔
277
    }
2✔
278

279
    fn data(&self) -> Option<String> {
2✔
280
        self.as_ref().data()
2✔
281
    }
2✔
282
}
283

284
impl InitializedVectorOperator for Box<dyn InitializedVectorOperator> {
285
    fn result_descriptor(&self) -> &VectorResultDescriptor {
185✔
286
        self.as_ref().result_descriptor()
185✔
287
    }
185✔
288

289
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
133✔
290
        self.as_ref().query_processor()
133✔
291
    }
133✔
292

293
    fn canonic_name(&self) -> CanonicOperatorName {
×
294
        self.as_ref().canonic_name()
×
295
    }
×
296

297
    fn name(&self) -> &'static str {
1✔
298
        self.as_ref().name()
1✔
299
    }
1✔
300

301
    fn path(&self) -> WorkflowOperatorPath {
1✔
302
        self.as_ref().path()
1✔
303
    }
1✔
304

305
    fn data(&self) -> Option<String> {
1✔
306
        self.as_ref().data()
1✔
307
    }
1✔
308
}
309

310
impl InitializedPlotOperator for Box<dyn InitializedPlotOperator> {
311
    fn result_descriptor(&self) -> &PlotResultDescriptor {
×
312
        self.as_ref().result_descriptor()
×
313
    }
×
314

315
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor> {
53✔
316
        self.as_ref().query_processor()
53✔
317
    }
53✔
318

319
    fn canonic_name(&self) -> CanonicOperatorName {
×
320
        self.as_ref().canonic_name()
×
321
    }
×
322
}
323

324
/// An enum to differentiate between `Operator` variants
///
/// The serde attributes below select the adjacently tagged representation: the variant name
/// is stored under `"type"` and the boxed operator under `"operator"`.
325
#[derive(Clone, Debug, Serialize, Deserialize)]
208✔
326
#[serde(tag = "type", content = "operator")]
327
pub enum TypedOperator {
328
    /// A boxed `VectorOperator`
    Vector(Box<dyn VectorOperator>),
329
    /// A boxed `RasterOperator`
    Raster(Box<dyn RasterOperator>),
330
    /// A boxed `PlotOperator`
    Plot(Box<dyn PlotOperator>),
331
}
332

333
impl TypedOperator {
334
    pub fn get_vector(self) -> Result<Box<dyn VectorOperator>> {
6✔
335
        if let TypedOperator::Vector(o) = self {
6✔
336
            return Ok(o);
6✔
337
        }
×
338
        Err(error::Error::InvalidOperatorType {
×
339
            expected: "Vector".to_owned(),
×
340
            found: self.type_name().to_owned(),
×
341
        })
×
342
    }
6✔
343

344
    pub fn get_raster(self) -> Result<Box<dyn RasterOperator>> {
28✔
345
        if let TypedOperator::Raster(o) = self {
28✔
346
            return Ok(o);
28✔
347
        }
×
348
        Err(error::Error::InvalidOperatorType {
×
349
            expected: "Raster".to_owned(),
×
350
            found: self.type_name().to_owned(),
×
351
        })
×
352
    }
28✔
353

354
    pub fn get_plot(self) -> Result<Box<dyn PlotOperator>> {
2✔
355
        if let TypedOperator::Plot(o) = self {
2✔
356
            return Ok(o);
2✔
357
        }
×
358
        Err(error::Error::InvalidOperatorType {
×
359
            expected: "Plot".to_owned(),
×
360
            found: self.type_name().to_owned(),
×
361
        })
×
362
    }
2✔
363

364
    fn type_name(&self) -> &str {
×
365
        match self {
×
366
            TypedOperator::Vector(_) => "Vector",
×
367
            TypedOperator::Raster(_) => "Raster",
×
368
            TypedOperator::Plot(_) => "Plot",
×
369
        }
370
    }
×
371
}
372

373
impl From<Box<dyn VectorOperator>> for TypedOperator {
374
    fn from(operator: Box<dyn VectorOperator>) -> Self {
22✔
375
        Self::Vector(operator)
22✔
376
    }
22✔
377
}
378

379
impl From<Box<dyn RasterOperator>> for TypedOperator {
380
    fn from(operator: Box<dyn RasterOperator>) -> Self {
9✔
381
        Self::Raster(operator)
9✔
382
    }
9✔
383
}
384

385
impl From<Box<dyn PlotOperator>> for TypedOperator {
386
    fn from(operator: Box<dyn PlotOperator>) -> Self {
14✔
387
        Self::Plot(operator)
14✔
388
    }
14✔
389
}
390

391
/// Evaluate the same expression for whichever variant a `TypedOperator` holds.
///
/// The first argument is the typed operator to match on. The identifier before `=>` is bound
/// to the contained operator in every arm, and the expression after `=>` is used as the body
/// of each arm.
#[macro_export]
macro_rules! call_on_typed_operator {
    ($typed:expr, $binding:ident => $body:expr) => {
        match $typed {
            $crate::engine::TypedOperator::Vector($binding) => $body,
            $crate::engine::TypedOperator::Raster($binding) => $body,
            $crate::engine::TypedOperator::Plot($binding) => $body,
        }
    };
}
401

402
impl OperatorData for TypedOperator {
403
    fn data_names_collect(&self, data_ids: &mut Vec<NamedData>) {
2✔
404
        match self {
2✔
405
            TypedOperator::Vector(v) => v.data_names_collect(data_ids),
×
406
            TypedOperator::Raster(r) => r.data_names_collect(data_ids),
2✔
407
            TypedOperator::Plot(p) => p.data_names_collect(data_ids),
×
408
        }
409
    }
2✔
410
}
411

412
/// Associates a constant type name with the implementing type.
pub trait OperatorName {
413
    /// The type name of the implementor.
    // NOTE(review): presumably the operator's `type` tag used in serialization; confirm
    // against the implementors.
    const TYPE_NAME: &'static str;
414
}
415

416
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// `byte_size` of a canonic name for two small JSON values.
    #[test]
    fn op_name_byte_size() {
        // NOTE(review): the expected sizes look like the serialized JSON length (13 and 23
        // bytes) plus a fixed 24 bytes for the vector itself; confirm against `ByteSize`.
        let cases = [
            (json!({"foo": "bar"}), 37),
            (json!({"foo": {"bar": [1,2,3]}}), 47),
        ];

        for (value, expected) in cases {
            let op = CanonicOperatorName::new_unchecked(&value);
            assert_eq!(op.byte_size(), expected);
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc