• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 23374838686

21 Mar 2026 07:29AM UTC coverage: 59.341% (-20.0%) from 79.315%
23374838686

Pull #15075

github

web-flow
Merge 90b4e834f into 6587e363a
Pull Request #15075: Stack 8001 v16.4

38 of 70 new or added lines in 10 files covered. (54.29%)

34165 existing lines in 563 files now uncovered.

119621 of 201584 relevant lines covered (59.34%)

650666.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.45
/src/flow-worker.c
1
/* Copyright (C) 2016-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 *
23
 * Flow Workers are single thread modules taking care of (almost)
24
 * everything related to packets with flows:
25
 *
26
 * - Lookup/creation
27
 * - Stream tracking, reassembly
28
 * - Applayer update
29
 * - Detection
30
 *
31
 * This all while holding the flow lock.
32
 */
33

34
#include "suricata-common.h"
35
#include "suricata.h"
36

37
#include "action-globals.h"
38
#include "packet.h"
39
#include "decode.h"
40
#include "detect.h"
41
#include "stream-tcp.h"
42
#include "app-layer.h"
43
#include "detect-engine.h"
44
#include "output.h"
45
#include "app-layer-parser.h"
46
#include "app-layer-frames.h"
47

48
#include "util-profiling.h"
49
#include "util-validate.h"
50
#include "util-time.h"
51
#include "tmqh-packetpool.h"
52

53
#include "flow-util.h"
54
#include "flow-manager.h"
55
#include "flow-timeout.h"
56
#include "flow-spare-pool.h"
57
#include "flow-worker.h"
58

59
/* Pointer typedef so the type can be used as a single token inside the
 * SC_ATOMIC_DECLARE() macro below. */
typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr;

/* Counters gathered while evicting timed-out flows from the local work
 * queue; folded into the thread stats by UpdateCounters(). */
typedef struct FlowTimeoutCounters {
    uint32_t flows_aside_needs_work;  /* evicted flows that still needed stream/detect work */
    uint32_t flows_aside_pkt_inject;  /* pseudo packets injected for those flows */
} FlowTimeoutCounters;

/* Per-thread state for the flow worker module. One instance per worker
 * thread, created in FlowWorkerThreadInit(). */
typedef struct FlowWorkerThreadData_ {
    DecodeThreadVars *dtv;            /* decoder thread vars, shared with fls.dtv */

    /* stream engine thread ctx; the void* alias exists because
     * StreamTcpThreadInit() takes a void** out parameter. */
    union {
        StreamTcpThread *stream_thread;
        void *stream_thread_ptr;
    };

    /* detect engine thread ctx; atomic because it is swapped live on
     * detect engine reload (see FlowWorkerReplaceDetectCtx()). */
    SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);

    /* set to true by the worker when a requested output flush has been
     * performed; polled via FlowWorkerGetFlushAck(). */
    SC_ATOMIC_DECLARE(bool, flush_ack);

    void *output_thread; /* Output thread data. */
    void *output_thread_flow; /* Output thread data. */

    /* bypass byte/packet counters */
    StatsCounterId local_bypass_pkts;
    StatsCounterId local_bypass_bytes;
    StatsCounterId both_bypass_pkts;
    StatsCounterId both_bypass_bytes;
    /** Queue to put pseudo packets that have been created by the stream (RST response) and by the
     * flush logic following a protocol change. */
    PacketQueueNoLock pq;
    FlowLookupStruct fls;             /* flow lookup state incl. spare and work queues */

    /* flow eviction/injection counters registered in FlowWorkerThreadInit() */
    struct {
        StatsCounterId flows_injected;
        StatsCounterMaxId flows_injected_max;
        StatsCounterId flows_removed;
        StatsCounterId flows_aside_needs_work;
        StatsCounterId flows_aside_pkt_inject;
    } cnt;
    FlowEndCounters fec;              /* flow end state/reason counters */

} FlowWorkerThreadData;

static void FlowWorkerFlowTimeout(
        ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, DetectEngineThreadCtx *det_ctx);
103

104
/**
 * \internal
 * \brief Forces reassembly for flow if it needs it.
 *
 *        The function requires flow to be locked beforehand.
 *
 * \param f Pointer to the flow.
 *
 * \retval cnt number of packets injected
 */
static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread)
{
    /* ffr_tc/ffr_ts record per-direction "flow finish reassembly" state */
    const int server = f->ffr_tc;
    const int client = f->ffr_ts;
    int cnt = 0;

    /* Get the tcp session for the flow */
    const TcpSession *ssn = (TcpSession *)f->protoctx;

    /* insert a pseudo packet in the toserver direction */
    if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
        Packet *p = FlowPseudoPacketGet(0, f, ssn);
        if (p != NULL) {
            PKT_SET_SRC(p, PKT_SRC_FFR);
            /* if the other direction needs nothing, this is the last
             * pseudo packet for the flow */
            if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE) {
                p->flowflags |= FLOW_PKT_LAST_PSEUDO;
            }
            FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
            PacketPoolReturnPacket(p);
            cnt++;
        }
    }

    /* handle toclient */
    if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
        Packet *p = FlowPseudoPacketGet(1, f, ssn);
        if (p != NULL) {
            PKT_SET_SRC(p, PKT_SRC_FFR);
            /* toclient is always processed last, so always the last pseudo */
            p->flowflags |= FLOW_PKT_LAST_PSEUDO;
            FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
            PacketPoolReturnPacket(p);
            f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
            cnt++;
        }
    }

    /* mark done if any pseudo packet was processed */
    if (cnt > 0) {
        f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
    }
    return cnt;
}
155

156
/** \brief process (a bounded slice of) the local flow eviction work queue
 *
 *  For each evicted flow: finish TCP reassembly/detection if still needed,
 *  run the flow logger, update end-of-flow counters, then clear the flow and
 *  return it to the spare pool (or back to the global spare pool if the
 *  local spare queue is already full).
 *
 *  \param[in] max_work Max flows to process. 0 if unlimited. */
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
        FlowQueuePrivate *fq, const uint32_t max_work)
{
    /* flows destined for the global spare pool are batched here */
    FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
    uint32_t i = 0;
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
        FLOWLOCK_WRLOCK(f);
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg

        /* TCP flows may still have unprocessed segments that need a final
         * detection pass before the flow can be logged and recycled */
        if (f->proto == IPPROTO_TCP) {
            if (!(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
                    !FlowIsBypassed(f) && FlowNeedsReassembly(f) && f->ffr != 0) {
                /* read detect thread in case we're doing a reload */
                void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
                int cnt = FlowFinish(tv, f, fw, detect_thread);
                counters->flows_aside_pkt_inject += cnt;
                counters->flows_aside_needs_work++;
            }
        }

        /* no one is referring to this flow, removed from hash
         * so we can unlock it and pass it to the flow recycler */

        if (fw->output_thread_flow != NULL)
            (void)OutputFlowLog(tv, fw->output_thread_flow, f);

        FlowEndCountersUpdate(tv, &fw->fec, f);
        if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
            StatsCounterDecr(&tv->stats, fw->dtv->counter_tcp_active_sessions);
        }
        StatsCounterDecr(&tv->stats, fw->dtv->counter_flow_active);

        FlowClearMemory (f, f->protomap);
        FLOWLOCK_UNLOCK(f);

        /* local spare queue full: batch flows for return to the global
         * spare pool; otherwise keep the flow for local reuse */
        if (fw->fls.spare_queue.len >= (FLOW_SPARE_POOL_BLOCK_SIZE * 2)) {
            FlowQueuePrivatePrependFlow(&ret_queue, f);
            if (ret_queue.len == FLOW_SPARE_POOL_BLOCK_SIZE) {
                FlowSparePoolReturnFlows(&ret_queue);
            }
        } else {
            FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
        }

        /* honor the per-call work budget */
        if (max_work != 0 && ++i == max_work)
            break;
    }
    /* flush any partial batch to the global spare pool */
    if (ret_queue.len > 0) {
        FlowSparePoolReturnFlows(&ret_queue);
    }

    StatsCounterAddI64(&tv->stats, fw->cnt.flows_removed, (int64_t)i);
}
211

212
/** \brief handle flow for packet
 *
 *  Handle flow creation/lookup
 *
 *  \retval TM_ECODE_DONE flow is bypassed: packet fully handled here and the
 *          flow has been unlocked and dereferenced.
 *  \retval TM_ECODE_OK normal processing should continue; flow stays locked.
 */
static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{
    FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);

    int state = p->flow->flow_state;
    switch (state) {
#ifdef CAPTURE_OFFLOAD
        /* bypassed both locally and in the capture method */
        case FLOW_STATE_CAPTURE_BYPASSED: {
            StatsCounterAddI64(&tv->stats, fw->both_bypass_pkts, 1);
            StatsCounterAddI64(&tv->stats, fw->both_bypass_bytes, GET_PKT_LEN(p));
            Flow *f = p->flow;
            FlowDeReference(&p->flow);
            FLOWLOCK_UNLOCK(f);
            return TM_ECODE_DONE;
        }
#endif
        /* bypassed in the engine only: account and release the flow */
        case FLOW_STATE_LOCAL_BYPASSED: {
            StatsCounterAddI64(&tv->stats, fw->local_bypass_pkts, 1);
            StatsCounterAddI64(&tv->stats, fw->local_bypass_bytes, GET_PKT_LEN(p));
            Flow *f = p->flow;
            FlowDeReference(&p->flow);
            FLOWLOCK_UNLOCK(f);
            return TM_ECODE_DONE;
        }
        default:
            return TM_ECODE_OK;
    }
}
244

245
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);

/** \brief per-thread setup of the flow worker
 *
 *  Allocates the thread data, registers stats counters and initializes the
 *  decode, stream, detect and output subsystems for this thread. On any
 *  failure the already-initialized parts are torn down via
 *  FlowWorkerThreadDeinit().
 *
 *  \param initdata passed through to OutputLoggerThreadInit()
 *  \param[out] data set to the new FlowWorkerThreadData on success
 */
static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
    if (fw == NULL)
        return TM_ECODE_FAILED;

    SC_ATOMIC_INITPTR(fw->detect_thread);
    SC_ATOMIC_SET(fw->detect_thread, NULL);

    fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", &tv->stats);
    fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", &tv->stats);
    fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", &tv->stats);
    fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", &tv->stats);

    fw->cnt.flows_aside_needs_work =
            StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", &tv->stats);
    fw->cnt.flows_aside_pkt_inject =
            StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", &tv->stats);
    fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", &tv->stats);
    fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", &tv->stats);
    fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", &tv->stats);

    /* decode thread vars are shared between fw->dtv and the lookup struct */
    fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
    if (fw->dtv == NULL) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    /* setup TCP */
    if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    if (DetectEngineEnabled()) {
        /* setup DETECT */
        void *detect_thread = NULL;
        if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
            FlowWorkerThreadDeinit(tv, fw);
            return TM_ECODE_FAILED;
        }
        SC_ATOMIC_SET(fw->detect_thread, detect_thread);
    }

    /* Setup outputs for this thread. */
    if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }
    if (OutputFlowLogThreadInit(tv, &fw->output_thread_flow) != TM_ECODE_OK) {
        SCLogError("initializing flow log API for thread failed");
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    DecodeRegisterPerfCounters(fw->dtv, tv);
    AppLayerRegisterThreadCounters(tv);
    FlowEndCountersRegister(tv, &fw->fec);

    /* setup pq for stream end pkts */
    memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
    *data = fw;
    return TM_ECODE_OK;
}
311

312
/** \brief per-thread teardown of the flow worker
 *
 *  Frees everything set up by FlowWorkerThreadInit(). Also used as the
 *  error-path cleanup during init, so each subsystem must tolerate being
 *  deinitialized when only partially initialized.
 */
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
{
    FlowWorkerThreadData *fw = data;

    DecodeThreadVarsFree(tv, fw->dtv);

    /* free TCP */
    StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);

    /* free DETECT */
    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
    if (detect_thread != NULL) {
        DetectEngineThreadCtxDeinit(tv, detect_thread);
        SC_ATOMIC_SET(fw->detect_thread, NULL);
    }

    /* Free output. */
    OutputLoggerThreadDeinit(tv, fw->output_thread);
    OutputFlowLogThreadDeinit(tv, fw->output_thread_flow);

    /* free pq: must be empty by now, a leftover packet would leak */
    BUG_ON(fw->pq.len);

    /* drain the local spare flow queue */
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) {
        FlowFree(f);
    }

    SCFree(fw);
    return TM_ECODE_OK;
}
343

344
TmEcode Detect(ThreadVars *tv, Packet *p, void *data);
345
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *pq);
346

347
static inline void UpdateCounters(ThreadVars *tv,
348
        FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters)
349
{
9,361✔
350
    if (counters->flows_aside_needs_work) {
9,361✔
351
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_aside_needs_work,
5,712✔
352
                (int64_t)counters->flows_aside_needs_work);
5,712✔
353
    }
5,712✔
354
    if (counters->flows_aside_pkt_inject) {
9,361✔
355
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_aside_pkt_inject,
5,712✔
356
                (int64_t)counters->flows_aside_pkt_inject);
5,712✔
357
    }
5,712✔
358
}
9,361✔
359

360
/** \brief update stream engine
 *
 *  We can be called from both the flow timeout path as well as from the
 *  "real" traffic path. If in the timeout path any additional packets we
 *  forge for flushing pipelines should not leave our scope. If the original
 *  packet is real (or related to a real packet) we need to push the packets
 *  on, so IPS logic stays valid.
 *
 *  \param timeout true when called from the flow timeout path: extra packets
 *         are returned straight to the pool instead of going through the
 *         verdict path.
 */
static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p,
        DetectEngineThreadCtx *det_ctx, const bool timeout)
{
    /* optional pre-stream hook can drop the packet before stream handling */
    if (det_ctx != NULL && det_ctx->de_ctx->PreStreamHook != NULL) {
        const uint8_t action = det_ctx->de_ctx->PreStreamHook(tv, det_ctx, p);
        if (action & ACTION_DROP) {
            PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_STREAM_PRE_HOOK);
            return;
        }
    }

    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM);
    StreamTcp(tv, p, fw->stream_thread, &fw->pq);
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM);

    // this is the first packet that sets no payload inspection
    bool setting_nopayload =
            p->flow->alparser &&
            SCAppLayerParserStateIssetFlag(p->flow->alparser, APP_LAYER_PARSER_NO_INSPECTION) &&
            !(p->flags & PKT_NOPAYLOAD_INSPECTION);
    /* on app-layer protocol change (or first no-payload-inspection packet)
     * flush detection/logging state for the old protocol */
    if (FlowChangeProto(p->flow) || setting_nopayload) {
        StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq);
        if (setting_nopayload) {
            FlowSetNoPayloadInspectionFlag(p->flow);
        }
        SCAppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TS);
        SCAppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TC);
    }

    /* Packets here can safely access p->flow as it's locked */
    SCLogDebug("packet %" PRIu64 ": extra packets %u", PcapPacketCntGet(p), fw->pq.len);
    Packet *x;
    /* process any pseudo packets the stream engine queued up */
    while ((x = PacketDequeueNoLock(&fw->pq))) {
        SCLogDebug("packet %" PRIu64 " extra packet %p", PcapPacketCntGet(p), x);

        if (det_ctx != NULL) {
            FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT);
            Detect(tv, x, det_ctx);
            FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
        }

        OutputLoggerLog(tv, x, fw->output_thread);

        FramesPrune(x->flow, x);
        /*  Release tcp segments. Done here after alerting can use them. */
        FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_TCPPRUNE);
        StreamTcpPruneSession(
                x->flow, x->flowflags & FLOW_PKT_TOSERVER ? STREAM_TOSERVER : STREAM_TOCLIENT);
        FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_TCPPRUNE);

        /* no need to keep a flow ref beyond this point */
        FlowDeReference(&x->flow);

        /* no further work to do for this pseudo packet, so we can return
         * it to the pool immediately. */
        if (timeout) {
            PacketPoolReturnPacket(x);
        } else {
            /* to support IPS verdict logic, in the non-timeout case we need to do a bit more */
            TmqhOutputPacketpool(tv, x);
        }
    }
    if (FlowChangeProto(p->flow) && p->flow->flags & FLOW_ACTION_DROP) {
        // in case f->flags & FLOW_ACTION_DROP was set by one of the dequeued packets
        PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_DROP);
    }
}
435

436
/** \brief run the full worker pipeline for a flow-timeout pseudo packet
 *
 *  Handles stream update, detection, output logging, frame/segment pruning
 *  and transaction cleanup for a single PKT_SRC_FFR pseudo packet. Expects
 *  the flow to be locked; the lock is released later by the caller
 *  (see FlowFinish()).
 */
static void FlowWorkerFlowTimeout(
        ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, DetectEngineThreadCtx *det_ctx)
{
    DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);

    SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", PcapPacketCntGet(p),
            PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
    DEBUG_VALIDATE_BUG_ON(!(p->flow && PacketIsTCP(p)));
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);

    /* handle TCP and app layer */
    FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, true);

    PacketUpdateEngineEventCounters(tv, fw->dtv, p);

    /* handle Detect */
    SCLogDebug("packet %" PRIu64 " calling Detect", PcapPacketCntGet(p));
    if (det_ctx != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        Detect(tv, p, det_ctx);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }

    // Outputs.
    OutputLoggerLog(tv, p, fw->output_thread);

    FramesPrune(p->flow, p);

    /*  Release tcp segments. Done here after alerting can use them. */
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
    StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
            STREAM_TOSERVER : STREAM_TOCLIENT);
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);

    /* run tx cleanup last */
    AppLayerParserTransactionsCleanup(p->flow, STREAM_FLAGS_FOR_PACKET(p));

    FlowDeReference(&p->flow);
    /* flow is unlocked later in FlowFinish() */
}
476

477
/** \internal
 *  \brief process flows injected into our queue by other threads
 *
 *  Transfers flows from the shared per-thread injection queue into the local
 *  work queue, so they can be evicted over the course of multiple packets.
 */
static inline void FlowWorkerProcessInjectedFlows(
        ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{
    /* take injected flows and append to our work queue */
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
    FlowQueuePrivate injected = { NULL, NULL, 0 };
    /* cheap atomic check avoids taking the queue lock when empty */
    if (SC_ATOMIC_GET(tv->flow_queue->non_empty))
        injected = FlowQueueExtractPrivate(tv->flow_queue);
    if (injected.len > 0) {
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_injected, (int64_t)injected.len);
        /* only track the max batch size for real wire packets */
        if (p->pkt_src == PKT_SRC_WIRE)
            StatsCounterMaxUpdateI64(&tv->stats, fw->cnt.flows_injected_max, (int64_t)injected.len);

        /* move to local queue so we can process over the course of multiple packets */
        FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected);
    }
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
}
498

499
/** \internal
 *  \brief process flows set aside locally during flow lookup
 *
 *  Normally processes at most 2 flows per packet to bound per-packet
 *  latency; during shutdown flush or capture timeout the budget is lifted
 *  so the queue fully drains.
 */
static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{
    uint32_t max_work = 2;
    if (p->pkt_src == PKT_SRC_SHUTDOWN_FLUSH || p->pkt_src == PKT_SRC_CAPTURE_TIMEOUT)
        max_work = 0; /* 0 means unlimited, see CheckWorkQueue() */

    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
    if (fw->fls.work_queue.len) {
        FlowTimeoutCounters counters = { 0, 0, };
        CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue, max_work);
        UpdateCounters(tv, fw, &counters);
    }
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
}
516

517
/** \internal
 *  \brief apply Packet::app_update_direction to the flow flags
 *
 *  Records which directions received an app-layer update: *_APP_UPDATED for
 *  the direction updated now, *_APP_UPDATE_NEXT to defer the update to the
 *  next packet in the opposite direction.
 */
static void PacketAppUpdate2FlowFlags(Packet *p)
{
    switch ((enum StreamUpdateDir)p->app_update_direction) {
        case UPDATE_DIR_NONE: // NONE implies pseudo packet
            SCLogDebug("pcap_cnt %" PRIu64 ", UPDATE_DIR_NONE", PcapPacketCntGet(p));
            break;
        case UPDATE_DIR_PACKET:
            /* update only in the packet's own direction */
            if (PKT_IS_TOSERVER(p)) {
                p->flow->flags |= FLOW_TS_APP_UPDATED;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", PcapPacketCntGet(p));
            } else {
                p->flow->flags |= FLOW_TC_APP_UPDATED;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", PcapPacketCntGet(p));
            }
            break;
        case UPDATE_DIR_BOTH:
            /* own direction now, then fall through for the opposing side */
            if (PKT_IS_TOSERVER(p)) {
                p->flow->flags |= FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATE_NEXT;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
                        PcapPacketCntGet(p));
            } else {
                p->flow->flags |= FLOW_TC_APP_UPDATED | FLOW_TS_APP_UPDATE_NEXT;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
                        PcapPacketCntGet(p));
            }
            /* fall through */
        case UPDATE_DIR_OPPOSING:
            /* opposing direction is updated now; own direction deferred */
            if (PKT_IS_TOSERVER(p)) {
                p->flow->flags |= FLOW_TC_APP_UPDATED | FLOW_TS_APP_UPDATE_NEXT;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
                        PcapPacketCntGet(p));
            } else {
                p->flow->flags |= FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATE_NEXT;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
                        PcapPacketCntGet(p));
            }
            break;
    }
}
559

560
/** \brief main packet entry point of the flow worker thread module
 *
 *  Pipeline per packet: (optional) pre-flow hook -> flow lookup/update ->
 *  stream/app-layer update -> detect -> outputs -> frame/segment pruning and
 *  transaction cleanup -> housekeeping (injected + local flow eviction).
 *  The flow is locked for the whole pipeline and released before
 *  housekeeping.
 */
static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
{
    FlowWorkerThreadData *fw = data;
    /* read via atomic: may be swapped live by a detect engine reload */
    DetectEngineThreadCtx *det_ctx = SC_ATOMIC_GET(fw->detect_thread);

    DEBUG_VALIDATE_BUG_ON(p == NULL);
    DEBUG_VALIDATE_BUG_ON(tv->flow_queue == NULL);

    SCLogDebug("packet %" PRIu64, PcapPacketCntGet(p));

    /* flush pseudo packet: flush outputs, ack, and skip the pipeline */
    if ((PKT_IS_FLUSHPKT(p))) {
        SCLogDebug("thread %s flushing", tv->printable_name);
        OutputLoggerFlush(tv, p, fw->output_thread);
        /* Ack if a flush was requested */
        bool notset = false;
        SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
        return TM_ECODE_OK;
    }

    /* handle Flow */
    if (det_ctx != NULL && det_ctx->de_ctx->PreFlowHook != NULL) {
        const uint8_t action = det_ctx->de_ctx->PreFlowHook(tv, det_ctx, p);
        if (action & ACTION_DROP) {
            PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_PRE_HOOK);
            /* skip flow/stream/detect, but still log the drop */
            goto pre_flow_drop;
        }
    }

    if (p->flags & PKT_WANTS_FLOW) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);

        FlowHandlePacket(tv, &fw->fls, p);
        if (likely(p->flow != NULL)) {
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
            /* TM_ECODE_DONE means the flow is bypassed and already released */
            if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
                /* update time */
                if (!(PKT_IS_PSEUDOPKT(p))) {
                    TimeSetByThread(tv->id, p->ts);
                }
                goto housekeeping;
            }
        }
        /* Flow is now LOCKED */

        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);

    /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
     * pseudo packet created by the flow manager. */
    } else if (p->flags & PKT_HAS_FLOW) {
        FLOWLOCK_WRLOCK(p->flow);
        DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
    }

    /* update time */
    if (!(PKT_IS_PSEUDOPKT(p))) {
        TimeSetByThread(tv->id, p->ts);
    }

    SCLogDebug("packet %" PRIu64 " has flow? %s", PcapPacketCntGet(p), p->flow ? "yes" : "no");

    /* handle TCP and app layer */
    if (p->flow) {
        SCLogDebug("packet %" PRIu64
                   ": direction %s FLOW_TS_APP_UPDATE_NEXT %s FLOW_TC_APP_UPDATE_NEXT %s",
                PcapPacketCntGet(p), PKT_IS_TOSERVER(p) ? "toserver" : "toclient",
                BOOL2STR((p->flow->flags & FLOW_TS_APP_UPDATE_NEXT) != 0),
                BOOL2STR((p->flow->flags & FLOW_TC_APP_UPDATE_NEXT) != 0));
        /* see if need to consider flags set by prev packets */
        if (PKT_IS_TOSERVER(p) && (p->flow->flags & FLOW_TS_APP_UPDATE_NEXT)) {
            p->flow->flags |= FLOW_TS_APP_UPDATED;
            p->flow->flags &= ~FLOW_TS_APP_UPDATE_NEXT;
            SCLogDebug("FLOW_TS_APP_UPDATED");
        } else if (PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATE_NEXT)) {
            p->flow->flags |= FLOW_TC_APP_UPDATED;
            p->flow->flags &= ~FLOW_TC_APP_UPDATE_NEXT;
            SCLogDebug("FLOW_TC_APP_UPDATED");
        }

        if (PacketIsTCP(p)) {
            SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", PcapPacketCntGet(p),
                    PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);

            /* if detect is disabled, we need to apply file flags to the flow
             * here on the first packet. */
            if (det_ctx == NULL &&
                    ((PKT_IS_TOSERVER(p) && (p->flowflags & FLOW_PKT_TOSERVER_FIRST)) ||
                            (PKT_IS_TOCLIENT(p) && (p->flowflags & FLOW_PKT_TOCLIENT_FIRST)))) {
                DisableDetectFlowFileFlags(p->flow);
            }

            FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, false);
            PacketAppUpdate2FlowFlags(p);

            /* handle the app layer part of the UDP packet payload */
        } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
            AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
            PacketAppUpdate2FlowFlags(p);
        }
    }

    PacketUpdateEngineEventCounters(tv, fw->dtv, p);

    /* handle Detect */
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
    SCLogDebug("packet %" PRIu64 " calling Detect", PcapPacketCntGet(p));
    if (det_ctx != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        Detect(tv, p, det_ctx);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }

pre_flow_drop:
    // Outputs.
    OutputLoggerLog(tv, p, fw->output_thread);

    /*  Release tcp segments. Done here after alerting can use them. */
    if (p->flow != NULL) {
        DEBUG_ASSERT_FLOW_LOCKED(p->flow);

        if (FlowIsBypassed(p->flow)) {
            FlowCleanupAppLayer(p->flow);
            if (p->proto == IPPROTO_TCP) {
                StreamTcpSessionCleanup(p->flow->protoctx);
            }
        } else if (p->proto == IPPROTO_TCP && p->flow->protoctx && p->flags & PKT_STREAM_EST) {
            /* prune frames only for the direction that actually got an
             * app-layer update */
            if ((p->flow->flags & FLOW_TS_APP_UPDATED) && PKT_IS_TOSERVER(p)) {
                FramesPrune(p->flow, p);
            } else if ((p->flow->flags & FLOW_TC_APP_UPDATED) && PKT_IS_TOCLIENT(p)) {
                FramesPrune(p->flow, p);
            }
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
            StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
                    STREAM_TOSERVER : STREAM_TOCLIENT);
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);
        } else if (p->proto == IPPROTO_UDP) {
            FramesPrune(p->flow, p);
        }

        /* transaction cleanup: only when this packet (or a pseudo packet)
         * advanced the app-layer state in some direction */
        if ((PKT_IS_PSEUDOPKT(p)) ||
                (p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {
            if ((p->flags & PKT_STREAM_EST) || p->proto != IPPROTO_TCP) {
                if (PKT_IS_TOSERVER(p)) {
                    if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {
                        AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);
                        p->flow->flags &= ~FLOW_TS_APP_UPDATED;
                        SCLogDebug("~FLOW_TS_APP_UPDATED");
                    }
                } else {
                    if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {
                        AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);
                        p->flow->flags &= ~FLOW_TC_APP_UPDATED;
                        SCLogDebug("~FLOW_TC_APP_UPDATED");
                    }
                }
            }
        } else {
            SCLogDebug("not pseudo, no app update: skip");
        }

        if (p->flow->flags & FLOW_ACTION_DROP) {
            SCLogDebug("flow drop in place: remove app update flags");
            p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED);
        }

        /* done with the flow for this packet: drop ref, then unlock */
        Flow *f = p->flow;
        FlowDeReference(&p->flow);
        FLOWLOCK_UNLOCK(f);
    }

housekeeping:

    /* take injected flows and add them to our local queue */
    FlowWorkerProcessInjectedFlows(tv, fw, p);

    /* process local work queue */
    FlowWorkerProcessLocalFlows(tv, fw, p);

    return TM_ECODE_OK;
}
742

743
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
744
{
35,437✔
745
    FlowWorkerThreadData *fw = flow_worker;
35,437✔
746

747
    SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
35,437✔
748
}
35,437✔
749

750
void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
751
{
35,437✔
752
    FlowWorkerThreadData *fw = flow_worker;
35,437✔
753

754
    return SC_ATOMIC_GET(fw->detect_thread);
35,437✔
755
}
35,437✔
756

757
/**
 * \brief Return the flow worker's thread data.
 *
 * Identity accessor: the opaque handle IS the FlowWorkerThreadData
 * pointer, so it is passed straight back to the caller.
 */
void *FlowWorkerGetThreadData(void *flow_worker)
{
    return flow_worker;
}
×
761

762
bool FlowWorkerGetFlushAck(void *flow_worker)
763
{
×
764
    FlowWorkerThreadData *fw = flow_worker;
×
765
    return SC_ATOMIC_GET(fw->flush_ack) == true;
×
766
}
×
767

768
void FlowWorkerSetFlushAck(void *flow_worker)
769
{
×
770
    FlowWorkerThreadData *fw = flow_worker;
×
771
    SC_ATOMIC_SET(fw->flush_ack, false);
×
772
}
×
773

774
const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
775
{
×
776
    switch (fwi) {
×
777
        case PROFILE_FLOWWORKER_FLOW:
×
778
            return "flow";
×
779
        case PROFILE_FLOWWORKER_STREAM:
×
780
            return "stream";
×
781
        case PROFILE_FLOWWORKER_APPLAYERUDP:
×
782
            return "app-layer";
×
783
        case PROFILE_FLOWWORKER_DETECT:
×
784
            return "detect";
×
785
        case PROFILE_FLOWWORKER_TCPPRUNE:
×
786
            return "tcp-prune";
×
787
        case PROFILE_FLOWWORKER_FLOW_INJECTED:
×
788
            return "flow-inject";
×
789
        case PROFILE_FLOWWORKER_FLOW_EVICTED:
×
790
            return "flow-evict";
×
791
        case PROFILE_FLOWWORKER_SIZE:
×
792
            return "size";
×
793
    }
×
794
    return "error";
×
795
}
×
796

797
/**
 * \brief Report whether the flow worker still has outstanding work.
 *
 * Busy means: packets queued in the worker's packet queue, flows in its
 * local flow work queue, or flows pending in the thread's injected flow
 * queue (checked under the queue lock).
 *
 * \param tv           thread vars, provides the (optional) injected flow queue
 * \param flow_worker  opaque pointer to the worker's FlowWorkerThreadData
 * \return true if any queue is non-empty
 */
static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
{
    FlowWorkerThreadData *fwd = (FlowWorkerThreadData *)flow_worker;

    /* local queues can be read without extra locking here */
    if (fwd->pq.len != 0 || fwd->fls.work_queue.len != 0)
        return true;

    /* injected flow queue is shared, so sample its length under the lock */
    bool pending = false;
    if (tv->flow_queue != NULL) {
        FQLOCK_LOCK(tv->flow_queue);
        pending = (tv->flow_queue->qlen != 0);
        FQLOCK_UNLOCK(tv->flow_queue);
    }

    return pending;
}
×
816

817
void TmModuleFlowWorkerRegister (void)
818
{
2✔
819
    tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
2✔
820
    tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
2✔
821
    tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
2✔
822
    tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
2✔
823
    tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
2✔
824
    tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
2✔
825
    tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_FLOWWORKER_TM;
2✔
826
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc