• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 16474557374

23 Jul 2025 03:10PM UTC coverage: 88.939%. First build
16474557374

Pull #1064

github

web-flow
Merge 915f15871 into 116e05267
Pull Request #1064: refactor(services): use actix-ws instead of actix-web-actors

329 of 375 new or added lines in 4 files covered. (87.73%)

111698 of 125590 relevant lines covered (88.94%)

80297.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.85
/services/src/workflows/websocket_stream.rs
1
use crate::{contexts::SessionContext, error::Result};
2
use actix_http::ws::{CloseCode, CloseReason};
3
use actix_ws::Item;
4
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, stream::BoxStream};
5
use geoengine_datatypes::{
6
    collections::FeatureCollectionIpc,
7
    primitives::{RasterQueryRectangle, VectorQueryRectangle},
8
    raster::raster_tile_2d_to_arrow_ipc_file,
9
};
10
use geoengine_operators::{
11
    call_on_generic_raster_processor, call_on_generic_vector_processor,
12
    engine::{
13
        QueryAbortTrigger, QueryContext, QueryProcessorExt, RasterOperator, VectorOperator,
14
        WorkflowOperatorPath,
15
    },
16
};
17
use tokio::{
18
    sync::mpsc::{Receiver, Sender, error::TrySendError},
19
    task::JoinHandle,
20
};
21

22
/// Boxed, `'static` stream whose items are [`StreamResult`]s.
type ByteStream = BoxStream<'static, StreamResult>;
23
/// One stream item: the produced bytes, or the error that occurred while producing them.
type StreamResult = Result<Vec<u8>>;
24

25
pub struct WebsocketStreamTask {
26
    _handle: JoinHandle<()>,
27
    compute_sender: Sender<()>,
28
    tile_receiver: Receiver<Option<Result<Vec<u8>>>>,
29
    abort_handle: QueryAbortTrigger,
30
}
31

32
impl WebsocketStreamTask {
33
    pub async fn new_raster<C: SessionContext>(
1✔
34
        raster_operator: Box<dyn RasterOperator>,
1✔
35
        query_rectangle: RasterQueryRectangle,
1✔
36
        execution_ctx: C::ExecutionContext,
1✔
37
        mut query_ctx: C::QueryContext,
1✔
38
    ) -> Result<Self> {
1✔
39
        let workflow_operator_path_root = WorkflowOperatorPath::initialize_root();
1✔
40

41
        let initialized_operator = raster_operator
1✔
42
            .initialize(workflow_operator_path_root, &execution_ctx)
1✔
43
            .await?;
1✔
44

45
        let spatial_reference = initialized_operator.result_descriptor().spatial_reference;
1✔
46

47
        let query_processor = initialized_operator.query_processor()?;
1✔
48

49
        let abort_handle = query_ctx.abort_trigger()?;
1✔
50

51
        let byte_stream = call_on_generic_raster_processor!(query_processor, p => {
1✔
52
            let tile_stream = p
1✔
53
                .query_into_owned_stream(query_rectangle, Box::new(query_ctx))
1✔
NEW
54
                .await?;
×
55

56
            tile_stream.and_then(
1✔
57
                move |tile| crate::util::spawn_blocking(
4✔
58
                    move || raster_tile_2d_to_arrow_ipc_file(tile, spatial_reference).map_err(Into::into)
4✔
59
                ).err_into().map(| r | r.and_then(std::convert::identity))
4✔
60
            ).boxed()
1✔
61
        });
62

63
        // TODO: think about buffering the stream?
64

65
        Ok(Self::new_from_byte_stream(
1✔
66
            byte_stream.map_err(Into::into).boxed(),
1✔
67
            abort_handle,
1✔
68
        ))
1✔
69
    }
1✔
70

71
    pub async fn new_vector<C: SessionContext>(
1✔
72
        vector_operator: Box<dyn VectorOperator>,
1✔
73
        query_rectangle: VectorQueryRectangle,
1✔
74
        execution_ctx: C::ExecutionContext,
1✔
75
        mut query_ctx: C::QueryContext,
1✔
76
    ) -> Result<Self> {
1✔
77
        let workflow_operator_path_root = WorkflowOperatorPath::initialize_root();
1✔
78

79
        let initialized_operator = vector_operator
1✔
80
            .initialize(workflow_operator_path_root, &execution_ctx)
1✔
81
            .await?;
1✔
82

83
        let spatial_reference = initialized_operator.result_descriptor().spatial_reference;
1✔
84

85
        let query_processor = initialized_operator.query_processor()?;
1✔
86

87
        let abort_handle = query_ctx.abort_trigger()?;
1✔
88

89
        let byte_stream = call_on_generic_vector_processor!(query_processor, p => {
1✔
NEW
90
            let batch_stream = p
×
NEW
91
                .query_into_owned_stream(query_rectangle, Box::new(query_ctx))
×
NEW
92
                .await?;
×
93

NEW
94
            batch_stream.and_then(
×
95
                move |batch| crate::util::spawn_blocking(
1✔
96
                    move || batch.to_arrow_ipc_file_bytes(spatial_reference).map_err(Into::into)
1✔
97
                ).err_into().map(| r | r.and_then(std::convert::identity))
1✔
NEW
98
            ).boxed()
×
99
        });
100

101
        // TODO: think about buffering the stream?
102

103
        Ok(Self::new_from_byte_stream(
1✔
104
            byte_stream.map_err(Into::into).boxed(),
1✔
105
            abort_handle,
1✔
106
        ))
1✔
107
    }
1✔
108

109
    fn new_from_byte_stream(mut byte_stream: ByteStream, abort_handle: QueryAbortTrigger) -> Self {
2✔
110
        let (compute_sender, mut compute_msgs) = tokio::sync::mpsc::channel(1);
2✔
111
        let (tile_sender, tile_msgs) = tokio::sync::mpsc::channel(1);
2✔
112

113
        let handle = crate::util::spawn(async move {
2✔
114
            while compute_msgs.recv().await.is_some() {
7✔
115
                let tile = byte_stream.next().await;
5✔
116
                tile_sender.send(tile).await.unwrap_or_else(|e| {
5✔
NEW
117
                    tracing::error!("Failed to send tile: {}", e);
×
NEW
118
                });
×
119
            }
120
        });
2✔
121

122
        Self {
2✔
123
            _handle: handle,
2✔
124
            compute_sender,
2✔
125
            tile_receiver: tile_msgs,
2✔
126
            abort_handle,
2✔
127
        }
2✔
128
    }
2✔
129

130
    pub fn abort_processing(self) {
2✔
131
        self.abort_handle.abort();
2✔
132
    }
2✔
133

134
    pub fn compute_next_tile(&mut self) {
5✔
135
        match self.compute_sender.try_send(()) {
5✔
136
            Ok(()) | Err(TrySendError::Full(())) => { /* ok or drop message */ }
5✔
137
            Err(TrySendError::Closed(())) => {
NEW
138
                debug_assert!(
×
NEW
139
                    false, // must not happen
×
NEW
140
                    "Compute sender is closed, cannot send next tile request"
×
141
                );
142
            }
143
        }
144
    }
5✔
145

146
    pub async fn receive_tile(&mut self) -> Option<Result<Vec<u8>>> {
12✔
147
        self.tile_receiver.recv().await?
12✔
148
    }
5✔
149
}
150

151
/// Sends the result of a stream to the websocket session.
152
///
153
/// Return `Some` if the tile was sent successfully, `None` if the stream ended,
154
///
155
pub async fn send_websocket_message(
5✔
156
    tile: Option<StreamResult>,
5✔
157
    mut ctx: actix_ws::Session,
5✔
158
) -> Option<()> {
5✔
159
    match tile {
5✔
160
        Some(Ok(bytes)) => {
5✔
161
            const MESSAGE_MAX_SIZE: usize = 16 * 1024 * 1024; // 16 MB
162

163
            // we can send the whole message at once if it is small enough, i.e. <= `MESSAGE_MAX_SIZE`
164
            if bytes.len() <= MESSAGE_MAX_SIZE {
5✔
165
                ctx.binary(bytes).await.ok()?;
5✔
166
                return Some(());
5✔
NEW
167
            }
×
168

169
            // limit message chunks to `MESSAGE_MAX_SIZE`
NEW
170
            let mut chunks = bytes.chunks(MESSAGE_MAX_SIZE).enumerate();
×
171

NEW
172
            while let Some((i, chunk)) = chunks.next() {
×
NEW
173
                let chunk_bytes = chunk.to_vec().into();
×
NEW
174
                if i == 0 {
×
175
                    // first chunk
NEW
176
                    ctx.continuation(Item::FirstBinary(chunk_bytes))
×
NEW
177
                        .await
×
NEW
178
                        .ok()?;
×
NEW
179
                } else if chunks.len() == 0 {
×
180
                    // last chunk
NEW
181
                    ctx.continuation(Item::Last(chunk_bytes)).await.ok()?;
×
182
                } else {
NEW
183
                    ctx.continuation(Item::Continue(chunk_bytes)).await.ok()?;
×
184
                }
185
            }
186
        }
NEW
187
        Some(Err(e)) => {
×
188
            // on error, send the error and close the connection
NEW
189
            ctx.close(Some(CloseReason {
×
NEW
190
                code: CloseCode::Error,
×
NEW
191
                description: Some(e.to_string()),
×
NEW
192
            }))
×
NEW
193
            .await
×
NEW
194
            .ok()?;
×
NEW
195
            return None;
×
196
        }
197
        None => {
198
            // stream ended
NEW
199
            ctx.close(Some(CloseReason {
×
NEW
200
                code: CloseCode::Normal,
×
NEW
201
                description: None,
×
NEW
202
            }))
×
NEW
203
            .await
×
NEW
204
            .ok()?;
×
NEW
205
            return None;
×
206
        }
207
    }
208

NEW
209
    Some(())
×
210
}
5✔
211

212
pub async fn handle_websocket_message(
7✔
213
    msg: actix_ws::Message,
7✔
214
    stream_task: &mut WebsocketStreamTask,
7✔
215
    session: &mut actix_ws::Session,
7✔
216
) -> Option<()> {
7✔
217
    match msg {
5✔
NEW
218
        actix_ws::Message::Ping(bytes) => {
×
NEW
219
            session.pong(&bytes).await.ok()?;
×
220
        }
221
        actix_ws::Message::Text(msg) if &msg == "NEXT" => {
5✔
222
            stream_task.compute_next_tile();
5✔
223
        }
5✔
224
        actix_ws::Message::Close(_) => {
225
            return None; // close the session
2✔
226
        }
NEW
227
        _ => { /* ignore other messages */ }
×
228
    }
229

230
    Some(())
5✔
231
}
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc