• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12417203919

19 Dec 2024 04:55PM UTC coverage: 90.351% (-0.2%) from 90.512%
12417203919

Pull #998

github

web-flow
Merge c7e5c8ae4 into 34e12969f
Pull Request #998: quota logging wip

834 of 1211 new or added lines in 66 files covered. (68.87%)

220 existing lines in 21 files now uncovered.

133830 of 148123 relevant lines covered (90.35%)

54352.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.54
/operators/src/engine/operator.rs
1
use serde::{Deserialize, Serialize};
2
use tracing::debug;
3

4
use crate::error;
5
use crate::util::Result;
6
use async_trait::async_trait;
7
use geoengine_datatypes::{dataset::NamedData, util::ByteSize};
8

9
use super::{
10
    query_processor::{TypedRasterQueryProcessor, TypedVectorQueryProcessor},
11
    CloneablePlotOperator, CloneableRasterOperator, CloneableVectorOperator, CreateSpan,
12
    ExecutionContext, PlotResultDescriptor, RasterResultDescriptor, TypedPlotQueryProcessor,
13
    VectorResultDescriptor, WorkflowOperatorPath,
14
};
15

16
pub trait OperatorData {
17
    /// Get the ids of all the data involoved in this operator and its sources
18
    fn data_names(&self) -> Vec<NamedData> {
2✔
19
        let mut datasets = vec![];
2✔
20
        self.data_names_collect(&mut datasets);
2✔
21
        datasets
2✔
22
    }
2✔
23

24
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>);
25
}
26

27
/// Common methods for `RasterOperator`s
28
#[typetag::serde(tag = "type")]
136✔
29
#[async_trait]
30
pub trait RasterOperator:
31
    CloneableRasterOperator + OperatorData + Send + Sync + std::fmt::Debug
32
{
33
    /// Internal initialization logic of the operator
34
    async fn _initialize(
35
        self: Box<Self>,
36
        path: WorkflowOperatorPath,
37
        context: &dyn ExecutionContext,
38
    ) -> Result<Box<dyn InitializedRasterOperator>>;
39

40
    /// Initialize the operator
41
    ///
42
    /// This method should not be overriden because it handles wrapping the operator using the
43
    /// execution context. Instead, `_initialize` should be implemented.
44
    async fn initialize(
45
        self: Box<Self>,
46
        path: WorkflowOperatorPath,
47
        context: &dyn ExecutionContext,
48
    ) -> Result<Box<dyn InitializedRasterOperator>> {
384✔
49
        let span = self.span();
384✔
50
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
384✔
51
        let op = self._initialize(path.clone(), context).await?;
964✔
52

53
        Ok(context.wrap_initialized_raster_operator(op, span))
369✔
54
    }
768✔
55

56
    /// Wrap a box around a `RasterOperator`
57
    fn boxed(self) -> Box<dyn RasterOperator>
525✔
58
    where
525✔
59
        Self: Sized + 'static,
525✔
60
    {
525✔
61
        Box::new(self)
525✔
62
    }
525✔
63

64
    fn span(&self) -> CreateSpan;
65
}
66

67
/// Common methods for `VectorOperator`s
68
#[typetag::serde(tag = "type")]
101✔
69
#[async_trait]
70
pub trait VectorOperator:
71
    CloneableVectorOperator + OperatorData + Send + Sync + std::fmt::Debug
72
{
73
    async fn _initialize(
74
        self: Box<Self>,
75
        path: WorkflowOperatorPath,
76
        context: &dyn ExecutionContext,
77
    ) -> Result<Box<dyn InitializedVectorOperator>>;
78

79
    async fn initialize(
80
        self: Box<Self>,
81
        path: WorkflowOperatorPath,
82
        context: &dyn ExecutionContext,
83
    ) -> Result<Box<dyn InitializedVectorOperator>> {
194✔
84
        let span = self.span();
194✔
85
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
194✔
86
        let op = self._initialize(path.clone(), context).await?;
194✔
87
        Ok(context.wrap_initialized_vector_operator(op, span))
184✔
88
    }
388✔
89

90
    /// Wrap a box around a `VectorOperator`
91
    fn boxed(self) -> Box<dyn VectorOperator>
246✔
92
    where
246✔
93
        Self: Sized + 'static,
246✔
94
    {
246✔
95
        Box::new(self)
246✔
96
    }
246✔
97

98
    fn span(&self) -> CreateSpan;
99
}
100

101
/// Common methods for `PlotOperator`s
102
#[typetag::serde(tag = "type")]
20✔
103
#[async_trait]
104
pub trait PlotOperator:
105
    CloneablePlotOperator + OperatorData + Send + Sync + std::fmt::Debug
106
{
107
    async fn _initialize(
108
        self: Box<Self>,
109
        path: WorkflowOperatorPath,
110
        context: &dyn ExecutionContext,
111
    ) -> Result<Box<dyn InitializedPlotOperator>>;
112

113
    async fn initialize(
114
        self: Box<Self>,
115
        path: WorkflowOperatorPath,
116
        context: &dyn ExecutionContext,
117
    ) -> Result<Box<dyn InitializedPlotOperator>> {
70✔
118
        let span = self.span();
70✔
119
        debug!("Initialize {}, path: {}", self.typetag_name(), &path);
70✔
120
        let op = self._initialize(path.clone(), context).await?;
70✔
121
        Ok(context.wrap_initialized_plot_operator(op, span))
57✔
122
    }
140✔
123

124
    /// Wrap a box around a `PlotOperator`
125
    fn boxed(self) -> Box<dyn PlotOperator>
79✔
126
    where
79✔
127
        Self: Sized + 'static,
79✔
128
    {
79✔
129
        Box::new(self)
79✔
130
    }
79✔
131

132
    fn span(&self) -> CreateSpan;
133
}
134

135
pub trait InitializedRasterOperator: Send + Sync {
136
    /// Get the result descriptor of the `Operator`
137
    fn result_descriptor(&self) -> &RasterResultDescriptor;
138

139
    /// Instantiate a `TypedVectorQueryProcessor` from a `RasterOperator`
140
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor>;
141

142
    /// Wrap a box around a `RasterOperator`
143
    fn boxed(self) -> Box<dyn InitializedRasterOperator>
333✔
144
    where
333✔
145
        Self: Sized + 'static,
333✔
146
    {
333✔
147
        Box::new(self)
333✔
148
    }
333✔
149

150
    /// Get a canonic representation of the operator and its sources
151
    fn canonic_name(&self) -> CanonicOperatorName;
152

153
    /// Get the unique name of the operator
154
    fn name(&self) -> &'static str;
155

156
    // Get the path of the operator in the workflow
157
    fn path(&self) -> WorkflowOperatorPath;
158

159
    /// Return the name of the data loaded by the operator (if any)
NEW
160
    fn data(&self) -> Option<String> {
×
NEW
161
        None
×
NEW
162
    }
×
163
}
164

165
pub trait InitializedVectorOperator: Send + Sync {
166
    /// Get the result descriptor of the `Operator`
167
    fn result_descriptor(&self) -> &VectorResultDescriptor;
168

169
    /// Instantiate a `TypedVectorQueryProcessor` from a `RasterOperator`
170
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor>;
171

172
    /// Wrap a box around a `RasterOperator`
173
    fn boxed(self) -> Box<dyn InitializedVectorOperator>
177✔
174
    where
177✔
175
        Self: Sized + 'static,
177✔
176
    {
177✔
177
        Box::new(self)
177✔
178
    }
177✔
179

180
    /// Get a canonic representation of the operator and its sources.
181
    /// This only includes *logical* operators, not wrappers
182
    fn canonic_name(&self) -> CanonicOperatorName;
183

184
    /// Get the unique name of the operator
185
    fn name(&self) -> &'static str;
186

187
    // Get the path of the operator in the workflow
188
    fn path(&self) -> WorkflowOperatorPath;
189

190
    /// Return the name of the data loaded by the operator (if any)
NEW
191
    fn data(&self) -> Option<String> {
×
NEW
192
        None
×
NEW
193
    }
×
194
}
195

196
/// A canonic name for an operator and its sources
197
/// We use a byte representation of the operator json
198
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
199
pub struct CanonicOperatorName(Vec<u8>);
200

201
impl CanonicOperatorName {
202
    pub fn new<T: Serialize>(value: &T) -> Result<Self> {
×
203
        Ok(CanonicOperatorName(serde_json::to_vec(&value)?))
×
204
    }
×
205

206
    ///
207
    /// # Panics
208
    ///
209
    /// if the value cannot be serialized as json
210
    ///
211
    pub fn new_unchecked<T: Serialize>(value: &T) -> Self {
654✔
212
        CanonicOperatorName(serde_json::to_vec(&value).expect("it should be checked by the caller"))
654✔
213
    }
654✔
214
}
215

216
impl ByteSize for CanonicOperatorName {
217
    fn heap_byte_size(&self) -> usize {
2✔
218
        self.0.heap_byte_size()
2✔
219
    }
2✔
220
}
221

222
impl<T> From<&T> for CanonicOperatorName
223
where
224
    T: Serialize,
225
{
226
    fn from(value: &T) -> Self {
640✔
227
        CanonicOperatorName::new_unchecked(value)
640✔
228
    }
640✔
229
}
230

231
impl std::fmt::Display for CanonicOperatorName {
232
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
233
        let s = String::from_utf8_lossy(&self.0);
×
234
        write!(f, "{s}")
×
235
    }
×
236
}
237

238
pub trait InitializedPlotOperator: Send + Sync {
239
    /// Get the result descriptor of the `Operator`
240
    fn result_descriptor(&self) -> &PlotResultDescriptor;
241

242
    /// Instantiate a `TypedVectorQueryProcessor` from a `RasterOperator`
243
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor>;
244

245
    /// Wrap a box around a `RasterOperator`
246
    fn boxed(self) -> Box<dyn InitializedPlotOperator>
57✔
247
    where
57✔
248
        Self: Sized + 'static,
57✔
249
    {
57✔
250
        Box::new(self)
57✔
251
    }
57✔
252

253
    /// Get a canonic representation of the operator and its sources
254
    fn canonic_name(&self) -> CanonicOperatorName;
255
}
256

257
impl InitializedRasterOperator for Box<dyn InitializedRasterOperator> {
258
    fn result_descriptor(&self) -> &RasterResultDescriptor {
240✔
259
        self.as_ref().result_descriptor()
240✔
260
    }
240✔
261

262
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
260✔
263
        self.as_ref().query_processor()
260✔
264
    }
260✔
265

266
    fn canonic_name(&self) -> CanonicOperatorName {
2✔
267
        self.as_ref().canonic_name()
2✔
268
    }
2✔
269

270
    fn name(&self) -> &'static str {
2✔
271
        self.as_ref().name()
2✔
272
    }
2✔
273

274
    fn path(&self) -> WorkflowOperatorPath {
2✔
275
        self.as_ref().path()
2✔
276
    }
2✔
277

278
    fn data(&self) -> Option<String> {
2✔
279
        self.as_ref().data()
2✔
280
    }
2✔
281
}
282

283
impl InitializedVectorOperator for Box<dyn InitializedVectorOperator> {
284
    fn result_descriptor(&self) -> &VectorResultDescriptor {
185✔
285
        self.as_ref().result_descriptor()
185✔
286
    }
185✔
287

288
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
133✔
289
        self.as_ref().query_processor()
133✔
290
    }
133✔
291

292
    fn canonic_name(&self) -> CanonicOperatorName {
×
293
        self.as_ref().canonic_name()
×
294
    }
×
295

296
    fn name(&self) -> &'static str {
1✔
297
        self.as_ref().name()
1✔
298
    }
1✔
299

300
    fn path(&self) -> WorkflowOperatorPath {
1✔
301
        self.as_ref().path()
1✔
302
    }
1✔
303

304
    fn data(&self) -> Option<String> {
1✔
305
        self.as_ref().data()
1✔
306
    }
1✔
307
}
308

309
impl InitializedPlotOperator for Box<dyn InitializedPlotOperator> {
310
    fn result_descriptor(&self) -> &PlotResultDescriptor {
×
311
        self.as_ref().result_descriptor()
×
312
    }
×
313

314
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor> {
53✔
315
        self.as_ref().query_processor()
53✔
316
    }
53✔
317

318
    fn canonic_name(&self) -> CanonicOperatorName {
×
319
        self.as_ref().canonic_name()
×
320
    }
×
321
}
322

323
/// An enum to differentiate between `Operator` variants
324
#[derive(Clone, Debug, Serialize, Deserialize)]
208✔
325
#[serde(tag = "type", content = "operator")]
326
pub enum TypedOperator {
327
    Vector(Box<dyn VectorOperator>),
328
    Raster(Box<dyn RasterOperator>),
329
    Plot(Box<dyn PlotOperator>),
330
}
331

332
impl TypedOperator {
333
    pub fn get_vector(self) -> Result<Box<dyn VectorOperator>> {
6✔
334
        if let TypedOperator::Vector(o) = self {
6✔
335
            return Ok(o);
6✔
336
        }
×
337
        Err(error::Error::InvalidOperatorType {
×
338
            expected: "Vector".to_owned(),
×
339
            found: self.type_name().to_owned(),
×
340
        })
×
341
    }
6✔
342

343
    pub fn get_raster(self) -> Result<Box<dyn RasterOperator>> {
28✔
344
        if let TypedOperator::Raster(o) = self {
28✔
345
            return Ok(o);
28✔
346
        }
×
347
        Err(error::Error::InvalidOperatorType {
×
348
            expected: "Raster".to_owned(),
×
349
            found: self.type_name().to_owned(),
×
350
        })
×
351
    }
28✔
352

353
    pub fn get_plot(self) -> Result<Box<dyn PlotOperator>> {
2✔
354
        if let TypedOperator::Plot(o) = self {
2✔
355
            return Ok(o);
2✔
356
        }
×
357
        Err(error::Error::InvalidOperatorType {
×
358
            expected: "Plot".to_owned(),
×
359
            found: self.type_name().to_owned(),
×
360
        })
×
361
    }
2✔
362

363
    fn type_name(&self) -> &str {
×
364
        match self {
×
365
            TypedOperator::Vector(_) => "Vector",
×
366
            TypedOperator::Raster(_) => "Raster",
×
367
            TypedOperator::Plot(_) => "Plot",
×
368
        }
369
    }
×
370
}
371

372
impl From<Box<dyn VectorOperator>> for TypedOperator {
373
    fn from(operator: Box<dyn VectorOperator>) -> Self {
22✔
374
        Self::Vector(operator)
22✔
375
    }
22✔
376
}
377

378
impl From<Box<dyn RasterOperator>> for TypedOperator {
379
    fn from(operator: Box<dyn RasterOperator>) -> Self {
9✔
380
        Self::Raster(operator)
9✔
381
    }
9✔
382
}
383

384
impl From<Box<dyn PlotOperator>> for TypedOperator {
385
    fn from(operator: Box<dyn PlotOperator>) -> Self {
14✔
386
        Self::Plot(operator)
14✔
387
    }
14✔
388
}
389

390
#[macro_export]
391
macro_rules! call_on_typed_operator {
392
    ($typed_operator:expr, $operator_var:ident => $function_call:expr) => {
393
        match $typed_operator {
394
            $crate::engine::TypedOperator::Vector($operator_var) => $function_call,
395
            $crate::engine::TypedOperator::Raster($operator_var) => $function_call,
396
            $crate::engine::TypedOperator::Plot($operator_var) => $function_call,
397
        }
398
    };
399
}
400

401
impl OperatorData for TypedOperator {
402
    fn data_names_collect(&self, data_ids: &mut Vec<NamedData>) {
2✔
403
        match self {
2✔
404
            TypedOperator::Vector(v) => v.data_names_collect(data_ids),
×
405
            TypedOperator::Raster(r) => r.data_names_collect(data_ids),
2✔
406
            TypedOperator::Plot(p) => p.data_names_collect(data_ids),
×
407
        }
408
    }
2✔
409
}
410

411
pub trait OperatorName {
412
    const TYPE_NAME: &'static str;
413
}
414

415
#[cfg(test)]
416
mod tests {
417
    use serde_json::json;
418

419
    use super::*;
420

421
    #[test]
422
    fn op_name_byte_size() {
1✔
423
        let op = CanonicOperatorName::new_unchecked(&json!({"foo": "bar"}));
1✔
424
        assert_eq!(op.byte_size(), 37);
1✔
425

426
        let op = CanonicOperatorName::new_unchecked(&json!({"foo": {"bar": [1,2,3]}}));
1✔
427
        assert_eq!(op.byte_size(), 47);
1✔
428
    }
1✔
429
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc