• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.57
/operators/src/meta/wrapper.rs
1
use crate::adapters::StreamStatisticsAdapter;
2
use crate::engine::{
3
    CanonicOperatorName, CreateSpan, InitializedRasterOperator, InitializedVectorOperator,
4
    QueryContext, QueryProcessor, RasterResultDescriptor, ResultDescriptor,
5
    TypedRasterQueryProcessor, TypedVectorQueryProcessor, VectorResultDescriptor,
6
    WorkflowOperatorPath,
7
};
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::stream::BoxStream;
11
use futures::StreamExt;
12
use geoengine_datatypes::primitives::{
13
    AxisAlignedRectangle, QueryAttributeSelection, QueryRectangle,
14
};
15
use std::sync::atomic::{AtomicUsize, Ordering};
16
use tracing::{span, Level};
17

18
// A wrapper around an initialized operator that adds statistics and quota tracking
19
pub struct InitializedOperatorWrapper<S> {
20
    source: S,
21
    span: CreateSpan,
22
    path: WorkflowOperatorPath,
23
}
24

25
impl<S> InitializedOperatorWrapper<S> {
26
    pub fn new(source: S, span: CreateSpan, path: WorkflowOperatorPath) -> Self {
6✔
27
        Self { source, span, path }
6✔
28
    }
6✔
29
}
30

31
impl InitializedRasterOperator for InitializedOperatorWrapper<Box<dyn InitializedRasterOperator>> {
32
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
33
        tracing::debug!(
2✔
34
            event = "raster result descriptor",
35
            path = self.path.to_string()
×
36
        );
37
        self.source.result_descriptor()
2✔
38
    }
2✔
39

40
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
41
        tracing::debug!(event = "query processor", path = self.path.to_string());
2✔
42
        let processor_result = self.source.query_processor();
2✔
43
        match processor_result {
2✔
44
            Ok(p) => {
2✔
45
                let path_clone = self.path.clone();
2✔
46
                let res_processor = match p {
2✔
47
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
2✔
48
                        QueryProcessorWrapper::new(p, self.span, path_clone),
2✔
49
                    )),
2✔
50
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
51
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
52
                    )),
×
53
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
54
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
55
                    )),
×
56
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
57
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
58
                    )),
×
59
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
60
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
61
                    )),
×
62
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
63
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
64
                    )),
×
65
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
66
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
67
                    )),
×
68
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
69
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
70
                    )),
×
71
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
72
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
73
                    )),
×
74
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
75
                        QueryProcessorWrapper::new(p, self.span, path_clone),
×
76
                    )),
×
77
                };
78
                tracing::debug!(event = "query processor created");
2✔
79
                Ok(res_processor)
2✔
80
            }
81
            Err(err) => {
×
82
                tracing::debug!(event = "query processor failed");
×
83
                Err(err)
×
84
            }
85
        }
86
    }
2✔
87

88
    fn canonic_name(&self) -> CanonicOperatorName {
×
89
        self.source.canonic_name()
×
90
    }
×
91
}
92

93
impl InitializedVectorOperator for InitializedOperatorWrapper<Box<dyn InitializedVectorOperator>> {
94
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
95
        tracing::debug!(event = "vector result descriptor");
×
96
        self.source.result_descriptor()
×
97
    }
×
98

99
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
1✔
100
        tracing::debug!(event = "query processor");
1✔
101
        let processor_result = self.source.query_processor();
1✔
102
        match processor_result {
1✔
103
            Ok(p) => {
1✔
104
                let result = map_typed_query_processor!(
1✔
105
                    p,
1✔
106
                    p => Box::new(QueryProcessorWrapper::new(p,
×
107
                    self.span, self.path.clone()))
×
108
                );
109
                tracing::debug!(event = "query processor created");
1✔
110
                Ok(result)
1✔
111
            }
112
            Err(err) => {
×
113
                tracing::debug!(event = "query processor failed");
×
114
                Err(err)
×
115
            }
116
        }
117
    }
1✔
118

119
    fn canonic_name(&self) -> CanonicOperatorName {
×
120
        self.source.canonic_name()
×
121
    }
×
122
}
123

124
// A wrapper around a query processor that adds statistics and quota tracking
125
struct QueryProcessorWrapper<Q, T>
126
where
127
    Q: QueryProcessor<Output = T>,
128
{
129
    processor: Q,
130
    span: CreateSpan,
131
    path: WorkflowOperatorPath,
132
    query_count: AtomicUsize,
133
}
134

135
impl<Q, T> QueryProcessorWrapper<Q, T>
136
where
137
    Q: QueryProcessor<Output = T> + Sized,
138
{
139
    pub fn new(processor: Q, span: CreateSpan, path: WorkflowOperatorPath) -> Self {
3✔
140
        QueryProcessorWrapper {
3✔
141
            processor,
3✔
142
            span,
3✔
143
            path,
3✔
144
            query_count: AtomicUsize::new(0),
3✔
145
        }
3✔
146
    }
3✔
147

148
    pub fn next_query_count(&self) -> usize {
3✔
149
        self.query_count.fetch_add(1, Ordering::SeqCst)
3✔
150
    }
3✔
151
}
152

153
#[async_trait]
154
impl<Q, T, S, A, R> QueryProcessor for QueryProcessorWrapper<Q, T>
155
where
156
    Q: QueryProcessor<Output = T, SpatialBounds = S, Selection = A, ResultDescription = R>,
157
    S: AxisAlignedRectangle + Send + Sync + 'static,
158
    A: QueryAttributeSelection + 'static,
159
    R: ResultDescriptor<QueryRectangleSpatialBounds = S, QueryRectangleAttributeSelection = A>
160
        + 'static,
161
    T: Send,
162
{
163
    type Output = T;
164
    type SpatialBounds = S;
165
    type Selection = A;
166
    type ResultDescription = R;
167

168
    async fn _query<'a>(
169
        &'a self,
170
        query: QueryRectangle<Self::SpatialBounds, Self::Selection>,
171
        ctx: &'a dyn QueryContext,
172
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
3✔
173
        let qc = self.next_query_count();
3✔
174

175
        // the top level operator creates a new query span for identifying individual queries
176
        let query_span = if self.path.is_root() {
3✔
177
            let span = span!(
3✔
178
                Level::TRACE,
3✔
179
                "Query",
180
                query_id = %uuid::Uuid::new_v4()
3✔
181
            );
182
            Some(span)
3✔
183
        } else {
UNCOV
184
            None
×
185
        };
186

187
        let _query_span_enter = query_span.as_ref().map(tracing::Span::enter);
3✔
188

3✔
189
        let quota_checker = ctx
3✔
190
            .quota_checker()
3✔
191
            .expect("`QuotaChecker` extension should be set during `ProContext` creation");
3✔
192

3✔
193
        // TODO: check the quota only once per query and not for every operator
3✔
194
        quota_checker.ensure_quota_available().await?;
9✔
195

196
        let quota_tracker = ctx
2✔
197
            .quota_tracking()
2✔
198
            .expect("`QuotaTracking` extension should be set during `ProContext` creation")
2✔
199
            .clone();
2✔
200

2✔
201
        let span = (self.span)(&self.path, qc);
2✔
202

2✔
203
        let _enter = span.enter();
2✔
204

2✔
205
        tracing::trace!(
2✔
206
            event = %"query_start",
207
            path = %self.path,
UNCOV
208
            bbox = %format!("[{},{},{},{}]",
×
209
                query.spatial_bounds.lower_left().x,
×
210
                query.spatial_bounds.lower_left().y,
×
211
                query.spatial_bounds.upper_right().x,
×
212
                query.spatial_bounds.upper_right().y
×
213
            ),
UNCOV
214
            time = %format!("[{},{}]",
×
215
                query.time_interval.start().inner(),
×
216
                query.time_interval.end().inner()
×
217
            )
218
        );
219

220
        let stream_result = self.processor.query(query, ctx).await;
2✔
221
        tracing::trace!(event = %"query_ready");
2✔
222

223
        match stream_result {
2✔
224
            Ok(stream) => {
2✔
225
                tracing::trace!(event = %"query_ok");
2✔
226
                Ok(StreamStatisticsAdapter::new(
2✔
227
                    stream,
2✔
228
                    span.clone(),
2✔
229
                    quota_tracker,
2✔
230
                    self.path.clone(),
2✔
231
                )
2✔
232
                .boxed())
2✔
233
            }
UNCOV
234
            Err(err) => {
×
235
                tracing::trace!(event = %"query_error");
×
UNCOV
236
                Err(err)
×
237
            }
238
        }
239
    }
6✔
240

241
    fn result_descriptor(&self) -> &Self::ResultDescription {
6✔
242
        self.processor.result_descriptor()
6✔
243
    }
6✔
244
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc