• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 23338889526

20 Mar 2026 10:29AM UTC coverage: 76.331% (-3.0%) from 79.315%
23338889526

Pull #15053

github

web-flow
Merge 00ac1dd14 into 6587e363a
Pull Request #15053: Flow queue/v3

106 of 127 new or added lines in 8 files covered. (83.46%)

9913 existing lines in 468 files now uncovered.

255689 of 334972 relevant lines covered (76.33%)

4170649.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.25
/src/flow-timeout.c
1
/* Copyright (C) 2007-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22
 */
23

24
#include "suricata-common.h"
25
#include "suricata.h"
26
#include "decode.h"
27
#include "conf.h"
28
#include "threadvars.h"
29
#include "tm-threads.h"
30
#include "runmodes.h"
31

32
#include "util-random.h"
33
#include "util-time.h"
34

35
#include "flow.h"
36
#include "flow-queue.h"
37
#include "flow-hash.h"
38
#include "flow-util.h"
39
#include "flow-var.h"
40
#include "flow-private.h"
41
#include "flow-manager.h"
42
#include "flow-timeout.h"
43
#include "pkt-var.h"
44
#include "host.h"
45

46
#include "stream-tcp-private.h"
47
#include "stream-tcp-reassemble.h"
48
#include "stream-tcp.h"
49

50
#include "util-unittest.h"
51
#include "util-unittest-helper.h"
52
#include "util-byte.h"
53

54
#include "util-debug.h"
55
#include "util-privs.h"
56
#include "util-datalink.h"
57

58
#include "detect.h"
59
#include "detect-engine-state.h"
60
#include "stream.h"
61

62
#include "rust.h"
63
#include "app-layer-frames.h"
64
#include "app-layer-parser.h"
65
#include "app-layer.h"
66

67
#include "util-profiling.h"
68

69
/**
70
 * \internal
71
 * \brief Pseudo packet setup to finish a flow when needed.
72
 *
73
 * \param p         a dummy pseudo packet from packet pool.  Not all pseudo
74
 *                  packets need to force reassembly, in which case we just
75
 *                  set dummy ack/seq values.
76
 * \param direction Direction of the packet.  0 indicates toserver and 1
77
 *                  indicates toclient.
78
 * \param f         Pointer to the flow.
79
 * \param ssn       Pointer to the tcp session.
80
 * \retval          pseudo packet with everything set up
81
 */
82
static inline Packet *FlowPseudoPacketSetup(
83
        Packet *p, int direction, Flow *f, const TcpSession *ssn)
84
{
8,070✔
85
    const int orig_dir = direction;
8,070✔
86
    p->tenant_id = f->tenant_id;
8,070✔
87
    p->datalink = DatalinkGetGlobalType();
8,070✔
88
    p->proto = IPPROTO_TCP;
8,070✔
89
    FlowReference(&p->flow, f);
8,070✔
90
    p->flags |= PKT_STREAM_EST;
8,070✔
91
    p->flags |= PKT_HAS_FLOW;
8,070✔
92
    p->flags |= PKT_PSEUDO_STREAM_END;
8,070✔
93
    memcpy(&p->vlan_id[0], &f->vlan_id[0], sizeof(p->vlan_id));
8,070✔
94
    p->vlan_idx = f->vlan_idx;
8,070✔
95
    p->livedev = (struct LiveDevice_ *)f->livedev;
8,070✔
96

97
    if (f->flags & FLOW_NOPAYLOAD_INSPECTION) {
8,070✔
98
        DecodeSetNoPayloadInspectionFlag(p);
3,383✔
99
    }
3,383✔
100

101
    if (direction == 0)
8,070✔
102
        p->flowflags |= FLOW_PKT_TOSERVER;
3,915✔
103
    else
4,155✔
104
        p->flowflags |= FLOW_PKT_TOCLIENT;
4,155✔
105
    p->flowflags |= FLOW_PKT_ESTABLISHED;
8,070✔
106
    p->payload = NULL;
8,070✔
107
    p->payload_len = 0;
8,070✔
108

109
    /* apply reversed flow logic after setting direction to the packet */
110
    direction ^= ((f->flags & FLOW_DIR_REVERSED) != 0);
8,070✔
111

112
    if (FLOW_IS_IPV4(f)) {
8,070✔
113
        if (direction == 0) {
7,960✔
114
            FLOW_COPY_IPV4_ADDR_TO_PACKET(&f->src, &p->src);
3,860✔
115
            FLOW_COPY_IPV4_ADDR_TO_PACKET(&f->dst, &p->dst);
3,860✔
116
            p->sp = f->sp;
3,860✔
117
            p->dp = f->dp;
3,860✔
118
        } else {
4,100✔
119
            FLOW_COPY_IPV4_ADDR_TO_PACKET(&f->src, &p->dst);
4,100✔
120
            FLOW_COPY_IPV4_ADDR_TO_PACKET(&f->dst, &p->src);
4,100✔
121
            p->sp = f->dp;
4,100✔
122
            p->dp = f->sp;
4,100✔
123
        }
4,100✔
124

125
        /* Check if we have enough room in direct data. We need ipv4 hdr + tcp hdr.
126
         * Force an allocation if it is not the case.
127
         */
128
        if (GET_PKT_DIRECT_MAX_SIZE(p) <  40) {
7,960✔
UNCOV
129
            if (PacketCallocExtPkt(p, 40) == -1) {
×
130
                goto error;
×
131
            }
×
UNCOV
132
        }
×
133
        /* set the ip header */
134
        IPV4Hdr *ip4h = PacketSetIPV4(p, GET_PKT_DATA(p));
7,960✔
135
        /* version 4 and length 20 bytes for the tcp header */
136
        ip4h->ip_verhl = 0x45;
7,960✔
137
        ip4h->ip_tos = 0;
7,960✔
138
        ip4h->ip_len = htons(40);
7,960✔
139
        ip4h->ip_id = 0;
7,960✔
140
        ip4h->ip_off = 0;
7,960✔
141
        ip4h->ip_ttl = 64;
7,960✔
142
        ip4h->ip_proto = IPPROTO_TCP;
7,960✔
143
        //p->ip4h->ip_csum =
144
        if (direction == 0) {
7,960✔
145
            ip4h->s_ip_src.s_addr = f->src.addr_data32[0];
3,860✔
146
            ip4h->s_ip_dst.s_addr = f->dst.addr_data32[0];
3,860✔
147
        } else {
4,100✔
148
            ip4h->s_ip_src.s_addr = f->dst.addr_data32[0];
4,100✔
149
            ip4h->s_ip_dst.s_addr = f->src.addr_data32[0];
4,100✔
150
        }
4,100✔
151

152
        /* set the tcp header */
153
        PacketSetTCP(p, GET_PKT_DATA(p) + 20);
7,960✔
154

155
        SET_PKT_LEN(p, 40); /* ipv4 hdr + tcp hdr */
7,960✔
156

157
    } else if (FLOW_IS_IPV6(f)) {
7,960✔
158
        if (direction == 0) {
110✔
159
            FLOW_COPY_IPV6_ADDR_TO_PACKET(&f->src, &p->src);
56✔
160
            FLOW_COPY_IPV6_ADDR_TO_PACKET(&f->dst, &p->dst);
56✔
161
            p->sp = f->sp;
56✔
162
            p->dp = f->dp;
56✔
163
        } else {
57✔
164
            FLOW_COPY_IPV6_ADDR_TO_PACKET(&f->src, &p->dst);
54✔
165
            FLOW_COPY_IPV6_ADDR_TO_PACKET(&f->dst, &p->src);
54✔
166
            p->sp = f->dp;
54✔
167
            p->dp = f->sp;
54✔
168
        }
54✔
169

170
        /* Check if we have enough room in direct data. We need ipv6 hdr + tcp hdr.
171
         * Force an allocation if it is not the case.
172
         */
173
        if (GET_PKT_DIRECT_MAX_SIZE(p) <  60) {
110✔
174
            if (PacketCallocExtPkt(p, 60) == -1) {
×
175
                goto error;
×
176
            }
×
177
        }
×
178
        /* set the ip header */
179
        IPV6Hdr *ip6h = PacketSetIPV6(p, GET_PKT_DATA(p));
110✔
180
        /* version 6 */
181
        ip6h->s_ip6_vfc = 0x60;
110✔
182
        ip6h->s_ip6_flow = 0;
110✔
183
        ip6h->s_ip6_nxt = IPPROTO_TCP;
110✔
184
        ip6h->s_ip6_plen = htons(20);
110✔
185
        ip6h->s_ip6_hlim = 64;
110✔
186
        if (direction == 0) {
110✔
187
            ip6h->s_ip6_src[0] = f->src.addr_data32[0];
56✔
188
            ip6h->s_ip6_src[1] = f->src.addr_data32[1];
56✔
189
            ip6h->s_ip6_src[2] = f->src.addr_data32[2];
56✔
190
            ip6h->s_ip6_src[3] = f->src.addr_data32[3];
56✔
191
            ip6h->s_ip6_dst[0] = f->dst.addr_data32[0];
56✔
192
            ip6h->s_ip6_dst[1] = f->dst.addr_data32[1];
56✔
193
            ip6h->s_ip6_dst[2] = f->dst.addr_data32[2];
56✔
194
            ip6h->s_ip6_dst[3] = f->dst.addr_data32[3];
56✔
195
        } else {
57✔
196
            ip6h->s_ip6_src[0] = f->dst.addr_data32[0];
54✔
197
            ip6h->s_ip6_src[1] = f->dst.addr_data32[1];
54✔
198
            ip6h->s_ip6_src[2] = f->dst.addr_data32[2];
54✔
199
            ip6h->s_ip6_src[3] = f->dst.addr_data32[3];
54✔
200
            ip6h->s_ip6_dst[0] = f->src.addr_data32[0];
54✔
201
            ip6h->s_ip6_dst[1] = f->src.addr_data32[1];
54✔
202
            ip6h->s_ip6_dst[2] = f->src.addr_data32[2];
54✔
203
            ip6h->s_ip6_dst[3] = f->src.addr_data32[3];
54✔
204
        }
54✔
205

206
        /* set the tcp header */
207
        PacketSetTCP(p, GET_PKT_DATA(p) + 40);
110✔
208

209
        SET_PKT_LEN(p, 60); /* ipv6 hdr + tcp hdr */
110✔
210
    }
110✔
211

212
    p->l4.hdrs.tcph->th_offx2 = 0x50;
8,070✔
213
    p->l4.hdrs.tcph->th_flags = 0;
8,070✔
214
    p->l4.hdrs.tcph->th_win = 10;
8,070✔
215
    p->l4.hdrs.tcph->th_urp = 0;
8,070✔
216

217
    /* to server */
218
    if (orig_dir == 0) {
8,070✔
219
        p->l4.hdrs.tcph->th_sport = htons(f->sp);
3,915✔
220
        p->l4.hdrs.tcph->th_dport = htons(f->dp);
3,915✔
221

222
        p->l4.hdrs.tcph->th_seq = htonl(ssn->client.next_seq);
3,915✔
223
        p->l4.hdrs.tcph->th_ack = 0;
3,915✔
224

225
        /* to client */
226
    } else {
4,155✔
227
        p->l4.hdrs.tcph->th_sport = htons(f->dp);
4,155✔
228
        p->l4.hdrs.tcph->th_dport = htons(f->sp);
4,155✔
229

230
        p->l4.hdrs.tcph->th_seq = htonl(ssn->server.next_seq);
4,155✔
231
        p->l4.hdrs.tcph->th_ack = 0;
4,155✔
232
    }
4,155✔
233

234
    if (FLOW_IS_IPV4(f)) {
8,070✔
235
        IPV4Hdr *ip4h = p->l3.hdrs.ip4h;
7,960✔
236
        p->l4.hdrs.tcph->th_sum = TCPChecksum(ip4h->s_ip_addrs, (uint16_t *)p->l4.hdrs.tcph, 20, 0);
7,960✔
237
        /* calc ipv4 csum as we may log it and barnyard might reject
238
         * a wrong checksum */
239
        ip4h->ip_csum = IPV4Checksum((uint16_t *)ip4h, IPV4_GET_RAW_HLEN(ip4h), 0);
7,960✔
240
    } else if (FLOW_IS_IPV6(f)) {
7,960✔
241
        const IPV6Hdr *ip6h = PacketGetIPv6(p);
110✔
242
        p->l4.hdrs.tcph->th_sum =
110✔
243
                TCPChecksum(ip6h->s_ip6_addrs, (uint16_t *)p->l4.hdrs.tcph, 20, 0);
110✔
244
    }
110✔
245

246
    p->ts = TimeGet();
8,070✔
247

248
    if (direction == 0) {
8,070✔
249
        if (f->alparser && !STREAM_HAS_SEEN_DATA(&ssn->client)) {
3,916✔
250
            SCAppLayerParserStateSetFlag(f->alparser, APP_LAYER_PARSER_EOF_TS);
41✔
251
        }
41✔
252
    } else {
4,154✔
253
        if (f->alparser && !STREAM_HAS_SEEN_DATA(&ssn->server)) {
4,154✔
254
            SCAppLayerParserStateSetFlag(f->alparser, APP_LAYER_PARSER_EOF_TC);
637✔
255
        }
637✔
256
    }
4,154✔
257

258
    return p;
8,070✔
259

260
error:
×
261
    FlowDeReference(&p->flow);
×
262
    return NULL;
×
263
}
8,070✔
264

265
Packet *FlowPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn)
{
    /* block until the packet pool can hand out a packet, then grab one */
    PacketPoolWait();
    Packet *pkt = PacketPoolGetPacket();
    if (pkt == NULL)
        return NULL;

    PACKET_PROFILING_START(pkt);

    /* fill in the pseudo packet; returns NULL on setup failure */
    return FlowPseudoPacketSetup(pkt, direction, f, ssn);
}
277

278
/**
279
 *  \brief Check if a flow needs forced reassembly, or any other processing
280
 *
281
 *  \param f *LOCKED* flow
282
 *
283
 *  \retval false no
284
 *  \retval true yes
285
 */
286
bool FlowNeedsReassembly(Flow *f)
{
    /* flows without a TCP session have nothing to reassemble */
    if (f == NULL || f->protoctx == NULL)
        return false;

    TcpSession *ssn = (TcpSession *)f->protoctx;
    uint8_t ts = StreamNeedsReassembly(ssn, STREAM_TOSERVER);
    uint8_t tc = StreamNeedsReassembly(ssn, STREAM_TOCLIENT);

    /* if state is not fully closed we assume that we haven't fully
     * inspected the app layer state yet */
    if (ssn->state >= TCP_ESTABLISHED && ssn->state != TCP_CLOSED) {
        ts = STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION;
        tc = STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION;
    }

    /* if app layer still needs some love, push through */
    if (f->alproto != ALPROTO_UNKNOWN && f->alstate != NULL) {
        const uint64_t total_txs = AppLayerParserGetTxCnt(f, f->alstate);

        if (AppLayerParserGetTransactionActive(f, f->alparser, STREAM_TOCLIENT) < total_txs)
            tc = STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION;
        if (AppLayerParserGetTransactionActive(f, f->alparser, STREAM_TOSERVER) < total_txs)
            ts = STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION;
    }

    /* if any frame is present we assume it still needs work */
    FramesContainer *frames = AppLayerFramesGetContainer(f);
    if (frames != NULL) {
        if (frames->toserver.cnt)
            ts = STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION;
        if (frames->toclient.cnt)
            tc = STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION;
    }

    /* nothing to do in either direction */
    if (ts == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE &&
            tc == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE)
        return false;

    /* record per-direction forced-reassembly reasons on the flow; only
     * done when returning true */
    f->ffr_ts = ts;
    f->ffr_tc = tc;
    return true;
}
337

338
/**
339
 * \internal
340
 * \brief Sends the flow to its respective thread's flow queue.
341
 *
342
 *        The function requires flow to be locked beforehand.
343
 *
344
 * Normally, the first thread_id value should be used. This is when the flow is
345
 * created on seeing the first packet to the server; when the flow's reversed
346
 * flag is set, choose the second thread_id (to client/source).
347
 *
348
 * \param f Pointer to the flow.
349
 */
350
void FlowSendToLocalThread(Flow *f)
{
    /* normally the first thread_id slot owns the flow; a reversed flow
     * is owned by the second (to client/source) slot */
    int slot = 0;
    if (f->flags & FLOW_DIR_REVERSED) {
        slot = 1;
    }
    TmThreadsInjectFlowById(f, (const int)f->thread_id[slot]);
}
357

358
/**
359
 * \internal
360
 * \brief Remove flows from the hash bucket as they have more work to be done
361
 *        in the detection engine.
362
 *
363
 * When this function is called we're running in virtually dead engine,
364
 * so locking the flows is not strictly required. The reasons it is still
365
 * done are:
366
 * - code consistency
367
 * - silence complaining profilers
368
 * - allow us to aggressively check using debug validation assertions
369
 * - be robust in case of future changes
370
 * - locking overhead is negligible when no other thread fights us
371
 */
372
static inline void FlowRemoveHash(void)
373
{
3,425✔
374
    for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
224,464,225✔
375
        FlowBucket *fb = &flow_hash[idx];
224,460,800✔
376
        FBLOCK_LOCK(fb);
224,460,800✔
377

378
        Flow *f = fb->head;
224,460,800✔
379
        Flow *prev_f = NULL;
224,460,800✔
380

381
        /* we need to loop through all the flows in the queue */
382
        while (f != NULL) {
224,474,316✔
383
            Flow *next_f = f->next;
13,516✔
384

385
            FLOWLOCK_WRLOCK(f);
13,516✔
386

387
            /* Get the tcp session for the flow */
388
            TcpSession *ssn = (TcpSession *)f->protoctx;
13,516✔
389
            /* \todo Also skip flows that shouldn't be inspected */
390
            if (ssn == NULL) {
13,516✔
391
                FLOWLOCK_UNLOCK(f);
5,148✔
392
                prev_f = f;
5,148✔
393
                f = next_f;
5,148✔
394
                continue;
5,148✔
395
            }
5,148✔
396

397
            /* in case of additional work, we pull the flow out of the
398
             * hash and xfer ownership to the injected packet(s) */
399
            if (FlowNeedsReassembly(f)) {
8,368✔
400
                RemoveFromHash(f, prev_f);
4,301✔
401
                f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN;
4,301✔
402
                FlowSendToLocalThread(f);
4,301✔
403
                FLOWLOCK_UNLOCK(f);
4,301✔
404
                f = next_f;
4,301✔
405
                continue;
4,301✔
406
            }
4,301✔
407

408
            FLOWLOCK_UNLOCK(f);
4,067✔
409

410
            /* next flow in the queue */
411
            prev_f = f;
4,067✔
412
            f = f->next;
4,067✔
413
        }
4,067✔
414
        FBLOCK_UNLOCK(fb);
224,460,800✔
415
    }
224,460,800✔
416
}
3,425✔
417

418
/**
419
 * \brief Clean up all the flows that have unprocessed segments and have
420
 *        some work to do in the detection engine.
421
 */
422
void FlowWorkToDoCleanup(void)
{
    /* Carry out cleanup of unattended flows: hand any flow that still needs
     * reassembly/detection work over to its owning thread via FlowRemoveHash */
    FlowRemoveHash();
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc