• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.13
/operators/src/engine/query.rs
1
use std::{
2
    any::{Any, TypeId},
3
    collections::HashMap,
4
    task::{Context, Poll},
5
    {pin::Pin, sync::Arc},
6
};
7

8
use crate::util::Result;
9
use crate::{error, util::create_rayon_thread_pool};
10
use futures::Stream;
11
use geoengine_datatypes::util::test::TestDefault;
12
use pin_project::pin_project;
13
use rayon::ThreadPool;
14
use serde::{Deserialize, Serialize};
15
use stream_cancel::{Trigger, Valve, Valved};
16

17
/// Defines the size in bytes of a vector data chunk
18
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
189✔
19
pub struct ChunkByteSize(usize);
20

21
impl ChunkByteSize {
22
    pub const MIN: ChunkByteSize = ChunkByteSize(usize::MIN);
23
    pub const MAX: ChunkByteSize = ChunkByteSize(usize::MAX);
24

25
    pub fn new(cbs: usize) -> Self {
×
26
        ChunkByteSize(cbs)
×
27
    }
×
28

29
    pub fn bytes(self) -> usize {
×
30
        self.0
×
31
    }
×
32
}
33

34
impl From<usize> for ChunkByteSize {
35
    fn from(size: usize) -> Self {
23✔
36
        ChunkByteSize(size)
23✔
37
    }
23✔
38
}
39

40
impl From<ChunkByteSize> for usize {
41
    fn from(cbs: ChunkByteSize) -> Self {
103✔
42
        cbs.0
103✔
43
    }
103✔
44
}
45

46
impl TestDefault for ChunkByteSize {
47
    fn test_default() -> Self {
421✔
48
        Self(1024 * 1024)
421✔
49
    }
421✔
50
}
51

52
pub trait QueryContext: Send + Sync {
53
    fn chunk_byte_size(&self) -> ChunkByteSize;
54
    fn thread_pool(&self) -> &Arc<ThreadPool>;
55

56
    /// get the `QueryContextExtensions` that contain additional information
57
    fn extensions(&self) -> &QueryContextExtensions;
58

59
    fn abort_registration(&self) -> &QueryAbortRegistration;
60
    fn abort_trigger(&mut self) -> Result<QueryAbortTrigger>;
61
}
62

63
/// This type allows adding additional information to the `QueryContext`.
64
/// It acts like a type map, allowing one to store one value per type.
65
#[derive(Default)]
260✔
66
pub struct QueryContextExtensions {
67
    map: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
68
}
69

70
impl QueryContextExtensions {
71
    pub fn insert<T: 'static + Send + Sync>(&mut self, val: T) -> Option<T> {
2✔
72
        self.map
2✔
73
            .insert(TypeId::of::<T>(), Box::new(val))
2✔
74
            .and_then(downcast_owned)
2✔
75
    }
2✔
76

77
    pub fn get<T: 'static + Send + Sync>(&self) -> Option<&T> {
2✔
78
        self.map
2✔
79
            .get(&TypeId::of::<T>())
2✔
80
            .and_then(|boxed| boxed.downcast_ref())
2✔
81
    }
2✔
82
}
83

84
/// Converts a type-erased box into an owned `T`.
///
/// Returns `None` if the boxed value is not of type `T`.
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
    match boxed.downcast::<T>() {
        Ok(typed) => Some(*typed),
        Err(_) => None,
    }
}
×
87

88
/// This type allow wrapping multiple streams with `QueryAbortWrapper`s that
89
/// can all be aborted at the same time using the corresponding `QueryAbortTrigger`.
90
pub struct QueryAbortRegistration {
91
    valve: Valve,
92
}
93

94
impl QueryAbortRegistration {
95
    pub fn new() -> (Self, QueryAbortTrigger) {
260✔
96
        let (trigger, valve) = Valve::new();
260✔
97

260✔
98
        (Self { valve }, QueryAbortTrigger { trigger })
260✔
99
    }
260✔
100

101
    pub fn wrap<S: Stream>(&self, stream: S) -> QueryAbortWrapper<S> {
889✔
102
        QueryAbortWrapper {
889✔
103
            valved: self.valve.wrap(stream),
889✔
104
        }
889✔
105
    }
889✔
106
}
107

108
/// This type wraps a stream and allows aborting it using the corresponding `QueryAbortTrigger`
109
/// from its `QueryAbortRegistration`.
110
#[pin_project(project = AbortWrapperProjection)]
161,692✔
111
pub struct QueryAbortWrapper<S> {
112
    #[pin]
113
    valved: Valved<S>,
114
}
115

116
impl<S> Stream for QueryAbortWrapper<S>
117
where
118
    S: Stream,
119
{
120
    type Item = S::Item;
121

122
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161,692✔
123
        self.project().valved.poll_next(cx)
161,692✔
124
    }
161,692✔
125
}
126

127
/// This type allows aborting all streams that were wrapped using the corresponding
128
/// `QueryAbortRegistration`.
129
pub struct QueryAbortTrigger {
130
    trigger: Trigger,
131
}
132

133
impl QueryAbortTrigger {
134
    pub fn abort(self) {
×
135
        self.trigger.cancel();
×
136
    }
×
137
}
138

139
pub struct MockQueryContext {
140
    pub chunk_byte_size: ChunkByteSize,
141
    pub thread_pool: Arc<ThreadPool>,
142

143
    pub extensions: QueryContextExtensions,
144
    pub abort_registration: QueryAbortRegistration,
145
    pub abort_trigger: Option<QueryAbortTrigger>,
146
}
147

148
impl TestDefault for MockQueryContext {
149
    fn test_default() -> Self {
118✔
150
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
118✔
151
        Self {
118✔
152
            chunk_byte_size: ChunkByteSize::test_default(),
118✔
153
            thread_pool: create_rayon_thread_pool(0),
118✔
154
            extensions: QueryContextExtensions::default(),
118✔
155
            abort_registration,
118✔
156
            abort_trigger: Some(abort_trigger),
118✔
157
        }
118✔
158
    }
118✔
159
}
160

161
impl MockQueryContext {
162
    pub fn new(chunk_byte_size: ChunkByteSize) -> Self {
122✔
163
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
122✔
164
        Self {
122✔
165
            chunk_byte_size,
122✔
166
            thread_pool: create_rayon_thread_pool(0),
122✔
167
            extensions: QueryContextExtensions::default(),
122✔
168
            abort_registration,
122✔
169
            abort_trigger: Some(abort_trigger),
122✔
170
        }
122✔
171
    }
122✔
172

173
    pub fn with_chunk_size_and_thread_count(
×
174
        chunk_byte_size: ChunkByteSize,
×
175
        num_threads: usize,
×
176
    ) -> Self {
×
177
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
×
178
        Self {
×
179
            chunk_byte_size,
×
180
            thread_pool: create_rayon_thread_pool(num_threads),
×
181
            extensions: QueryContextExtensions::default(),
×
182
            abort_registration,
×
183
            abort_trigger: Some(abort_trigger),
×
184
        }
×
185
    }
×
186
}
187

188
impl QueryContext for MockQueryContext {
189
    fn chunk_byte_size(&self) -> ChunkByteSize {
100✔
190
        self.chunk_byte_size
100✔
191
    }
100✔
192

193
    fn thread_pool(&self) -> &Arc<ThreadPool> {
54,224✔
194
        &self.thread_pool
54,224✔
195
    }
54,224✔
196

197
    fn extensions(&self) -> &QueryContextExtensions {
×
198
        &self.extensions
×
199
    }
×
200

201
    fn abort_registration(&self) -> &QueryAbortRegistration {
851✔
202
        &self.abort_registration
851✔
203
    }
851✔
204

205
    fn abort_trigger(&mut self) -> Result<QueryAbortTrigger> {
12✔
206
        self.abort_trigger
12✔
207
            .take()
12✔
208
            .ok_or(error::Error::AbortTriggerAlreadyUsed)
12✔
209
    }
12✔
210
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc