• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.41
/operators/src/engine/operator.rs
1
use serde::{Deserialize, Serialize};
2
use tracing::debug;
3

4
use crate::error;
5
use crate::util::Result;
6
use async_trait::async_trait;
7
use geoengine_datatypes::{dataset::NamedData, util::ByteSize};
8

9
use super::{
10
    query_processor::{TypedRasterQueryProcessor, TypedVectorQueryProcessor},
11
    CloneablePlotOperator, CloneableRasterOperator, CloneableVectorOperator, CreateSpan,
12
    ExecutionContext, PlotResultDescriptor, RasterResultDescriptor, TypedPlotQueryProcessor,
13
    VectorResultDescriptor, WorkflowOperatorPath,
14
};
15

16
pub trait OperatorData {
17
    /// Get the names of all the data involved in this operator and its sources
18
    fn data_names(&self) -> Vec<NamedData> {
2✔
19
        let mut datasets = vec![];
2✔
20
        self.data_names_collect(&mut datasets);
2✔
21
        datasets
2✔
22
    }
2✔
23

24
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>);
25
}
26

27
/// Common methods for `RasterOperator`s
28
#[typetag::serde(tag = "type")]
136✔
29
#[async_trait]
30
pub trait RasterOperator:
31
    CloneableRasterOperator + OperatorData + Send + Sync + std::fmt::Debug
32
{
33
    /// Internal initialization logic of the operator
34
    async fn _initialize(
35
        self: Box<Self>,
36
        path: WorkflowOperatorPath,
37
        context: &dyn ExecutionContext,
38
    ) -> Result<Box<dyn InitializedRasterOperator>>;
39

40
    /// Initialize the operator
41
    ///
42
    /// This method should not be overridden because it handles wrapping the operator using the
43
    /// execution context. Instead, `_initialize` should be implemented.
44
    async fn initialize(
45
        self: Box<Self>,
46
        path: WorkflowOperatorPath,
47
        context: &dyn ExecutionContext,
48
    ) -> Result<Box<dyn InitializedRasterOperator>> {
384✔
49
        let span = self.span();
384✔
50
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
384✔
51
        #[allow(clippy::used_underscore_items)] // TODO: maybe rename?
52
        let op = self._initialize(path.clone(), context).await?;
384✔
53

54
        Ok(context.wrap_initialized_raster_operator(op, span))
369✔
55
    }
768✔
56

57
    /// Wrap a box around a `RasterOperator`
58
    fn boxed(self) -> Box<dyn RasterOperator>
525✔
59
    where
525✔
60
        Self: Sized + 'static,
525✔
61
    {
525✔
62
        Box::new(self)
525✔
63
    }
525✔
64

65
    fn span(&self) -> CreateSpan;
66
}
67

68
/// Common methods for `VectorOperator`s
69
#[typetag::serde(tag = "type")]
101✔
70
#[async_trait]
71
pub trait VectorOperator:
72
    CloneableVectorOperator + OperatorData + Send + Sync + std::fmt::Debug
73
{
74
    async fn _initialize(
75
        self: Box<Self>,
76
        path: WorkflowOperatorPath,
77
        context: &dyn ExecutionContext,
78
    ) -> Result<Box<dyn InitializedVectorOperator>>;
79

80
    #[allow(clippy::used_underscore_items)]
81
    async fn initialize(
82
        self: Box<Self>,
83
        path: WorkflowOperatorPath,
84
        context: &dyn ExecutionContext,
85
    ) -> Result<Box<dyn InitializedVectorOperator>> {
194✔
86
        let span = self.span();
194✔
87
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
194✔
88
        let op = self._initialize(path.clone(), context).await?;
194✔
89
        Ok(context.wrap_initialized_vector_operator(op, span))
184✔
90
    }
388✔
91

92
    /// Wrap a box around a `VectorOperator`
93
    fn boxed(self) -> Box<dyn VectorOperator>
246✔
94
    where
246✔
95
        Self: Sized + 'static,
246✔
96
    {
246✔
97
        Box::new(self)
246✔
98
    }
246✔
99

100
    fn span(&self) -> CreateSpan;
101
}
102

103
/// Common methods for `PlotOperator`s
104
#[typetag::serde(tag = "type")]
20✔
105
#[async_trait]
106
pub trait PlotOperator:
107
    CloneablePlotOperator + OperatorData + Send + Sync + std::fmt::Debug
108
{
109
    async fn _initialize(
110
        self: Box<Self>,
111
        path: WorkflowOperatorPath,
112
        context: &dyn ExecutionContext,
113
    ) -> Result<Box<dyn InitializedPlotOperator>>;
114

115
    async fn initialize(
116
        self: Box<Self>,
117
        path: WorkflowOperatorPath,
118
        context: &dyn ExecutionContext,
119
    ) -> Result<Box<dyn InitializedPlotOperator>> {
70✔
120
        let span = self.span();
70✔
121
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
70✔
122
        #[allow(clippy::used_underscore_items)] // TODO: maybe rename?
123
        let op = self._initialize(path.clone(), context).await?;
70✔
124
        Ok(context.wrap_initialized_plot_operator(op, span))
57✔
125
    }
140✔
126

127
    /// Wrap a box around a `PlotOperator`
128
    fn boxed(self) -> Box<dyn PlotOperator>
79✔
129
    where
79✔
130
        Self: Sized + 'static,
79✔
131
    {
79✔
132
        Box::new(self)
79✔
133
    }
79✔
134

135
    fn span(&self) -> CreateSpan;
136
}
137

138
// TODO: implement a derive macro for common fields of operators: name, path, data, result_descriptor and automatically implement common trait functions
139
pub trait InitializedRasterOperator: Send + Sync {
140
    /// Get the result descriptor of the `Operator`
141
    fn result_descriptor(&self) -> &RasterResultDescriptor;
142

143
    /// Instantiate a `TypedRasterQueryProcessor` from a `RasterOperator`
144
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor>;
145

146
    /// Wrap a box around an `InitializedRasterOperator`
147
    fn boxed(self) -> Box<dyn InitializedRasterOperator>
333✔
148
    where
333✔
149
        Self: Sized + 'static,
333✔
150
    {
333✔
151
        Box::new(self)
333✔
152
    }
333✔
153

154
    /// Get a canonic representation of the operator and its sources
155
    fn canonic_name(&self) -> CanonicOperatorName;
156

157
    /// Get the unique name of the operator
158
    fn name(&self) -> &'static str;
159

160
    /// Get the path of the operator in the workflow
161
    fn path(&self) -> WorkflowOperatorPath;
162

163
    /// Return the name of the data loaded by the operator (if any)
NEW
164
    fn data(&self) -> Option<String> {
×
NEW
165
        None
×
NEW
166
    }
×
167
}
168

169
pub trait InitializedVectorOperator: Send + Sync {
170
    /// Get the result descriptor of the `Operator`
171
    fn result_descriptor(&self) -> &VectorResultDescriptor;
172

173
    /// Instantiate a `TypedVectorQueryProcessor` from a `VectorOperator`
174
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor>;
175

176
    /// Wrap a box around an `InitializedVectorOperator`
177
    fn boxed(self) -> Box<dyn InitializedVectorOperator>
177✔
178
    where
177✔
179
        Self: Sized + 'static,
177✔
180
    {
177✔
181
        Box::new(self)
177✔
182
    }
177✔
183

184
    /// Get a canonic representation of the operator and its sources.
185
    /// This only includes *logical* operators, not wrappers
186
    fn canonic_name(&self) -> CanonicOperatorName;
187

188
    /// Get the unique name of the operator
189
    fn name(&self) -> &'static str;
190

191
    /// Get the path of the operator in the workflow
192
    fn path(&self) -> WorkflowOperatorPath;
193

194
    /// Return the name of the data loaded by the operator (if any)
NEW
195
    fn data(&self) -> Option<String> {
×
NEW
196
        None
×
NEW
197
    }
×
198
}
199

200
/// A canonic name for an operator and its sources
201
/// We use a byte representation of the operator json
202
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
203
pub struct CanonicOperatorName(Vec<u8>);
204

205
impl CanonicOperatorName {
206
    pub fn new<T: Serialize>(value: &T) -> Result<Self> {
×
207
        Ok(CanonicOperatorName(serde_json::to_vec(&value)?))
×
208
    }
×
209

210
    /// Create a canonic name by serializing `value` to JSON bytes, panicking on failure
211
    /// # Panics
212
    ///
213
    /// if the value cannot be serialized as json
214
    ///
215
    pub fn new_unchecked<T: Serialize>(value: &T) -> Self {
654✔
216
        CanonicOperatorName(serde_json::to_vec(&value).expect("it should be checked by the caller"))
654✔
217
    }
654✔
218
}
219

220
impl ByteSize for CanonicOperatorName {
221
    fn heap_byte_size(&self) -> usize {
2✔
222
        self.0.heap_byte_size()
2✔
223
    }
2✔
224
}
225

226
impl<T> From<&T> for CanonicOperatorName
227
where
228
    T: Serialize,
229
{
230
    fn from(value: &T) -> Self {
640✔
231
        CanonicOperatorName::new_unchecked(value)
640✔
232
    }
640✔
233
}
234

235
impl std::fmt::Display for CanonicOperatorName {
236
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
237
        let s = String::from_utf8_lossy(&self.0);
×
238
        write!(f, "{s}")
×
239
    }
×
240
}
241

242
pub trait InitializedPlotOperator: Send + Sync {
243
    /// Get the result descriptor of the `Operator`
244
    fn result_descriptor(&self) -> &PlotResultDescriptor;
245

246
    /// Instantiate a `TypedPlotQueryProcessor` from a `PlotOperator`
247
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor>;
248

249
    /// Wrap a box around an `InitializedPlotOperator`
250
    fn boxed(self) -> Box<dyn InitializedPlotOperator>
57✔
251
    where
57✔
252
        Self: Sized + 'static,
57✔
253
    {
57✔
254
        Box::new(self)
57✔
255
    }
57✔
256

257
    /// Get a canonic representation of the operator and its sources
258
    fn canonic_name(&self) -> CanonicOperatorName;
259
}
260

261
impl InitializedRasterOperator for Box<dyn InitializedRasterOperator> {
262
    fn result_descriptor(&self) -> &RasterResultDescriptor {
240✔
263
        self.as_ref().result_descriptor()
240✔
264
    }
240✔
265

266
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
260✔
267
        self.as_ref().query_processor()
260✔
268
    }
260✔
269

270
    fn canonic_name(&self) -> CanonicOperatorName {
2✔
271
        self.as_ref().canonic_name()
2✔
272
    }
2✔
273

274
    fn name(&self) -> &'static str {
2✔
275
        self.as_ref().name()
2✔
276
    }
2✔
277

278
    fn path(&self) -> WorkflowOperatorPath {
2✔
279
        self.as_ref().path()
2✔
280
    }
2✔
281

282
    fn data(&self) -> Option<String> {
2✔
283
        self.as_ref().data()
2✔
284
    }
2✔
285
}
286

287
impl InitializedVectorOperator for Box<dyn InitializedVectorOperator> {
288
    fn result_descriptor(&self) -> &VectorResultDescriptor {
185✔
289
        self.as_ref().result_descriptor()
185✔
290
    }
185✔
291

292
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
133✔
293
        self.as_ref().query_processor()
133✔
294
    }
133✔
295

296
    fn canonic_name(&self) -> CanonicOperatorName {
×
297
        self.as_ref().canonic_name()
×
298
    }
×
299

300
    fn name(&self) -> &'static str {
1✔
301
        self.as_ref().name()
1✔
302
    }
1✔
303

304
    fn path(&self) -> WorkflowOperatorPath {
1✔
305
        self.as_ref().path()
1✔
306
    }
1✔
307

308
    fn data(&self) -> Option<String> {
1✔
309
        self.as_ref().data()
1✔
310
    }
1✔
311
}
312

313
impl InitializedPlotOperator for Box<dyn InitializedPlotOperator> {
314
    fn result_descriptor(&self) -> &PlotResultDescriptor {
×
315
        self.as_ref().result_descriptor()
×
316
    }
×
317

318
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor> {
53✔
319
        self.as_ref().query_processor()
53✔
320
    }
53✔
321

322
    fn canonic_name(&self) -> CanonicOperatorName {
×
323
        self.as_ref().canonic_name()
×
324
    }
×
325
}
326

327
/// An enum to differentiate between `Operator` variants
328
#[derive(Clone, Debug, Serialize, Deserialize)]
329
#[serde(tag = "type", content = "operator")]
330
pub enum TypedOperator {
331
    Vector(Box<dyn VectorOperator>),
332
    Raster(Box<dyn RasterOperator>),
333
    Plot(Box<dyn PlotOperator>),
334
}
335

336
impl TypedOperator {
337
    pub fn get_vector(self) -> Result<Box<dyn VectorOperator>> {
6✔
338
        if let TypedOperator::Vector(o) = self {
6✔
339
            return Ok(o);
6✔
340
        }
×
341
        Err(error::Error::InvalidOperatorType {
×
342
            expected: "Vector".to_owned(),
×
343
            found: self.type_name().to_owned(),
×
344
        })
×
345
    }
6✔
346

347
    pub fn get_raster(self) -> Result<Box<dyn RasterOperator>> {
28✔
348
        if let TypedOperator::Raster(o) = self {
28✔
349
            return Ok(o);
28✔
350
        }
×
351
        Err(error::Error::InvalidOperatorType {
×
352
            expected: "Raster".to_owned(),
×
353
            found: self.type_name().to_owned(),
×
354
        })
×
355
    }
28✔
356

357
    pub fn get_plot(self) -> Result<Box<dyn PlotOperator>> {
2✔
358
        if let TypedOperator::Plot(o) = self {
2✔
359
            return Ok(o);
2✔
360
        }
×
361
        Err(error::Error::InvalidOperatorType {
×
362
            expected: "Plot".to_owned(),
×
363
            found: self.type_name().to_owned(),
×
364
        })
×
365
    }
2✔
366

367
    fn type_name(&self) -> &str {
×
368
        match self {
×
369
            TypedOperator::Vector(_) => "Vector",
×
370
            TypedOperator::Raster(_) => "Raster",
×
371
            TypedOperator::Plot(_) => "Plot",
×
372
        }
373
    }
×
374
}
375

376
impl From<Box<dyn VectorOperator>> for TypedOperator {
377
    fn from(operator: Box<dyn VectorOperator>) -> Self {
22✔
378
        Self::Vector(operator)
22✔
379
    }
22✔
380
}
381

382
impl From<Box<dyn RasterOperator>> for TypedOperator {
383
    fn from(operator: Box<dyn RasterOperator>) -> Self {
9✔
384
        Self::Raster(operator)
9✔
385
    }
9✔
386
}
387

388
impl From<Box<dyn PlotOperator>> for TypedOperator {
389
    fn from(operator: Box<dyn PlotOperator>) -> Self {
14✔
390
        Self::Plot(operator)
14✔
391
    }
14✔
392
}
393

394
#[macro_export]
395
macro_rules! call_on_typed_operator {
396
    ($typed_operator:expr, $operator_var:ident => $function_call:expr) => {
397
        match $typed_operator {
398
            $crate::engine::TypedOperator::Vector($operator_var) => $function_call,
399
            $crate::engine::TypedOperator::Raster($operator_var) => $function_call,
400
            $crate::engine::TypedOperator::Plot($operator_var) => $function_call,
401
        }
402
    };
403
}
404

405
impl OperatorData for TypedOperator {
406
    fn data_names_collect(&self, data_ids: &mut Vec<NamedData>) {
2✔
407
        match self {
2✔
408
            TypedOperator::Vector(v) => v.data_names_collect(data_ids),
×
409
            TypedOperator::Raster(r) => r.data_names_collect(data_ids),
2✔
410
            TypedOperator::Plot(p) => p.data_names_collect(data_ids),
×
411
        }
412
    }
2✔
413
}
414

415
pub trait OperatorName {
416
    const TYPE_NAME: &'static str;
417
}
418

419
#[cfg(test)]
420
mod tests {
421
    use serde_json::json;
422

423
    use super::*;
424

425
    #[test]
426
    fn op_name_byte_size() {
1✔
427
        let op = CanonicOperatorName::new_unchecked(&json!({"foo": "bar"}));
1✔
428
        assert_eq!(op.byte_size(), 37);
1✔
429

430
        let op = CanonicOperatorName::new_unchecked(&json!({"foo": {"bar": [1,2,3]}}));
1✔
431
        assert_eq!(op.byte_size(), 47);
1✔
432
    }
1✔
433
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc