• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 4286247254

pending completion
4286247254

push

github

GitHub
Merge #747

585 of 585 new or added lines in 10 files covered. (100.0%)

89816 of 102556 relevant lines covered (87.58%)

77682.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.86
/services/src/workflows/vector_stream.rs
1
use crate::{contexts::Context, error::Result};
2
use actix::{
3
    fut::wrap_future, Actor, ActorContext, ActorFutureExt, AsyncContext, SpawnHandle, StreamHandler,
4
};
5
use actix_http::ws::{CloseCode, CloseReason, Item};
6
use actix_web_actors::ws;
7
use futures::{stream::BoxStream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
8
use geoengine_datatypes::{collections::FeatureCollectionIpc, primitives::VectorQueryRectangle};
9
use geoengine_operators::{
10
    call_on_generic_vector_processor,
11
    engine::{QueryAbortTrigger, QueryContext, QueryProcessorExt, VectorOperator},
12
};
13

14
pub struct VectorWebsocketStreamHandler {
15
    state: VectorWebsocketStreamHandlerState,
16
    abort_handle: Option<QueryAbortTrigger>,
17
}
18

19
type ByteStream = BoxStream<'static, StreamResult>;
20
type StreamResult = Result<Vec<u8>>;
21

22
enum VectorWebsocketStreamHandlerState {
23
    Closed,
24
    Idle { stream: ByteStream },
25
    Processing { _fut: SpawnHandle },
26
}
27

28
impl Default for VectorWebsocketStreamHandlerState {
29
    fn default() -> Self {
4✔
30
        Self::Closed
4✔
31
    }
4✔
32
}
33

34
impl Actor for VectorWebsocketStreamHandler {
35
    type Context = ws::WebsocketContext<Self>;
36
}
37

38
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for VectorWebsocketStreamHandler {
39
    fn started(&mut self, _ctx: &mut Self::Context) {}
1✔
40

41
    fn finished(&mut self, ctx: &mut Self::Context) {
1✔
42
        ctx.stop();
1✔
43

1✔
44
        self.abort_processing();
1✔
45
    }
1✔
46

47
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
48
        match msg {
4✔
49
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
×
50
            Ok(ws::Message::Text(text)) if &text == "NEXT" => self.next_chunk(ctx),
4✔
51
            Ok(ws::Message::Close(reason)) => {
×
52
                ctx.close(reason);
×
53

×
54
                self.finished(ctx);
×
55
            }
×
56
            // for now, we ignore all other messages
57
            _ => (),
×
58
        }
59
    }
4✔
60
}
61

62
impl VectorWebsocketStreamHandler {
63
    pub async fn new<C: Context>(
1✔
64
        vector_operator: Box<dyn VectorOperator>,
1✔
65
        query_rectangle: VectorQueryRectangle,
1✔
66
        execution_ctx: C::ExecutionContext,
1✔
67
        mut query_ctx: C::QueryContext,
1✔
68
    ) -> Result<Self> {
1✔
69
        let initialized_operator = vector_operator.initialize(&execution_ctx).await?;
1✔
70

71
        let spatial_reference = initialized_operator.result_descriptor().spatial_reference;
1✔
72

73
        let query_processor = initialized_operator.query_processor()?;
1✔
74

75
        let abort_handle = query_ctx.abort_trigger().ok();
1✔
76

77
        let byte_stream = call_on_generic_vector_processor!(query_processor, p => {
1✔
78
            let batch_stream = p
1✔
79
                .query_into_owned_stream(query_rectangle, Box::new(query_ctx))
1✔
80
                .await?;
×
81

82
            batch_stream.and_then(
1✔
83
                move |batch| crate::util::spawn_blocking(
3✔
84
                    move || batch.to_arrow_ipc_file_bytes(spatial_reference).map_err(Into::into)
3✔
85
                ).err_into().map(| r | r.and_then(std::convert::identity))
3✔
86
            ).boxed()
1✔
87
        });
88

89
        // TODO: think about buffering the stream?
90

91
        Ok(Self {
1✔
92
            state: VectorWebsocketStreamHandlerState::Idle {
1✔
93
                stream: byte_stream.map_err(Into::into).boxed(),
1✔
94
            },
1✔
95
            abort_handle,
1✔
96
        })
1✔
97
    }
1✔
98

99
    pub fn next_chunk(&mut self, ctx: &mut <Self as Actor>::Context) {
4✔
100
        let state = std::mem::take(&mut self.state);
4✔
101

4✔
102
        self.state = match state {
4✔
103
            VectorWebsocketStreamHandlerState::Closed => {
104
                self.finished(ctx);
×
105
                return;
×
106
            }
107
            VectorWebsocketStreamHandlerState::Idle { mut stream } => {
4✔
108
                VectorWebsocketStreamHandlerState::Processing {
4✔
109
                    _fut: ctx.spawn(
4✔
110
                        wrap_future(async move {
4✔
111
                            let tile = stream.next().await;
5✔
112

113
                            (tile, stream)
4✔
114
                        })
4✔
115
                        .then(send_result),
4✔
116
                    ),
4✔
117
                }
4✔
118
            }
119
            VectorWebsocketStreamHandlerState::Processing { _fut: _ } => state,
×
120
        };
121
    }
4✔
122

123
    pub fn abort_processing(&mut self) {
124
        if let Some(abort_handle) = self.abort_handle.take() {
1✔
125
            abort_handle.abort();
1✔
126
        }
1✔
127
    }
1✔
128
}
129

130
fn send_result(
131
    (tile, stream): (Option<StreamResult>, ByteStream),
132
    actor: &mut VectorWebsocketStreamHandler,
133
    ctx: &mut <VectorWebsocketStreamHandler as Actor>::Context,
134
) -> futures::future::Ready<()> {
135
    // TODO: spawn thread instead of blocking and returning an ok future
136

137
    match tile {
3✔
138
        Some(Ok(tile)) => {
3✔
139
            const MESSAGE_MAX_SIZE: usize = 16 * 1024 * 1024; // 16 MB
3✔
140

3✔
141
            actor.state = VectorWebsocketStreamHandlerState::Idle { stream };
3✔
142

3✔
143
            // we can send the whole message at once if it is small enough, i.e. <= `MESSAGE_MAX_SIZE`
3✔
144
            if tile.len() <= MESSAGE_MAX_SIZE {
3✔
145
                ctx.binary(tile);
3✔
146
                return futures::future::ready(());
3✔
147
            }
×
148

×
149
            // limit message chunks to `MESSAGE_MAX_SIZE`
×
150
            let mut chunks = tile.chunks(MESSAGE_MAX_SIZE).enumerate();
×
151

152
            while let Some((i, chunk)) = chunks.next() {
×
153
                let chunk_bytes = chunk.to_vec().into();
×
154
                if i == 0 {
×
155
                    // first chunk
×
156
                    ctx.write_raw(ws::Message::Continuation(Item::FirstBinary(chunk_bytes)));
×
157
                } else if chunks.len() == 0 {
×
158
                    // last chunk
×
159
                    ctx.write_raw(ws::Message::Continuation(Item::Last(chunk_bytes)));
×
160
                } else {
×
161
                    ctx.write_raw(ws::Message::Continuation(Item::Continue(chunk_bytes)));
×
162
                }
×
163
            }
164
        }
165
        Some(Err(e)) => {
×
166
            // on error, send the error and close the connection
×
167
            actor.state = VectorWebsocketStreamHandlerState::Closed;
×
168
            ctx.close(Some(CloseReason {
×
169
                code: CloseCode::Error,
×
170
                description: Some(e.to_string()),
×
171
            }));
×
172
            actor.finished(ctx);
×
173
        }
×
174
        None => {
1✔
175
            // stream ended
1✔
176
            actor.state = VectorWebsocketStreamHandlerState::Closed;
1✔
177
            ctx.close(Some(CloseReason {
1✔
178
                code: CloseCode::Normal,
1✔
179
                description: None,
1✔
180
            }));
1✔
181
            actor.finished(ctx);
1✔
182
        }
1✔
183
    }
184

185
    futures::future::ready(())
1✔
186
}
4✔
187

188
#[cfg(test)]
189
mod tests {
190
    use super::*;
191
    use crate::{
192
        contexts::{InMemoryContext, SimpleContext},
193
        workflows::workflow::Workflow,
194
    };
195
    use actix_http::error::PayloadError;
196
    use actix_web_actors::ws::WebsocketContext;
197
    use bytes::{Bytes, BytesMut};
198
    use futures::channel::mpsc::UnboundedSender;
199
    use geoengine_datatypes::{
200
        collections::MultiPointCollection,
201
        primitives::{
202
            BoundingBox2D, DateTime, FeatureData, MultiPoint, SpatialResolution, TimeInterval,
203
        },
204
        util::{arrow::arrow_ipc_file_to_record_batches, test::TestDefault},
205
    };
206
    use geoengine_operators::{engine::TypedOperator, mock::MockFeatureCollectionSource};
207

208
    #[tokio::test]
1✔
209
    async fn test_websocket_stream() {
1✔
210
        fn send_next(input_sender: &UnboundedSender<Result<Bytes, PayloadError>>) {
5✔
211
            let mut buf = BytesMut::new();
5✔
212
            actix_http::ws::Parser::write_message(
5✔
213
                &mut buf,
5✔
214
                "NEXT",
5✔
215
                actix_http::ws::OpCode::Text,
5✔
216
                true,
5✔
217
                true,
5✔
218
            );
5✔
219

5✔
220
            input_sender.unbounded_send(Ok(buf.into())).unwrap();
5✔
221
        }
5✔
222

1✔
223
        let collection = MultiPointCollection::from_data(
1✔
224
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
225
            vec![
1✔
226
                TimeInterval::new(
1✔
227
                    DateTime::new_utc(2014, 1, 1, 0, 0, 0),
1✔
228
                    DateTime::new_utc(2015, 1, 1, 0, 0, 0)
1✔
229
                )
1✔
230
                .unwrap();
1✔
231
                3
1✔
232
            ],
1✔
233
            [
1✔
234
                (
1✔
235
                    "foobar".to_string(),
1✔
236
                    FeatureData::NullableInt(vec![Some(0), None, Some(2)]),
1✔
237
                ),
1✔
238
                (
1✔
239
                    "strings".to_string(),
1✔
240
                    FeatureData::Text(vec!["a".to_string(), "b".to_string(), "c".to_string()]),
1✔
241
                ),
1✔
242
            ]
1✔
243
            .iter()
1✔
244
            .cloned()
1✔
245
            .collect(),
1✔
246
        )
1✔
247
        .unwrap();
1✔
248

1✔
249
        let ctx = InMemoryContext::new_with_context_spec(
1✔
250
            TestDefault::test_default(),
1✔
251
            usize::MAX.into(), // ensure that we get one chunk per input
1✔
252
        );
1✔
253
        let session = ctx.default_session_ref().await.clone();
1✔
254

1✔
255
        let workflow = Workflow {
1✔
256
            operator: TypedOperator::Vector(
1✔
257
                MockFeatureCollectionSource::multiple(vec![
1✔
258
                    collection.clone(),
1✔
259
                    collection.clone(),
1✔
260
                    collection.clone(),
1✔
261
                ])
1✔
262
                .boxed(),
1✔
263
            ),
1✔
264
        };
1✔
265

1✔
266
        let query_rectangle = VectorQueryRectangle {
1✔
267
            spatial_bounds: BoundingBox2D::new_upper_left_lower_right(
1✔
268
                (-180., 90.).into(),
1✔
269
                (180., -90.).into(),
1✔
270
            )
1✔
271
            .unwrap(),
1✔
272
            time_interval: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0))
1✔
273
                .unwrap(),
1✔
274
            spatial_resolution: SpatialResolution::one(),
1✔
275
        };
1✔
276

277
        let handler = VectorWebsocketStreamHandler::new::<InMemoryContext>(
1✔
278
            workflow.operator.get_vector().unwrap(),
1✔
279
            query_rectangle,
1✔
280
            ctx.execution_context(session.clone()).unwrap(),
1✔
281
            ctx.query_context(session).unwrap(),
1✔
282
        )
1✔
283
        .await
×
284
        .unwrap();
1✔
285

1✔
286
        let (input_sender, input_receiver) = futures::channel::mpsc::unbounded();
1✔
287

1✔
288
        let mut websocket_context = WebsocketContext::create(handler, input_receiver);
1✔
289

290
        // 3 batches
291
        for _ in 0..3 {
4✔
292
            send_next(&input_sender);
3✔
293

294
            let bytes = websocket_context.next().await.unwrap().unwrap();
5✔
295

3✔
296
            let bytes = &bytes[4..]; // the first four bytes are WS op bytes
3✔
297

3✔
298
            let record_batches = arrow_ipc_file_to_record_batches(bytes).unwrap();
3✔
299
            assert_eq!(record_batches.len(), 1);
3✔
300
            let record_batch = record_batches.first().unwrap();
3✔
301
            let schema = record_batch.schema();
3✔
302

3✔
303
            assert_eq!(schema.metadata()["spatialReference"], "EPSG:4326");
3✔
304

305
            assert_eq!(record_batch.column_by_name("foobar").unwrap().len(), 3);
3✔
306
            assert_eq!(record_batch.column_by_name("strings").unwrap().len(), 3);
3✔
307
        }
308

309
        send_next(&input_sender);
1✔
310
        assert_eq!(websocket_context.next().await.unwrap().unwrap().len(), 4); // close frame
1✔
311

312
        send_next(&input_sender);
1✔
313
        assert!(websocket_context.next().await.is_none());
1✔
314
    }
315
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc