• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 23350122333

20 Mar 2026 03:33PM UTC coverage: 76.492% (-2.8%) from 79.315%
23350122333

Pull #15053

github

web-flow
Merge f5bf69f97 into 6587e363a
Pull Request #15053: Flow queue/v3

113 of 129 new or added lines in 9 files covered. (87.6%)

9534 existing lines in 453 files now uncovered.

256601 of 335461 relevant lines covered (76.49%)

4680806.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.09
/rust/src/websocket/websocket.rs
1
/* Copyright (C) 2023-2025 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
use super::parser;
19
use crate::applayer::{self, *};
20
use crate::conf::conf_get;
21
use crate::core::{
22
    sc_app_layer_parser_trigger_raw_stream_inspection, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP,
23
};
24
use crate::direction::Direction;
25
use crate::flow::Flow;
26
use crate::frames::Frame;
27

28
use nom8 as nom;
29
use nom8::Needed;
30

31
use flate2::Decompress;
32
use flate2::FlushDecompress;
33
use suricata_sys::sys::{
34
    AppLayerParserState, AppProto, SCAppLayerParserConfParserEnabled,
35
    SCAppLayerParserRegisterLogger, SCAppLayerProtoDetectConfProtoDetectionEnabled,
36
};
37

38
use std;
39
use std::collections::VecDeque;
40
use std::ffi::CString;
41
use std::os::raw::{c_char, c_int, c_void};
42

43
pub(super) static mut ALPROTO_WEBSOCKET: AppProto = ALPROTO_UNKNOWN;
44

45
static mut WEBSOCKET_MAX_PAYLOAD_SIZE: u32 = 0xFFFF;
46

47
const WEBSOCKET_DECOMPRESS_BUF_SIZE: usize = 8192;
48

49
#[derive(AppLayerFrameType)]
50
pub enum WebSocketFrameType {
51
    Header,
52
    Pdu,
53
    Data,
54
}
55

UNCOV
56
#[derive(AppLayerEvent)]
×
57
pub enum WebSocketEvent {
58
    SkipEndOfPayload,
59
    ReassemblyLimitReached,
60
}
61

62
#[derive(Default)]
63
pub struct WebSocketTransaction {
64
    tx_id: u64,
65
    pub pdu: parser::WebSocketPdu,
66
    tx_data: AppLayerTxData,
67
}
68

69
impl WebSocketTransaction {
70
    pub fn new(direction: Direction) -> WebSocketTransaction {
60✔
71
        Self {
60✔
72
            tx_data: AppLayerTxData::for_direction(direction),
60✔
73
            ..Default::default()
60✔
74
        }
60✔
75
    }
60✔
76
}
77

78
impl Transaction for WebSocketTransaction {
79
    fn id(&self) -> u64 {
753✔
80
        self.tx_id
753✔
81
    }
753✔
82
}
83

84
#[derive(Default)]
85
struct WebSocketReassemblyBuffer {
86
    data: Vec<u8>,
87
    compress: bool,
88
}
89

90
#[derive(Default)]
91
pub struct WebSocketState {
92
    state_data: AppLayerStateData,
93
    tx_id: u64,
94
    transactions: VecDeque<WebSocketTransaction>,
95

96
    c2s_dec: Option<flate2::Decompress>,
97
    s2c_dec: Option<flate2::Decompress>,
98

99
    c2s_buf: WebSocketReassemblyBuffer,
100
    s2c_buf: WebSocketReassemblyBuffer,
101

102
    to_skip_tc: u64,
103
    to_skip_ts: u64,
104
}
105

106
impl State<WebSocketTransaction> for WebSocketState {
107
    fn get_transaction_count(&self) -> usize {
353✔
108
        self.transactions.len()
353✔
109
    }
353✔
110

111
    fn get_transaction_by_index(&self, index: usize) -> Option<&WebSocketTransaction> {
438✔
112
        self.transactions.get(index)
438✔
113
    }
438✔
114
}
115

116
impl WebSocketState {
117
    pub fn new() -> Self {
6✔
118
        Default::default()
6✔
119
    }
6✔
120

121
    // Free a transaction by ID.
122
    fn free_tx(&mut self, tx_id: u64) {
60✔
123
        let len = self.transactions.len();
60✔
124
        let mut found = false;
60✔
125
        let mut index = 0;
60✔
126
        for i in 0..len {
60✔
127
            let tx = &self.transactions[i];
60✔
128
            if tx.tx_id == tx_id + 1 {
60✔
129
                found = true;
60✔
130
                index = i;
60✔
131
                break;
60✔
UNCOV
132
            }
×
133
        }
134
        if found {
60✔
135
            self.transactions.remove(index);
60✔
136
        }
60✔
137
    }
60✔
138

139
    pub fn get_tx(&mut self, tx_id: u64) -> Option<&WebSocketTransaction> {
62✔
140
        self.transactions.iter().find(|tx| tx.tx_id == tx_id + 1)
184✔
141
    }
62✔
142

143
    fn new_tx(&mut self, direction: Direction) -> WebSocketTransaction {
60✔
144
        let mut tx = WebSocketTransaction::new(direction);
60✔
145
        self.tx_id += 1;
60✔
146
        tx.tx_id = self.tx_id;
60✔
147
        return tx;
60✔
148
    }
60✔
149

150
    fn parse(
44✔
151
        &mut self, stream_slice: StreamSlice, direction: Direction, flow: *mut Flow,
44✔
152
    ) -> AppLayerResult {
44✔
153
        let to_skip = if direction == Direction::ToClient {
44✔
154
            &mut self.to_skip_tc
32✔
155
        } else {
156
            &mut self.to_skip_ts
12✔
157
        };
158
        let input = stream_slice.as_slice();
44✔
159
        let mut start = input;
44✔
160
        if *to_skip > 0 {
44✔
161
            if *to_skip >= input.len() as u64 {
×
162
                *to_skip -= input.len() as u64;
×
163
                return AppLayerResult::ok();
×
164
            } else {
×
165
                start = &input[*to_skip as usize..];
×
166
                *to_skip = 0;
×
167
            }
×
168
        }
44✔
169

170
        let max_pl_size = unsafe { WEBSOCKET_MAX_PAYLOAD_SIZE };
44✔
171
        while !start.is_empty() {
104✔
172
            match parser::parse_message(start, max_pl_size) {
60✔
173
                Ok((rem, pdu)) => {
60✔
174
                    let mut tx = self.new_tx(direction);
60✔
175
                    let _pdu = Frame::new(
60✔
176
                        flow,
60✔
177
                        &stream_slice,
60✔
178
                        start,
60✔
179
                        (start.len() - rem.len() - pdu.payload.len()) as i64,
60✔
180
                        WebSocketFrameType::Header as u8,
60✔
181
                        Some(tx.tx_id),
60✔
182
                    );
60✔
183
                    let _pdu = Frame::new(
60✔
184
                        flow,
60✔
185
                        &stream_slice,
60✔
186
                        start,
60✔
187
                        (start.len() - rem.len()) as i64,
60✔
188
                        WebSocketFrameType::Pdu as u8,
60✔
189
                        Some(tx.tx_id),
60✔
190
                    );
60✔
191
                    let _pdu = Frame::new(
60✔
192
                        flow,
60✔
193
                        &stream_slice,
60✔
194
                        &start[(start.len() - rem.len() - pdu.payload.len())..],
60✔
195
                        pdu.payload.len() as i64,
60✔
196
                        WebSocketFrameType::Data as u8,
60✔
197
                        Some(tx.tx_id),
60✔
198
                    );
60✔
199
                    start = rem;
60✔
200
                    if pdu.to_skip > 0 {
60✔
201
                        if direction == Direction::ToClient {
×
202
                            self.to_skip_tc = pdu.to_skip;
×
203
                        } else {
×
204
                            self.to_skip_ts = pdu.to_skip;
×
205
                        }
×
206
                        tx.tx_data.set_event(WebSocketEvent::SkipEndOfPayload as u8);
×
207
                    }
60✔
208
                    if pdu.compress {
60✔
209
                        // RFC 7692 section 7.1.2 states that
210
                        // absence of precision means LZ77 sliding window of up to 2^15 bytes
211
                        if direction == Direction::ToClient && self.s2c_dec.is_none() {
20✔
212
                            self.s2c_dec = Some(Decompress::new_with_window_bits(false, 15));
2✔
213
                        } else if direction == Direction::ToServer && self.c2s_dec.is_none() {
18✔
UNCOV
214
                            self.c2s_dec = Some(Decompress::new_with_window_bits(false, 15));
×
215
                        }
18✔
216
                    }
40✔
217
                    let (buf, dec) = if direction == Direction::ToClient {
60✔
218
                        (&mut self.s2c_buf, &mut self.s2c_dec)
52✔
219
                    } else {
220
                        (&mut self.c2s_buf, &mut self.c2s_dec)
8✔
221
                    };
222
                    let mut compress = pdu.compress;
60✔
223
                    if !buf.data.is_empty() || !pdu.fin {
60✔
UNCOV
224
                        if buf.data.is_empty() {
×
UNCOV
225
                            buf.compress = pdu.compress;
×
UNCOV
226
                        }
×
UNCOV
227
                        if buf.data.len() + pdu.payload.len() < max_pl_size as usize {
×
UNCOV
228
                            buf.data.extend(&pdu.payload);
×
UNCOV
229
                        } else if buf.data.len() < max_pl_size as usize {
×
230
                            buf.data
×
231
                                .extend(&pdu.payload[..max_pl_size as usize - buf.data.len()]);
×
232
                            tx.tx_data
×
233
                                .set_event(WebSocketEvent::ReassemblyLimitReached as u8);
×
234
                        }
×
235
                    }
60✔
236
                    tx.pdu = pdu;
60✔
237
                    if tx.pdu.fin && !buf.data.is_empty() {
60✔
238
                        // the final PDU gets the full reassembled payload
×
239
                        compress = buf.compress;
×
240
                        std::mem::swap(&mut tx.pdu.payload, &mut buf.data);
×
241
                        buf.data.clear();
×
242
                    }
60✔
243
                    if compress && tx.pdu.fin {
60✔
244
                        buf.compress = false;
20✔
245
                        // cf RFC 7692 section-7.2.2
20✔
246
                        tx.pdu.payload.extend_from_slice(&[0, 0, 0xFF, 0xFF]);
20✔
247
                        let mut v = Vec::with_capacity(std::cmp::min(
20✔
248
                            WEBSOCKET_DECOMPRESS_BUF_SIZE,
20✔
249
                            // Do not allocate 8kbytes for a small size.
20✔
250
                            // Numbers here may be optimized.
20✔
251
                            256 + 16 * tx.pdu.payload.len(),
20✔
252
                        ));
20✔
253
                        if let Some(dec) = dec {
20✔
254
                            let expect = dec.total_in() + tx.pdu.payload.len() as u64;
20✔
255
                            let start = dec.total_in();
20✔
256
                            let mut e = dec.decompress_vec(
20✔
257
                                &tx.pdu.payload,
20✔
258
                                &mut v,
20✔
259
                                FlushDecompress::Finish,
20✔
260
                            );
20✔
261
                            while e.is_ok() && dec.total_in() < expect {
20✔
262
                                let mut s = vec![0u8; WEBSOCKET_DECOMPRESS_BUF_SIZE];
×
263
                                let before = dec.total_out();
×
264
                                let check = dec.total_in();
×
265
                                e = dec.decompress(
×
266
                                    &tx.pdu.payload[(dec.total_in() - start) as usize..],
×
267
                                    &mut s,
×
268
                                    FlushDecompress::Finish,
×
269
                                );
×
270
                                if v.len() < max_pl_size as usize {
×
271
                                    let end = if v.len() + (dec.total_out() - before) as usize
×
272
                                        > max_pl_size as usize
×
273
                                    {
274
                                        max_pl_size as usize - v.len()
×
275
                                    } else {
276
                                        (dec.total_out() - before) as usize
×
277
                                    };
278
                                    v.extend_from_slice(&s[..end]);
×
279
                                }
×
280
                                if check >= dec.total_in() {
×
281
                                    // safety check against infinite loop : dec.total_in() should increase
282
                                    break;
×
283
                                }
×
284
                            }
285
                            if !v.is_empty() {
20✔
286
                                std::mem::swap(&mut tx.pdu.payload, &mut v);
20✔
287
                            }
20✔
288
                        }
×
289
                    }
40✔
290
                    if tx.pdu.fin {
60✔
291
                        sc_app_layer_parser_trigger_raw_stream_inspection(flow, direction as i32);
60✔
292
                    }
60✔
293
                    self.transactions.push_back(tx);
60✔
294
                }
UNCOV
295
                Err(nom::Err::Incomplete(needed)) => {
×
UNCOV
296
                    if let Needed::Size(n) = needed {
×
UNCOV
297
                        let n = usize::from(n);
×
UNCOV
298
                        // Not enough data. just ask for one more byte.
×
UNCOV
299
                        let consumed = input.len() - start.len();
×
UNCOV
300
                        let needed = start.len() + n;
×
UNCOV
301
                        return AppLayerResult::incomplete(consumed as u32, needed as u32);
×
302
                    }
×
303
                    return AppLayerResult::err();
×
304
                }
305
                Err(_) => {
306
                    return AppLayerResult::err();
×
307
                }
308
            }
309
        }
310
        // Input was fully consumed.
311
        return AppLayerResult::ok();
44✔
312
    }
44✔
313
}
314

315
// C exports.
316

317
unsafe extern "C" fn websocket_probing_parser(
12✔
318
    _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8,
12✔
319
) -> AppProto {
12✔
320
    if !input.is_null() {
12✔
321
        let slice = build_slice!(input, input_len as usize);
10✔
322
        if !slice.is_empty() {
10✔
323
            // just check reserved bits are zeroed, except RSV1
324
            // as RSV1 is used for compression cf RFC 7692
325
            if slice[0] & 0x30 == 0 {
10✔
326
                return ALPROTO_WEBSOCKET;
10✔
UNCOV
327
            }
×
UNCOV
328
            return ALPROTO_FAILED;
×
329
        }
×
330
    }
2✔
331
    return ALPROTO_UNKNOWN;
2✔
332
}
12✔
333

334
extern "C" fn websocket_state_new(_orig_state: *mut c_void, _orig_proto: AppProto) -> *mut c_void {
6✔
335
    let state = WebSocketState::new();
6✔
336
    let boxed = Box::new(state);
6✔
337
    return Box::into_raw(boxed) as *mut c_void;
6✔
338
}
6✔
339

340
unsafe extern "C" fn websocket_state_free(state: *mut c_void) {
6✔
341
    std::mem::drop(Box::from_raw(state as *mut WebSocketState));
6✔
342
}
6✔
343

344
unsafe extern "C" fn websocket_state_tx_free(state: *mut c_void, tx_id: u64) {
60✔
345
    let state = cast_pointer!(state, WebSocketState);
60✔
346
    state.free_tx(tx_id);
60✔
347
}
60✔
348

349
unsafe extern "C" fn websocket_parse_request(
12✔
350
    flow: *mut Flow, state: *mut c_void, _pstate: *mut AppLayerParserState,
12✔
351
    stream_slice: StreamSlice, _data: *mut c_void,
12✔
352
) -> AppLayerResult {
12✔
353
    let state = cast_pointer!(state, WebSocketState);
12✔
354
    state.parse(stream_slice, Direction::ToServer, flow)
12✔
355
}
12✔
356

357
unsafe extern "C" fn websocket_parse_response(
32✔
358
    flow: *mut Flow, state: *mut c_void, _pstate: *mut AppLayerParserState,
32✔
359
    stream_slice: StreamSlice, _data: *mut c_void,
32✔
360
) -> AppLayerResult {
32✔
361
    let state = cast_pointer!(state, WebSocketState);
32✔
362
    state.parse(stream_slice, Direction::ToClient, flow)
32✔
363
}
32✔
364

365
unsafe extern "C" fn websocket_state_get_tx(state: *mut c_void, tx_id: u64) -> *mut c_void {
62✔
366
    let state = cast_pointer!(state, WebSocketState);
62✔
367
    match state.get_tx(tx_id) {
62✔
368
        Some(tx) => {
62✔
369
            return tx as *const _ as *mut _;
62✔
370
        }
371
        None => {
UNCOV
372
            return std::ptr::null_mut();
×
373
        }
374
    }
375
}
62✔
376

377
unsafe extern "C" fn websocket_state_get_tx_count(state: *mut c_void) -> u64 {
337✔
378
    let state = cast_pointer!(state, WebSocketState);
337✔
379
    return state.tx_id;
337✔
380
}
337✔
381

382
unsafe extern "C" fn websocket_tx_get_alstate_progress(_tx: *mut c_void, _direction: u8) -> c_int {
526✔
383
    return 1;
526✔
384
}
526✔
385

386
export_tx_data_get!(websocket_get_tx_data, WebSocketTransaction);
387
export_state_data_get!(websocket_get_state_data, WebSocketState);
388

389
// Parser name as a C style string.
390
const PARSER_NAME: &[u8] = b"websocket\0";
391

392
#[no_mangle]
393
pub unsafe extern "C" fn SCRegisterWebSocketParser() {
2,226✔
394
    let parser = RustParser {
2,226✔
395
        name: PARSER_NAME.as_ptr() as *const c_char,
2,226✔
396
        default_port: std::ptr::null(),
2,226✔
397
        ipproto: IPPROTO_TCP,
2,226✔
398
        probe_ts: Some(websocket_probing_parser),
2,226✔
399
        probe_tc: Some(websocket_probing_parser),
2,226✔
400
        min_depth: 0,
2,226✔
401
        max_depth: 16,
2,226✔
402
        state_new: websocket_state_new,
2,226✔
403
        state_free: websocket_state_free,
2,226✔
404
        tx_free: websocket_state_tx_free,
2,226✔
405
        parse_ts: websocket_parse_request,
2,226✔
406
        parse_tc: websocket_parse_response,
2,226✔
407
        get_tx_count: websocket_state_get_tx_count,
2,226✔
408
        get_tx: websocket_state_get_tx,
2,226✔
409
        tx_comp_st_ts: 1,
2,226✔
410
        tx_comp_st_tc: 1,
2,226✔
411
        tx_get_progress: websocket_tx_get_alstate_progress,
2,226✔
412
        get_eventinfo: Some(WebSocketEvent::get_event_info),
2,226✔
413
        get_eventinfo_byid: Some(WebSocketEvent::get_event_info_by_id),
2,226✔
414
        localstorage_new: None,
2,226✔
415
        localstorage_free: None,
2,226✔
416
        get_tx_files: None,
2,226✔
417
        get_tx_iterator: Some(
2,226✔
418
            applayer::state_get_tx_iterator::<WebSocketState, WebSocketTransaction>,
2,226✔
419
        ),
2,226✔
420
        get_tx_data: websocket_get_tx_data,
2,226✔
421
        get_state_data: websocket_get_state_data,
2,226✔
422
        apply_tx_config: None,
2,226✔
423
        flags: 0, // do not accept gaps as there is no good way to resync
2,226✔
424
        get_frame_id_by_name: Some(WebSocketFrameType::ffi_id_from_name),
2,226✔
425
        get_frame_name_by_id: Some(WebSocketFrameType::ffi_name_from_id),
2,226✔
426
        get_state_id_by_name: None,
2,226✔
427
        get_state_name_by_id: None,
2,226✔
428
    };
2,226✔
429

2,226✔
430
    let ip_proto_str = CString::new("tcp").unwrap();
2,226✔
431

2,226✔
432
    if SCAppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
2,226✔
433
        let alproto = applayer_register_protocol_detection(&parser, 1);
2,226✔
434
        ALPROTO_WEBSOCKET = alproto;
2,226✔
435
        if SCAppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
2,226✔
436
            let _ = AppLayerRegisterParser(&parser, alproto);
2,226✔
437
        }
2,226✔
438
        SCLogDebug!("Rust websocket parser registered.");
439
        if let Some(val) = conf_get("app-layer.protocols.websocket.max-payload-size") {
2,226✔
440
            if let Ok(v) = val.parse::<u32>() {
×
441
                WEBSOCKET_MAX_PAYLOAD_SIZE = v;
×
442
            } else {
×
443
                SCLogError!("Invalid value for websocket.max-payload-size");
×
444
            }
445
        }
2,226✔
446
        SCAppLayerParserRegisterLogger(IPPROTO_TCP, ALPROTO_WEBSOCKET);
2,226✔
447
    } else {
×
448
        SCLogDebug!("Protocol detector and parser disabled for WEBSOCKET.");
×
449
    }
×
450
}
2,226✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc