• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5751943360

03 Aug 2023 02:19PM UTC coverage: 89.422% (+0.4%) from 88.974%
5751943360

push

github

web-flow
Merge pull request #840 from geo-engine/remove_in_memory

Remove in memory contexts and dbs

5338 of 5338 new or added lines in 37 files covered. (100.0%)

103772 of 116048 relevant lines covered (89.42%)

62390.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.26
/services/src/workflows/vector_stream.rs
1
use crate::{contexts::SessionContext, error::Result};
2
use actix::{
3
    fut::wrap_future, Actor, ActorContext, ActorFutureExt, AsyncContext, SpawnHandle, StreamHandler,
4
};
5
use actix_http::ws::{CloseCode, CloseReason, Item};
6
use actix_web_actors::ws;
7
use futures::{stream::BoxStream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
8
use geoengine_datatypes::{collections::FeatureCollectionIpc, primitives::VectorQueryRectangle};
9
use geoengine_operators::{
10
    call_on_generic_vector_processor,
11
    engine::{
12
        QueryAbortTrigger, QueryContext, QueryProcessorExt, VectorOperator, WorkflowOperatorPath,
13
    },
14
};
15

16
pub struct VectorWebsocketStreamHandler {
17
    state: VectorWebsocketStreamHandlerState,
18
    abort_handle: Option<QueryAbortTrigger>,
19
}
20

21
type ByteStream = BoxStream<'static, StreamResult>;
22
type StreamResult = Result<Vec<u8>>;
23

24
enum VectorWebsocketStreamHandlerState {
25
    Closed,
26
    Idle { stream: ByteStream },
27
    Processing { _fut: SpawnHandle },
28
}
29

30
impl Default for VectorWebsocketStreamHandlerState {
31
    fn default() -> Self {
4✔
32
        Self::Closed
4✔
33
    }
4✔
34
}
35

36
impl Actor for VectorWebsocketStreamHandler {
37
    type Context = ws::WebsocketContext<Self>;
38
}
39

40
/// Reacts to the websocket messages sent by the client.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for VectorWebsocketStreamHandler {
    /// Nothing to set up when the message stream starts.
    fn started(&mut self, _ctx: &mut Self::Context) {}

    /// Stops the actor and aborts the query if it has not been aborted yet.
    fn finished(&mut self, ctx: &mut Self::Context) {
        ctx.stop();

        self.abort_processing();
    }

    /// Dispatches a single incoming message.
    ///
    /// * `Ping` is answered with a `Pong` carrying the same payload.
    /// * The text `NEXT` requests the next chunk.
    /// * `Close` is echoed back and shuts the handler down.
    ///
    /// Protocol errors and every other message are ignored for now.
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        if let Ok(message) = msg {
            match message {
                ws::Message::Ping(payload) => ctx.pong(&payload),
                ws::Message::Text(text) if &text == "NEXT" => self.next_chunk(ctx),
                ws::Message::Close(reason) => {
                    ctx.close(reason);

                    self.finished(ctx);
                }
                // for now, we ignore all other messages
                _ => (),
            }
        }
    }
}
63

64
impl VectorWebsocketStreamHandler {
65
    pub async fn new<C: SessionContext>(
1✔
66
        vector_operator: Box<dyn VectorOperator>,
1✔
67
        query_rectangle: VectorQueryRectangle,
1✔
68
        execution_ctx: C::ExecutionContext,
1✔
69
        mut query_ctx: C::QueryContext,
1✔
70
    ) -> Result<Self> {
1✔
71
        let workflow_operator_path_root = WorkflowOperatorPath::initialize_root();
1✔
72

73
        let initialized_operator = vector_operator
1✔
74
            .initialize(workflow_operator_path_root, &execution_ctx)
1✔
75
            .await?;
×
76

77
        let spatial_reference = initialized_operator.result_descriptor().spatial_reference;
1✔
78

79
        let query_processor = initialized_operator.query_processor()?;
1✔
80

81
        let abort_handle = query_ctx.abort_trigger().ok();
1✔
82

83
        let byte_stream = call_on_generic_vector_processor!(query_processor, p => {
1✔
84
            let batch_stream = p
1✔
85
                .query_into_owned_stream(query_rectangle, Box::new(query_ctx))
1✔
86
                .await?;
×
87

88
            batch_stream.and_then(
1✔
89
                move |batch| crate::util::spawn_blocking(
3✔
90
                    move || batch.to_arrow_ipc_file_bytes(spatial_reference).map_err(Into::into)
3✔
91
                ).err_into().map(| r | r.and_then(std::convert::identity))
3✔
92
            ).boxed()
1✔
93
        });
94

95
        // TODO: think about buffering the stream?
96

97
        Ok(Self {
1✔
98
            state: VectorWebsocketStreamHandlerState::Idle {
1✔
99
                stream: byte_stream.map_err(Into::into).boxed(),
1✔
100
            },
1✔
101
            abort_handle,
1✔
102
        })
1✔
103
    }
1✔
104

105
    pub fn next_chunk(&mut self, ctx: &mut <Self as Actor>::Context) {
4✔
106
        let state = std::mem::take(&mut self.state);
4✔
107

108
        self.state = match state {
4✔
109
            VectorWebsocketStreamHandlerState::Closed => {
110
                self.finished(ctx);
×
111
                return;
×
112
            }
113
            VectorWebsocketStreamHandlerState::Idle { mut stream } => {
4✔
114
                VectorWebsocketStreamHandlerState::Processing {
4✔
115
                    _fut: ctx.spawn(
4✔
116
                        wrap_future(async move {
4✔
117
                            let tile = stream.next().await;
4✔
118

119
                            (tile, stream)
4✔
120
                        })
4✔
121
                        .then(send_result),
4✔
122
                    ),
4✔
123
                }
4✔
124
            }
125
            VectorWebsocketStreamHandlerState::Processing { _fut: _ } => state,
×
126
        };
127
    }
4✔
128

129
    pub fn abort_processing(&mut self) {
130
        if let Some(abort_handle) = self.abort_handle.take() {
1✔
131
            abort_handle.abort();
1✔
132
        }
1✔
133
    }
1✔
134
}
135

136
fn send_result(
137
    (tile, stream): (Option<StreamResult>, ByteStream),
138
    actor: &mut VectorWebsocketStreamHandler,
139
    ctx: &mut <VectorWebsocketStreamHandler as Actor>::Context,
140
) -> futures::future::Ready<()> {
141
    // TODO: spawn thread instead of blocking and returning an ok future
142

143
    match tile {
3✔
144
        Some(Ok(tile)) => {
3✔
145
            const MESSAGE_MAX_SIZE: usize = 16 * 1024 * 1024; // 16 MB
3✔
146

3✔
147
            actor.state = VectorWebsocketStreamHandlerState::Idle { stream };
3✔
148

3✔
149
            // we can send the whole message at once if it is small enough, i.e. <= `MESSAGE_MAX_SIZE`
3✔
150
            if tile.len() <= MESSAGE_MAX_SIZE {
3✔
151
                ctx.binary(tile);
3✔
152
                return futures::future::ready(());
3✔
153
            }
×
154

×
155
            // limit message chunks to `MESSAGE_MAX_SIZE`
×
156
            let mut chunks = tile.chunks(MESSAGE_MAX_SIZE).enumerate();
×
157

158
            while let Some((i, chunk)) = chunks.next() {
×
159
                let chunk_bytes = chunk.to_vec().into();
×
160
                if i == 0 {
×
161
                    // first chunk
×
162
                    ctx.write_raw(ws::Message::Continuation(Item::FirstBinary(chunk_bytes)));
×
163
                } else if chunks.len() == 0 {
×
164
                    // last chunk
×
165
                    ctx.write_raw(ws::Message::Continuation(Item::Last(chunk_bytes)));
×
166
                } else {
×
167
                    ctx.write_raw(ws::Message::Continuation(Item::Continue(chunk_bytes)));
×
168
                }
×
169
            }
170
        }
171
        Some(Err(e)) => {
×
172
            // on error, send the error and close the connection
×
173
            actor.state = VectorWebsocketStreamHandlerState::Closed;
×
174
            ctx.close(Some(CloseReason {
×
175
                code: CloseCode::Error,
×
176
                description: Some(e.to_string()),
×
177
            }));
×
178
            actor.finished(ctx);
×
179
        }
×
180
        None => {
1✔
181
            // stream ended
1✔
182
            actor.state = VectorWebsocketStreamHandlerState::Closed;
1✔
183
            ctx.close(Some(CloseReason {
1✔
184
                code: CloseCode::Normal,
1✔
185
                description: None,
1✔
186
            }));
1✔
187
            actor.finished(ctx);
1✔
188
        }
1✔
189
    }
190

191
    futures::future::ready(())
1✔
192
}
4✔
193

194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::contexts::PostgresSessionContext;
    use crate::util::tests::with_temp_context_from_spec;
    use crate::{contexts::SimpleApplicationContext, workflows::workflow::Workflow};
    use actix_http::error::PayloadError;
    use actix_web_actors::ws::WebsocketContext;
    use bytes::{Bytes, BytesMut};
    use futures::channel::mpsc::UnboundedSender;
    use geoengine_datatypes::{
        collections::MultiPointCollection,
        primitives::{
            BoundingBox2D, CacheHint, DateTime, FeatureData, MultiPoint, SpatialResolution,
            TimeInterval,
        },
        util::{arrow::arrow_ipc_file_to_record_batches, test::TestDefault},
    };
    use geoengine_operators::{engine::TypedOperator, mock::MockFeatureCollectionSource};
    use tokio_postgres::NoTls;

    /// Queues an encoded `NEXT` text frame as client input.
    fn send_next(input_sender: &UnboundedSender<Result<Bytes, PayloadError>>) {
        let mut frame = BytesMut::new();
        actix_http::ws::Parser::write_message(
            &mut frame,
            "NEXT",
            actix_http::ws::OpCode::Text,
            true,
            true,
        );

        input_sender.unbounded_send(Ok(frame.into())).unwrap();
    }

    /// Builds a collection of three points with the columns `foobar` and `strings`.
    fn three_point_collection() -> MultiPointCollection {
        let year_2014 = TimeInterval::new(
            DateTime::new_utc(2014, 1, 1, 0, 0, 0),
            DateTime::new_utc(2015, 1, 1, 0, 0, 0),
        )
        .unwrap();

        MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
            vec![year_2014; 3],
            [
                (
                    "foobar".to_string(),
                    FeatureData::NullableInt(vec![Some(0), None, Some(2)]),
                ),
                (
                    "strings".to_string(),
                    FeatureData::Text(vec![
                        "a".to_string(),
                        "b".to_string(),
                        "c".to_string(),
                    ]),
                ),
            ]
            .iter()
            .cloned()
            .collect(),
            CacheHint::default(),
        )
        .unwrap()
    }

    /// Streams three batches over the websocket and checks each of them, then
    /// expects a close frame and finally the end of the output.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_websocket_stream() {
        with_temp_context_from_spec(
            TestDefault::test_default(),
            usize::MAX.into(), // ensure that we get one chunk per input
            |app_ctx, _| async move {
                let collection = three_point_collection();

                let ctx = app_ctx.default_session_context().await.unwrap();

                let workflow = Workflow {
                    operator: TypedOperator::Vector(
                        MockFeatureCollectionSource::multiple(vec![
                            collection.clone(),
                            collection.clone(),
                            collection.clone(),
                        ])
                        .boxed(),
                    ),
                };

                let whole_world = BoundingBox2D::new_upper_left_lower_right(
                    (-180., 90.).into(),
                    (180., -90.).into(),
                )
                .unwrap();
                let march_2014 =
                    TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0)).unwrap();

                let query_rectangle = VectorQueryRectangle {
                    spatial_bounds: whole_world,
                    time_interval: march_2014,
                    spatial_resolution: SpatialResolution::one(),
                };

                let handler = VectorWebsocketStreamHandler::new::<PostgresSessionContext<NoTls>>(
                    workflow.operator.get_vector().unwrap(),
                    query_rectangle,
                    ctx.execution_context().unwrap(),
                    ctx.query_context().unwrap(),
                )
                .await
                .unwrap();

                let (input_sender, input_receiver) = futures::channel::mpsc::unbounded();

                let mut websocket_context = WebsocketContext::create(handler, input_receiver);

                // 3 batches
                for _ in 0..3 {
                    send_next(&input_sender);

                    let frame = websocket_context.next().await.unwrap().unwrap();

                    let payload = &frame[4..]; // the first four bytes are WS op bytes

                    let record_batches = arrow_ipc_file_to_record_batches(payload).unwrap();
                    assert_eq!(record_batches.len(), 1);
                    let record_batch = record_batches.first().unwrap();
                    let schema = record_batch.schema();

                    assert_eq!(schema.metadata()["spatialReference"], "EPSG:4326");

                    assert_eq!(record_batch.column_by_name("foobar").unwrap().len(), 3);
                    assert_eq!(record_batch.column_by_name("strings").unwrap().len(), 3);
                }

                send_next(&input_sender);
                assert_eq!(websocket_context.next().await.unwrap().unwrap().len(), 4); // close frame

                send_next(&input_sender);
                assert!(websocket_context.next().await.is_none());
            },
        )
        .await;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc