• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.01
/operators/src/meta/wrapper.rs
1
use crate::adapters::StreamStatisticsAdapter;
2
use crate::engine::{
3
    CanonicOperatorName, CreateSpan, InitializedRasterOperator, InitializedVectorOperator,
4
    QueryContext, QueryProcessor, RasterResultDescriptor, ResultDescriptor,
5
    TypedRasterQueryProcessor, TypedVectorQueryProcessor, VectorResultDescriptor,
6
    WorkflowOperatorPath,
7
};
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::stream::BoxStream;
11
use futures::StreamExt;
12
use geoengine_datatypes::primitives::{
13
    AxisAlignedRectangle, QueryAttributeSelection, QueryRectangle,
14
};
15
use std::sync::atomic::{AtomicUsize, Ordering};
16
use tracing::{span, Level};
17

18
// A wrapper around an initialized operator that adds statistics and quota tracking
19
pub struct InitializedOperatorWrapper<S> {
20
    source: S,
21
    span: CreateSpan,
22
}
23

24
impl<S> InitializedOperatorWrapper<S> {
25
    pub fn new(source: S, span: CreateSpan) -> Self {
6✔
26
        Self { source, span }
6✔
27
    }
6✔
28
}
29

30
impl InitializedRasterOperator for InitializedOperatorWrapper<Box<dyn InitializedRasterOperator>> {
31
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
32
        tracing::debug!(
2✔
33
            event = "raster result descriptor",
NEW
34
            path = self.source.path().to_string()
×
35
        );
36
        self.source.result_descriptor()
2✔
37
    }
2✔
38

39
    #[allow(clippy::too_many_lines)]
40
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
41
        let path = self.source.path();
2✔
42
        tracing::debug!(event = "query processor", path = path.to_string());
2✔
43
        let processor_result = self.source.query_processor();
2✔
44
        match processor_result {
2✔
45
            Ok(p) => {
2✔
46
                let path_clone = path.clone();
2✔
47
                let res_processor = match p {
2✔
48
                    TypedRasterQueryProcessor::U8(p) => {
2✔
49
                        TypedRasterQueryProcessor::U8(Box::new(QueryProcessorWrapper::new(
2✔
50
                            p,
2✔
51
                            self.span,
2✔
52
                            path_clone,
2✔
53
                            self.source.name(),
2✔
54
                            self.source.data(),
2✔
55
                        )))
2✔
56
                    }
NEW
57
                    TypedRasterQueryProcessor::U16(p) => {
×
NEW
58
                        TypedRasterQueryProcessor::U16(Box::new(QueryProcessorWrapper::new(
×
NEW
59
                            p,
×
NEW
60
                            self.span,
×
NEW
61
                            path_clone,
×
NEW
62
                            self.source.name(),
×
NEW
63
                            self.source.data(),
×
NEW
64
                        )))
×
65
                    }
NEW
66
                    TypedRasterQueryProcessor::U32(p) => {
×
NEW
67
                        TypedRasterQueryProcessor::U32(Box::new(QueryProcessorWrapper::new(
×
NEW
68
                            p,
×
NEW
69
                            self.span,
×
NEW
70
                            path_clone,
×
NEW
71
                            self.source.name(),
×
NEW
72
                            self.source.data(),
×
NEW
73
                        )))
×
74
                    }
NEW
75
                    TypedRasterQueryProcessor::U64(p) => {
×
NEW
76
                        TypedRasterQueryProcessor::U64(Box::new(QueryProcessorWrapper::new(
×
NEW
77
                            p,
×
NEW
78
                            self.span,
×
NEW
79
                            path_clone,
×
NEW
80
                            self.source.name(),
×
NEW
81
                            self.source.data(),
×
NEW
82
                        )))
×
83
                    }
NEW
84
                    TypedRasterQueryProcessor::I8(p) => {
×
NEW
85
                        TypedRasterQueryProcessor::I8(Box::new(QueryProcessorWrapper::new(
×
NEW
86
                            p,
×
NEW
87
                            self.span,
×
NEW
88
                            path_clone,
×
NEW
89
                            self.source.name(),
×
NEW
90
                            self.source.data(),
×
NEW
91
                        )))
×
92
                    }
NEW
93
                    TypedRasterQueryProcessor::I16(p) => {
×
NEW
94
                        TypedRasterQueryProcessor::I16(Box::new(QueryProcessorWrapper::new(
×
NEW
95
                            p,
×
NEW
96
                            self.span,
×
NEW
97
                            path_clone,
×
NEW
98
                            self.source.name(),
×
NEW
99
                            self.source.data(),
×
NEW
100
                        )))
×
101
                    }
NEW
102
                    TypedRasterQueryProcessor::I32(p) => {
×
NEW
103
                        TypedRasterQueryProcessor::I32(Box::new(QueryProcessorWrapper::new(
×
NEW
104
                            p,
×
NEW
105
                            self.span,
×
NEW
106
                            path_clone,
×
NEW
107
                            self.source.name(),
×
NEW
108
                            self.source.data(),
×
NEW
109
                        )))
×
110
                    }
NEW
111
                    TypedRasterQueryProcessor::I64(p) => {
×
NEW
112
                        TypedRasterQueryProcessor::I64(Box::new(QueryProcessorWrapper::new(
×
NEW
113
                            p,
×
NEW
114
                            self.span,
×
NEW
115
                            path_clone,
×
NEW
116
                            self.source.name(),
×
NEW
117
                            self.source.data(),
×
NEW
118
                        )))
×
119
                    }
NEW
120
                    TypedRasterQueryProcessor::F32(p) => {
×
NEW
121
                        TypedRasterQueryProcessor::F32(Box::new(QueryProcessorWrapper::new(
×
NEW
122
                            p,
×
NEW
123
                            self.span,
×
NEW
124
                            path_clone,
×
NEW
125
                            self.source.name(),
×
NEW
126
                            self.source.data(),
×
NEW
127
                        )))
×
128
                    }
NEW
129
                    TypedRasterQueryProcessor::F64(p) => {
×
NEW
130
                        TypedRasterQueryProcessor::F64(Box::new(QueryProcessorWrapper::new(
×
NEW
131
                            p,
×
NEW
132
                            self.span,
×
NEW
133
                            path_clone,
×
NEW
134
                            self.source.name(),
×
NEW
135
                            self.source.data(),
×
NEW
136
                        )))
×
137
                    }
138
                };
139
                tracing::debug!(event = "query processor created");
2✔
140
                Ok(res_processor)
2✔
141
            }
142
            Err(err) => {
×
143
                tracing::debug!(event = "query processor failed");
×
144
                Err(err)
×
145
            }
146
        }
147
    }
2✔
148

149
    fn canonic_name(&self) -> CanonicOperatorName {
×
150
        self.source.canonic_name()
×
151
    }
×
152

NEW
153
    fn name(&self) -> &'static str {
×
NEW
154
        self.source.name()
×
NEW
155
    }
×
156

NEW
157
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
158
        self.source.path()
×
NEW
159
    }
×
160
}
161

162
impl InitializedVectorOperator for InitializedOperatorWrapper<Box<dyn InitializedVectorOperator>> {
163
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
164
        tracing::debug!(event = "vector result descriptor");
×
165
        self.source.result_descriptor()
×
166
    }
×
167

168
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
1✔
169
        tracing::debug!(event = "query processor");
1✔
170
        let processor_result = self.source.query_processor();
1✔
171
        match processor_result {
1✔
172
            Ok(p) => {
1✔
173
                let result = map_typed_query_processor!(
1✔
174
                    p,
1✔
175
                    p => Box::new(QueryProcessorWrapper::new(p,
×
NEW
176
                    self.span, self.source.path(), self.source.name(), self.source.data()))
×
177
                );
178
                tracing::debug!(event = "query processor created");
1✔
179
                Ok(result)
1✔
180
            }
181
            Err(err) => {
×
182
                tracing::debug!(event = "query processor failed");
×
183
                Err(err)
×
184
            }
185
        }
186
    }
1✔
187

188
    fn canonic_name(&self) -> CanonicOperatorName {
×
189
        self.source.canonic_name()
×
190
    }
×
191

NEW
192
    fn name(&self) -> &'static str {
×
NEW
193
        self.source.name()
×
NEW
194
    }
×
195

NEW
196
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
197
        self.source.path()
×
NEW
198
    }
×
199
}
200

201
// A wrapper around a query processor that adds statistics and quota tracking
202
struct QueryProcessorWrapper<Q, T>
203
where
204
    Q: QueryProcessor<Output = T>,
205
{
206
    processor: Q,
207
    span: CreateSpan,
208
    path: WorkflowOperatorPath,
209
    operator_name: &'static str,
210
    data: Option<String>,
211
    query_count: AtomicUsize,
212
}
213

214
impl<Q, T> QueryProcessorWrapper<Q, T>
215
where
216
    Q: QueryProcessor<Output = T> + Sized,
217
{
218
    pub fn new(
3✔
219
        processor: Q,
3✔
220
        span: CreateSpan,
3✔
221
        path: WorkflowOperatorPath,
3✔
222
        operator_name: &'static str,
3✔
223
        data: Option<String>,
3✔
224
    ) -> Self {
3✔
225
        QueryProcessorWrapper {
3✔
226
            processor,
3✔
227
            span,
3✔
228
            path,
3✔
229
            operator_name,
3✔
230
            data,
3✔
231
            query_count: AtomicUsize::new(0),
3✔
232
        }
3✔
233
    }
3✔
234

235
    pub fn next_query_count(&self) -> usize {
3✔
236
        self.query_count.fetch_add(1, Ordering::SeqCst)
3✔
237
    }
3✔
238
}
239

240
#[async_trait]
241
impl<Q, T, S, A, R> QueryProcessor for QueryProcessorWrapper<Q, T>
242
where
243
    Q: QueryProcessor<Output = T, SpatialBounds = S, Selection = A, ResultDescription = R>,
244
    S: AxisAlignedRectangle + Send + Sync + 'static,
245
    A: QueryAttributeSelection + 'static,
246
    R: ResultDescriptor<QueryRectangleSpatialBounds = S, QueryRectangleAttributeSelection = A>
247
        + 'static,
248
    T: Send,
249
{
250
    type Output = T;
251
    type SpatialBounds = S;
252
    type Selection = A;
253
    type ResultDescription = R;
254

255
    async fn _query<'a>(
256
        &'a self,
257
        query: QueryRectangle<Self::SpatialBounds, Self::Selection>,
258
        ctx: &'a dyn QueryContext,
259
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
3✔
260
        let qc = self.next_query_count();
3✔
261

262
        // the top level operator creates a new query span for identifying individual queries
263
        let query_span = if self.path.is_root() {
3✔
264
            let span = span!(
3✔
265
                Level::TRACE,
3✔
266
                "Query",
267
                query_id = %uuid::Uuid::new_v4()
3✔
268
            );
269
            Some(span)
3✔
270
        } else {
271
            None
×
272
        };
273

274
        let _query_span_enter = query_span.as_ref().map(tracing::Span::enter);
3✔
275

3✔
276
        let quota_checker = ctx
3✔
277
            .quota_checker()
3✔
278
            .expect("`QuotaChecker` extension should be set during `ProContext` creation");
3✔
279

3✔
280
        // TODO: check the quota only once per query and not for every operator
3✔
281
        quota_checker.ensure_quota_available().await?;
3✔
282

283
        let quota_tracker = ctx
2✔
284
            .quota_tracking()
2✔
285
            .expect("`QuotaTracking` extension should be set during `ProContext` creation")
2✔
286
            .clone();
2✔
287

2✔
288
        let span = (self.span)(&self.path, qc);
2✔
289

2✔
290
        let _enter = span.enter();
2✔
291

2✔
292
        tracing::trace!(
2✔
293
            event = %"query_start",
294
            path = %self.path,
295
            bbox = %format!("[{},{},{},{}]",
×
296
                query.spatial_bounds.lower_left().x,
×
297
                query.spatial_bounds.lower_left().y,
×
298
                query.spatial_bounds.upper_right().x,
×
299
                query.spatial_bounds.upper_right().y
×
300
            ),
301
            time = %format!("[{},{}]",
×
302
                query.time_interval.start().inner(),
×
303
                query.time_interval.end().inner()
×
304
            )
305
        );
306

307
        let stream_result = self.processor.query(query, ctx).await;
2✔
308
        tracing::trace!(event = %"query_ready");
2✔
309

310
        match stream_result {
2✔
311
            Ok(stream) => {
2✔
312
                tracing::trace!(event = %"query_ok");
2✔
313
                Ok(StreamStatisticsAdapter::new(
2✔
314
                    stream,
2✔
315
                    span.clone(),
2✔
316
                    quota_tracker,
2✔
317
                    self.path.clone(),
2✔
318
                    self.operator_name,
2✔
319
                    self.data.clone(),
2✔
320
                )
2✔
321
                .boxed())
2✔
322
            }
323
            Err(err) => {
×
324
                tracing::trace!(event = %"query_error");
×
325
                Err(err)
×
326
            }
327
        }
328
    }
6✔
329

330
    fn result_descriptor(&self) -> &Self::ResultDescription {
6✔
331
        self.processor.result_descriptor()
6✔
332
    }
6✔
333
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc