• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10178074589

31 Jul 2024 09:34AM UTC coverage: 91.068% (+0.4%) from 90.682%
10178074589

push

github

web-flow
Merge pull request #973 from geo-engine/remove-XGB-update-toolchain

Remove-XGB-update-toolchain

81 of 88 new or added lines in 29 files covered. (92.05%)

456 existing lines in 119 files now uncovered.

131088 of 143945 relevant lines covered (91.07%)

53581.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.36
/services/src/workflows/vector_stream.rs
1
use crate::{contexts::SessionContext, error::Result};
2
use actix::{
3
    fut::wrap_future, Actor, ActorContext, ActorFutureExt, AsyncContext, SpawnHandle, StreamHandler,
4
};
5
use actix_http::ws::{CloseCode, CloseReason, Item};
6
use actix_web_actors::ws;
7
use futures::{stream::BoxStream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
8
use geoengine_datatypes::{collections::FeatureCollectionIpc, primitives::VectorQueryRectangle};
9
use geoengine_operators::{
10
    call_on_generic_vector_processor,
11
    engine::{
12
        QueryAbortTrigger, QueryContext, QueryProcessorExt, VectorOperator, WorkflowOperatorPath,
13
    },
14
};
15

16
pub struct VectorWebsocketStreamHandler {
17
    state: VectorWebsocketStreamHandlerState,
18
    abort_handle: Option<QueryAbortTrigger>,
19
}
20

21
type ByteStream = BoxStream<'static, StreamResult>;
22
type StreamResult = Result<Vec<u8>>;
23

24
enum VectorWebsocketStreamHandlerState {
25
    Closed,
26
    Idle { stream: ByteStream },
27
    Processing { _fut: SpawnHandle },
28
}
29

30
impl Default for VectorWebsocketStreamHandlerState {
31
    fn default() -> Self {
4✔
32
        Self::Closed
4✔
33
    }
4✔
34
}
35

36
impl Actor for VectorWebsocketStreamHandler {
37
    type Context = ws::WebsocketContext<Self>;
38
}
39

40
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for VectorWebsocketStreamHandler {
41
    fn started(&mut self, _ctx: &mut Self::Context) {}
1✔
42

43
    fn finished(&mut self, ctx: &mut Self::Context) {
1✔
44
        ctx.stop();
1✔
45

1✔
46
        self.abort_processing();
1✔
47
    }
1✔
48

49
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
4✔
50
        match msg {
4✔
51
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
×
52
            Ok(ws::Message::Text(text)) if &text == "NEXT" => self.next_chunk(ctx),
4✔
53
            Ok(ws::Message::Close(reason)) => {
×
54
                ctx.close(reason);
×
55

×
56
                self.finished(ctx);
×
57
            }
×
58
            // for now, we ignore all other messages
59
            _ => (),
×
60
        }
61
    }
4✔
62
}
63

64
impl VectorWebsocketStreamHandler {
65
    pub async fn new<C: SessionContext>(
1✔
66
        vector_operator: Box<dyn VectorOperator>,
1✔
67
        query_rectangle: VectorQueryRectangle,
1✔
68
        execution_ctx: C::ExecutionContext,
1✔
69
        mut query_ctx: C::QueryContext,
1✔
70
    ) -> Result<Self> {
1✔
71
        let workflow_operator_path_root = WorkflowOperatorPath::initialize_root();
1✔
72

73
        let initialized_operator = vector_operator
1✔
74
            .initialize(workflow_operator_path_root, &execution_ctx)
1✔
75
            .await?;
×
76

77
        let spatial_reference = initialized_operator.result_descriptor().spatial_reference;
1✔
78

79
        let query_processor = initialized_operator.query_processor()?;
1✔
80

81
        let abort_handle = query_ctx.abort_trigger().ok();
1✔
82

83
        let byte_stream = call_on_generic_vector_processor!(query_processor, p => {
1✔
UNCOV
84
            let batch_stream = p
×
UNCOV
85
                .query_into_owned_stream(query_rectangle, Box::new(query_ctx))
×
86
                .await?;
×
87

UNCOV
88
            batch_stream.and_then(
×
89
                move |batch| crate::util::spawn_blocking(
3✔
90
                    move || batch.to_arrow_ipc_file_bytes(spatial_reference).map_err(Into::into)
3✔
91
                ).err_into().map(| r | r.and_then(std::convert::identity))
3✔
UNCOV
92
            ).boxed()
×
93
        });
94

95
        // TODO: think about buffering the stream?
96

97
        Ok(Self {
1✔
98
            state: VectorWebsocketStreamHandlerState::Idle {
1✔
99
                stream: byte_stream.map_err(Into::into).boxed(),
1✔
100
            },
1✔
101
            abort_handle,
1✔
102
        })
1✔
103
    }
1✔
104

105
    pub fn next_chunk(&mut self, ctx: &mut <Self as Actor>::Context) {
4✔
106
        let state = std::mem::take(&mut self.state);
4✔
107

108
        self.state = match state {
4✔
109
            VectorWebsocketStreamHandlerState::Closed => {
110
                self.finished(ctx);
×
111
                return;
×
112
            }
113
            VectorWebsocketStreamHandlerState::Idle { mut stream } => {
4✔
114
                VectorWebsocketStreamHandlerState::Processing {
4✔
115
                    _fut: ctx.spawn(
4✔
116
                        wrap_future(async move {
4✔
117
                            let tile = stream.next().await;
4✔
118

119
                            (tile, stream)
4✔
120
                        })
4✔
121
                        .then(send_result),
4✔
122
                    ),
4✔
123
                }
4✔
124
            }
125
            VectorWebsocketStreamHandlerState::Processing { _fut: _ } => state,
×
126
        };
127
    }
4✔
128

129
    pub fn abort_processing(&mut self) {
1✔
130
        if let Some(abort_handle) = self.abort_handle.take() {
1✔
131
            abort_handle.abort();
1✔
132
        }
1✔
133
    }
1✔
134
}
135

136
fn send_result(
4✔
137
    (tile, stream): (Option<StreamResult>, ByteStream),
4✔
138
    actor: &mut VectorWebsocketStreamHandler,
4✔
139
    ctx: &mut <VectorWebsocketStreamHandler as Actor>::Context,
4✔
140
) -> futures::future::Ready<()> {
4✔
141
    // TODO: spawn thread instead of blocking and returning an ok future
142

143
    match tile {
3✔
144
        Some(Ok(tile)) => {
3✔
145
            const MESSAGE_MAX_SIZE: usize = 16 * 1024 * 1024; // 16 MB
3✔
146

3✔
147
            actor.state = VectorWebsocketStreamHandlerState::Idle { stream };
3✔
148

3✔
149
            // we can send the whole message at once if it is small enough, i.e. <= `MESSAGE_MAX_SIZE`
3✔
150
            if tile.len() <= MESSAGE_MAX_SIZE {
3✔
151
                ctx.binary(tile);
3✔
152
                return futures::future::ready(());
3✔
153
            }
×
154

×
155
            // limit message chunks to `MESSAGE_MAX_SIZE`
×
156
            let mut chunks = tile.chunks(MESSAGE_MAX_SIZE).enumerate();
×
157

158
            while let Some((i, chunk)) = chunks.next() {
×
159
                let chunk_bytes = chunk.to_vec().into();
×
160
                if i == 0 {
×
161
                    // first chunk
×
162
                    ctx.write_raw(ws::Message::Continuation(Item::FirstBinary(chunk_bytes)));
×
163
                } else if chunks.len() == 0 {
×
164
                    // last chunk
×
165
                    ctx.write_raw(ws::Message::Continuation(Item::Last(chunk_bytes)));
×
166
                } else {
×
167
                    ctx.write_raw(ws::Message::Continuation(Item::Continue(chunk_bytes)));
×
168
                }
×
169
            }
170
        }
171
        Some(Err(e)) => {
×
172
            // on error, send the error and close the connection
×
173
            actor.state = VectorWebsocketStreamHandlerState::Closed;
×
174
            ctx.close(Some(CloseReason {
×
175
                code: CloseCode::Error,
×
176
                description: Some(e.to_string()),
×
177
            }));
×
178
            actor.finished(ctx);
×
179
        }
×
180
        None => {
1✔
181
            // stream ended
1✔
182
            actor.state = VectorWebsocketStreamHandlerState::Closed;
1✔
183
            ctx.close(Some(CloseReason {
1✔
184
                code: CloseCode::Normal,
1✔
185
                description: None,
1✔
186
            }));
1✔
187
            actor.finished(ctx);
1✔
188
        }
1✔
189
    }
190

191
    futures::future::ready(())
1✔
192
}
4✔
193

194
#[cfg(test)]
195
mod tests {
196
    use super::*;
197
    use crate::contexts::{PostgresContext, PostgresSessionContext};
198
    use crate::ge_context;
199
    use crate::{contexts::SimpleApplicationContext, workflows::workflow::Workflow};
200
    use actix_http::error::PayloadError;
201
    use actix_web_actors::ws::WebsocketContext;
202
    use bytes::{Bytes, BytesMut};
203
    use futures::channel::mpsc::UnboundedSender;
204
    use geoengine_datatypes::primitives::ColumnSelection;
205
    use geoengine_datatypes::{
206
        collections::MultiPointCollection,
207
        primitives::{
208
            BoundingBox2D, CacheHint, DateTime, FeatureData, MultiPoint, SpatialResolution,
209
            TimeInterval,
210
        },
211
        util::arrow::arrow_ipc_file_to_record_batches,
212
    };
213
    use geoengine_operators::engine::ChunkByteSize;
214
    use geoengine_operators::{engine::TypedOperator, mock::MockFeatureCollectionSource};
215
    use tokio_postgres::NoTls;
216

217
    /// ensure that we get one chunk per input
218
    fn max_chunk_size() -> ChunkByteSize {
1✔
219
        usize::MAX.into()
1✔
220
    }
1✔
221

222
    #[ge_context::test(query_ctx_chunk_size = "max_chunk_size")]
3✔
223
    async fn test_websocket_stream(app_ctx: PostgresContext<NoTls>) {
1✔
224
        fn send_next(input_sender: &UnboundedSender<Result<Bytes, PayloadError>>) {
5✔
225
            let mut buf = BytesMut::new();
5✔
226
            actix_http::ws::Parser::write_message(
5✔
227
                &mut buf,
5✔
228
                "NEXT",
5✔
229
                actix_http::ws::OpCode::Text,
5✔
230
                true,
5✔
231
                true,
5✔
232
            );
5✔
233

5✔
234
            input_sender.unbounded_send(Ok(buf.into())).unwrap();
5✔
235
        }
5✔
236

1✔
237
        let collection = MultiPointCollection::from_data(
1✔
238
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
239
            vec![
1✔
240
                TimeInterval::new(
1✔
241
                    DateTime::new_utc(2014, 1, 1, 0, 0, 0),
1✔
242
                    DateTime::new_utc(2015, 1, 1, 0, 0, 0)
1✔
243
                )
1✔
244
                .unwrap();
1✔
245
                3
1✔
246
            ],
1✔
247
            [
1✔
248
                (
1✔
249
                    "foobar".to_string(),
1✔
250
                    FeatureData::NullableInt(vec![Some(0), None, Some(2)]),
1✔
251
                ),
1✔
252
                (
1✔
253
                    "strings".to_string(),
1✔
254
                    FeatureData::Text(vec!["a".to_string(), "b".to_string(), "c".to_string()]),
1✔
255
                ),
1✔
256
            ]
1✔
257
            .iter()
1✔
258
            .cloned()
1✔
259
            .collect(),
1✔
260
            CacheHint::default(),
1✔
261
        )
1✔
262
        .unwrap();
1✔
263

264
        let ctx = app_ctx.default_session_context().await.unwrap();
19✔
265

1✔
266
        let workflow = Workflow {
1✔
267
            operator: TypedOperator::Vector(
1✔
268
                MockFeatureCollectionSource::multiple(vec![
1✔
269
                    collection.clone(),
1✔
270
                    collection.clone(),
1✔
271
                    collection.clone(),
1✔
272
                ])
1✔
273
                .boxed(),
1✔
274
            ),
1✔
275
        };
1✔
276

1✔
277
        let query_rectangle = VectorQueryRectangle {
1✔
278
            spatial_bounds: BoundingBox2D::new_upper_left_lower_right(
1✔
279
                (-180., 90.).into(),
1✔
280
                (180., -90.).into(),
1✔
281
            )
1✔
282
            .unwrap(),
1✔
283
            time_interval: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0))
1✔
284
                .unwrap(),
1✔
285
            spatial_resolution: SpatialResolution::one(),
1✔
286
            attributes: ColumnSelection::all(),
1✔
287
        };
1✔
288

289
        let handler = VectorWebsocketStreamHandler::new::<PostgresSessionContext<NoTls>>(
1✔
290
            workflow.operator.get_vector().unwrap(),
1✔
291
            query_rectangle,
1✔
292
            ctx.execution_context().unwrap(),
1✔
293
            ctx.query_context().unwrap(),
1✔
294
        )
1✔
295
        .await
×
296
        .unwrap();
1✔
297

1✔
298
        let (input_sender, input_receiver) = futures::channel::mpsc::unbounded();
1✔
299

1✔
300
        let mut websocket_context = WebsocketContext::create(handler, input_receiver);
1✔
301

302
        // 3 batches
303
        for _ in 0..3 {
4✔
304
            send_next(&input_sender);
3✔
305

306
            let bytes = websocket_context.next().await.unwrap().unwrap();
3✔
307

3✔
308
            let bytes = &bytes[4..]; // the first four bytes are WS op bytes
3✔
309

3✔
310
            let record_batches = arrow_ipc_file_to_record_batches(bytes).unwrap();
3✔
311
            assert_eq!(record_batches.len(), 1);
3✔
312
            let record_batch = record_batches.first().unwrap();
3✔
313
            let schema = record_batch.schema();
3✔
314

3✔
315
            assert_eq!(schema.metadata()["spatialReference"], "EPSG:4326");
3✔
316

317
            assert_eq!(record_batch.column_by_name("foobar").unwrap().len(), 3);
3✔
318
            assert_eq!(record_batch.column_by_name("strings").unwrap().len(), 3);
3✔
319
        }
320

321
        send_next(&input_sender);
1✔
322
        assert_eq!(websocket_context.next().await.unwrap().unwrap().len(), 4); // close frame
1✔
323

324
        send_next(&input_sender);
1✔
325
        assert!(websocket_context.next().await.is_none());
1✔
326
    }
1✔
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc