• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10178074589

31 Jul 2024 09:34AM UTC coverage: 91.068% (+0.4%) from 90.682%
10178074589

push

github

web-flow
Merge pull request #973 from geo-engine/remove-XGB-update-toolchain

Remove-XGB-update-toolchain

81 of 88 new or added lines in 29 files covered. (92.05%)

456 existing lines in 119 files now uncovered.

131088 of 143945 relevant lines covered (91.07%)

53581.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.9
/operators/src/engine/query.rs
1
use std::{
2
    any::{Any, TypeId},
3
    collections::HashMap,
4
    task::{Context, Poll},
5
    {pin::Pin, sync::Arc},
6
};
7

8
use crate::util::Result;
9
use crate::{error, util::create_rayon_thread_pool};
10
use futures::Stream;
11
use geoengine_datatypes::util::test::TestDefault;
12
use pin_project::pin_project;
13
use rayon::ThreadPool;
14
use serde::{Deserialize, Serialize};
15
use stream_cancel::{Trigger, Valve, Valved};
16

17
/// Defines the size in bytes of a vector data chunk
UNCOV
18
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
×
19
pub struct ChunkByteSize(usize);
20

21
impl ChunkByteSize {
22
    pub const MIN: ChunkByteSize = ChunkByteSize(usize::MIN);
23
    pub const MAX: ChunkByteSize = ChunkByteSize(usize::MAX);
24

25
    pub fn new(cbs: usize) -> Self {
×
26
        ChunkByteSize(cbs)
×
27
    }
×
28

29
    pub fn bytes(self) -> usize {
×
30
        self.0
×
31
    }
×
32
}
33

34
impl From<usize> for ChunkByteSize {
35
    fn from(size: usize) -> Self {
28✔
36
        ChunkByteSize(size)
28✔
37
    }
28✔
38
}
39

40
impl From<ChunkByteSize> for usize {
41
    fn from(cbs: ChunkByteSize) -> Self {
167✔
42
        cbs.0
167✔
43
    }
167✔
44
}
45

46
impl TestDefault for ChunkByteSize {
47
    fn test_default() -> Self {
463✔
48
        Self(1024 * 1024)
463✔
49
    }
463✔
50
}
51

52
pub trait QueryContext: Send + Sync {
53
    fn chunk_byte_size(&self) -> ChunkByteSize;
54
    fn thread_pool(&self) -> &Arc<ThreadPool>;
55

56
    /// get the `QueryContextExtensions` that contain additional information
57
    fn extensions(&self) -> &QueryContextExtensions;
58

59
    fn abort_registration(&self) -> &QueryAbortRegistration;
60
    fn abort_trigger(&mut self) -> Result<QueryAbortTrigger>;
61
}
62

63
/// This type allows adding additional information to the `QueryContext`.
64
/// It acts like a type map, allowing one to store one value per type.
65
#[derive(Default)]
66
pub struct QueryContextExtensions {
67
    map: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
68
}
69

70
impl QueryContextExtensions {
71
    pub fn insert<T: 'static + Send + Sync>(&mut self, val: T) -> Option<T> {
11✔
72
        self.map
11✔
73
            .insert(TypeId::of::<T>(), Box::new(val))
11✔
74
            .and_then(downcast_owned)
11✔
75
    }
11✔
76

77
    pub fn get<T: 'static + Send + Sync>(&self) -> Option<&T> {
9✔
78
        self.map
9✔
79
            .get(&TypeId::of::<T>())
9✔
80
            .and_then(|boxed| boxed.downcast_ref())
9✔
81
    }
9✔
82
}
83

84
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
×
85
    boxed.downcast().ok().map(|boxed| *boxed)
×
86
}
×
87

88
/// This type allow wrapping multiple streams with `QueryAbortWrapper`s that
89
/// can all be aborted at the same time using the corresponding `QueryAbortTrigger`.
90
pub struct QueryAbortRegistration {
91
    valve: Valve,
92
}
93

94
impl QueryAbortRegistration {
95
    pub fn new() -> (Self, QueryAbortTrigger) {
319✔
96
        let (trigger, valve) = Valve::new();
319✔
97

319✔
98
        (Self { valve }, QueryAbortTrigger { trigger })
319✔
99
    }
319✔
100

101
    pub fn wrap<S: Stream>(&self, stream: S) -> QueryAbortWrapper<S> {
1,156✔
102
        QueryAbortWrapper {
1,156✔
103
            valved: self.valve.wrap(stream),
1,156✔
104
        }
1,156✔
105
    }
1,156✔
106
}
107

108
/// This type wraps a stream and allows aborting it using the corresponding `QueryAbortTrigger`
109
/// from its `QueryAbortRegistration`.
110
#[pin_project(project = AbortWrapperProjection)]
223,775✔
111
pub struct QueryAbortWrapper<S> {
112
    #[pin]
113
    valved: Valved<S>,
114
}
115

116
impl<S> Stream for QueryAbortWrapper<S>
117
where
118
    S: Stream,
119
{
120
    type Item = S::Item;
121

122
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
223,775✔
123
        self.project().valved.poll_next(cx)
223,775✔
124
    }
223,775✔
125
}
126

127
/// This type allows aborting all streams that were wrapped using the corresponding
128
/// `QueryAbortRegistration`.
129
pub struct QueryAbortTrigger {
130
    trigger: Trigger,
131
}
132

133
impl QueryAbortTrigger {
134
    pub fn abort(self) {
2✔
135
        self.trigger.cancel();
2✔
136
    }
2✔
137
}
138

139
pub struct MockQueryContext {
140
    pub chunk_byte_size: ChunkByteSize,
141
    pub thread_pool: Arc<ThreadPool>,
142

143
    pub extensions: QueryContextExtensions,
144
    pub abort_registration: QueryAbortRegistration,
145
    pub abort_trigger: Option<QueryAbortTrigger>,
146
}
147

148
impl TestDefault for MockQueryContext {
149
    fn test_default() -> Self {
148✔
150
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
148✔
151
        Self {
148✔
152
            chunk_byte_size: ChunkByteSize::test_default(),
148✔
153
            thread_pool: create_rayon_thread_pool(0),
148✔
154
            extensions: QueryContextExtensions::default(),
148✔
155
            abort_registration,
148✔
156
            abort_trigger: Some(abort_trigger),
148✔
157
        }
148✔
158
    }
148✔
159
}
160

161
impl MockQueryContext {
162
    pub fn new(chunk_byte_size: ChunkByteSize) -> Self {
136✔
163
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
136✔
164
        Self {
136✔
165
            chunk_byte_size,
136✔
166
            thread_pool: create_rayon_thread_pool(0),
136✔
167
            extensions: QueryContextExtensions::default(),
136✔
168
            abort_registration,
136✔
169
            abort_trigger: Some(abort_trigger),
136✔
170
        }
136✔
171
    }
136✔
172

173
    pub fn new_with_query_extensions(
2✔
174
        chunk_byte_size: ChunkByteSize,
2✔
175
        extensions: QueryContextExtensions,
2✔
176
    ) -> Self {
2✔
177
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
2✔
178
        Self {
2✔
179
            chunk_byte_size,
2✔
180
            thread_pool: create_rayon_thread_pool(0),
2✔
181
            extensions,
2✔
182
            abort_registration,
2✔
183
            abort_trigger: Some(abort_trigger),
2✔
184
        }
2✔
185
    }
2✔
186

187
    pub fn with_chunk_size_and_thread_count(
×
188
        chunk_byte_size: ChunkByteSize,
×
189
        num_threads: usize,
×
190
    ) -> Self {
×
191
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
×
192
        Self {
×
193
            chunk_byte_size,
×
194
            thread_pool: create_rayon_thread_pool(num_threads),
×
195
            extensions: QueryContextExtensions::default(),
×
196
            abort_registration,
×
197
            abort_trigger: Some(abort_trigger),
×
198
        }
×
199
    }
×
200
}
201

202
impl QueryContext for MockQueryContext {
203
    fn chunk_byte_size(&self) -> ChunkByteSize {
161✔
204
        self.chunk_byte_size
161✔
205
    }
161✔
206

207
    fn thread_pool(&self) -> &Arc<ThreadPool> {
66,555✔
208
        &self.thread_pool
66,555✔
209
    }
66,555✔
210

211
    fn extensions(&self) -> &QueryContextExtensions {
4✔
212
        &self.extensions
4✔
213
    }
4✔
214

215
    fn abort_registration(&self) -> &QueryAbortRegistration {
1,096✔
216
        &self.abort_registration
1,096✔
217
    }
1,096✔
218

219
    fn abort_trigger(&mut self) -> Result<QueryAbortTrigger> {
19✔
220
        self.abort_trigger
19✔
221
            .take()
19✔
222
            .ok_or(error::Error::AbortTriggerAlreadyUsed)
19✔
223
    }
19✔
224
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc