• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jasonish / suricata / 23105300094

15 Mar 2026 06:48AM UTC coverage: 75.784% (-0.7%) from 76.495%
23105300094

push

github

jasonish
github-ci: ubuntu minimal build fixups

- Don't run on the GitHub provided VM, it contains a newer Rust than
  stock Ubuntu does.

252836 of 333628 relevant lines covered (75.78%)

1978514.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.35
/src/flow-worker.c
1
/* Copyright (C) 2016-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 *
23
 * Flow Workers are single thread modules taking care of (almost)
24
 * everything related to packets with flows:
25
 *
26
 * - Lookup/creation
27
 * - Stream tracking, reassembly
28
 * - Applayer update
29
 * - Detection
30
 *
31
 * This all while holding the flow lock.
32
 */
33

34
#include "suricata-common.h"
35
#include "suricata.h"
36

37
#include "action-globals.h"
38
#include "packet.h"
39
#include "decode.h"
40
#include "detect.h"
41
#include "stream-tcp.h"
42
#include "app-layer.h"
43
#include "detect-engine.h"
44
#include "output.h"
45
#include "app-layer-parser.h"
46
#include "app-layer-frames.h"
47

48
#include "util-profiling.h"
49
#include "util-validate.h"
50
#include "util-time.h"
51
#include "tmqh-packetpool.h"
52

53
#include "flow-util.h"
54
#include "flow-manager.h"
55
#include "flow-timeout.h"
56
#include "flow-spare-pool.h"
57
#include "flow-worker.h"
58

59
typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr;
60

61
typedef struct FlowTimeoutCounters {
62
    uint32_t flows_aside_needs_work;
63
    uint32_t flows_aside_pkt_inject;
64
} FlowTimeoutCounters;
65

66
typedef struct FlowWorkerThreadData_ {
67
    DecodeThreadVars *dtv;
68

69
    union {
70
        StreamTcpThread *stream_thread;
71
        void *stream_thread_ptr;
72
    };
73

74
    SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
75

76
    SC_ATOMIC_DECLARE(bool, flush_ack);
77

78
    void *output_thread; /* Output thread data. */
79
    void *output_thread_flow; /* Output thread data. */
80

81
    StatsCounterId local_bypass_pkts;
82
    StatsCounterId local_bypass_bytes;
83
    StatsCounterId both_bypass_pkts;
84
    StatsCounterId both_bypass_bytes;
85
    /** Queue to put pseudo packets that have been created by the stream (RST response) and by the
86
     * flush logic following a protocol change. */
87
    PacketQueueNoLock pq;
88
    FlowLookupStruct fls;
89

90
    struct {
91
        StatsCounterId flows_injected;
92
        StatsCounterMaxId flows_injected_max;
93
        StatsCounterId flows_removed;
94
        StatsCounterId flows_aside_needs_work;
95
        StatsCounterId flows_aside_pkt_inject;
96
    } cnt;
97
    FlowEndCounters fec;
98

99
} FlowWorkerThreadData;
100

101
static void FlowWorkerFlowTimeout(
102
        ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, DetectEngineThreadCtx *det_ctx);
103

104
/**
105
 * \internal
106
 * \brief Forces reassembly for flow if it needs it.
107
 *
108
 *        The function requires flow to be locked beforehand.
109
 *
110
 * \param f Pointer to the flow.
111
 *
112
 * \retval cnt number of packets injected
113
 */
114
static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread)
115
{
3,043✔
116
    const int server = f->ffr_tc;
3,043✔
117
    const int client = f->ffr_ts;
3,043✔
118
    int cnt = 0;
3,043✔
119

120
    /* Get the tcp session for the flow */
121
    const TcpSession *ssn = (TcpSession *)f->protoctx;
3,043✔
122

123
    /* insert a pseudo packet in the toserver direction */
124
    if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
3,043✔
125
        Packet *p = FlowPseudoPacketGet(0, f, ssn);
2,823✔
126
        if (p != NULL) {
2,823✔
127
            PKT_SET_SRC(p, PKT_SRC_FFR);
2,823✔
128
            if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE) {
2,823✔
129
                p->flowflags |= FLOW_PKT_LAST_PSEUDO;
53✔
130
            }
53✔
131
            FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
2,823✔
132
            PacketPoolReturnPacket(p);
2,823✔
133
            cnt++;
2,823✔
134
        }
2,823✔
135
    }
2,823✔
136

137
    /* handle toclient */
138
    if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
3,043✔
139
        Packet *p = FlowPseudoPacketGet(1, f, ssn);
2,990✔
140
        if (p != NULL) {
2,990✔
141
            PKT_SET_SRC(p, PKT_SRC_FFR);
2,990✔
142
            p->flowflags |= FLOW_PKT_LAST_PSEUDO;
2,990✔
143
            FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
2,990✔
144
            PacketPoolReturnPacket(p);
2,990✔
145
            f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
2,990✔
146
            cnt++;
2,990✔
147
        }
2,990✔
148
    }
2,990✔
149

150
    if (cnt > 0) {
3,043✔
151
        f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
3,043✔
152
    }
3,043✔
153
    return cnt;
3,043✔
154
}
3,043✔
155

156
/** \param[in] max_work Max flows to process. 0 if unlimited. */
157
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
158
        FlowQueuePrivate *fq, const uint32_t max_work)
159
{
1,509✔
160
    FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
1,509✔
161
    uint32_t i = 0;
1,509✔
162
    Flow *f;
1,509✔
163
    while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
5,138✔
164
        FLOWLOCK_WRLOCK(f);
3,658✔
165
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg
3,658✔
166

167
        if (f->proto == IPPROTO_TCP) {
3,658✔
168
            if (!(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
3,107✔
169
                    !FlowIsBypassed(f) && FlowNeedsReassembly(f) && f->ffr != 0) {
3,107✔
170
                /* read detect thread in case we're doing a reload */
171
                void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
3,043✔
172
                int cnt = FlowFinish(tv, f, fw, detect_thread);
3,043✔
173
                counters->flows_aside_pkt_inject += cnt;
3,043✔
174
                counters->flows_aside_needs_work++;
3,043✔
175
            }
3,043✔
176
        }
3,107✔
177

178
        /* no one is referring to this flow, removed from hash
179
         * so we can unlock it and pass it to the flow recycler */
180

181
        if (fw->output_thread_flow != NULL)
3,658✔
182
            (void)OutputFlowLog(tv, fw->output_thread_flow, f);
3,658✔
183

184
        FlowEndCountersUpdate(tv, &fw->fec, f);
3,658✔
185
        if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
3,658✔
186
            StatsCounterDecr(&tv->stats, fw->dtv->counter_tcp_active_sessions);
3,102✔
187
        }
3,102✔
188
        StatsCounterDecr(&tv->stats, fw->dtv->counter_flow_active);
3,658✔
189

190
        FlowClearMemory (f, f->protomap);
3,658✔
191
        FLOWLOCK_UNLOCK(f);
3,658✔
192

193
        if (fw->fls.spare_queue.len >= (FLOW_SPARE_POOL_BLOCK_SIZE * 2)) {
3,658✔
194
            FlowQueuePrivatePrependFlow(&ret_queue, f);
×
195
            if (ret_queue.len == FLOW_SPARE_POOL_BLOCK_SIZE) {
×
196
                FlowSparePoolReturnFlows(&ret_queue);
×
197
            }
×
198
        } else {
3,658✔
199
            FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
3,658✔
200
        }
3,658✔
201

202
        if (max_work != 0 && ++i == max_work)
3,658✔
203
            break;
29✔
204
    }
3,658✔
205
    if (ret_queue.len > 0) {
1,509✔
206
        FlowSparePoolReturnFlows(&ret_queue);
×
207
    }
×
208

209
    StatsCounterAddI64(&tv->stats, fw->cnt.flows_removed, (int64_t)i);
1,509✔
210
}
1,509✔
211

212
/** \brief handle flow for packet
213
 *
214
 *  Handle flow creation/lookup
215
 */
216
static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
217
{
1,587,334✔
218
    FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);
1,587,334✔
219

220
    int state = p->flow->flow_state;
1,587,334✔
221
    switch (state) {
1,587,334✔
222
#ifdef CAPTURE_OFFLOAD
223
        case FLOW_STATE_CAPTURE_BYPASSED: {
224
            StatsCounterAddI64(&tv->stats, fw->both_bypass_pkts, 1);
225
            StatsCounterAddI64(&tv->stats, fw->both_bypass_bytes, GET_PKT_LEN(p));
226
            Flow *f = p->flow;
227
            FlowDeReference(&p->flow);
228
            FLOWLOCK_UNLOCK(f);
229
            return TM_ECODE_DONE;
230
        }
231
#endif
232
        case FLOW_STATE_LOCAL_BYPASSED: {
1,360✔
233
            StatsCounterAddI64(&tv->stats, fw->local_bypass_pkts, 1);
1,360✔
234
            StatsCounterAddI64(&tv->stats, fw->local_bypass_bytes, GET_PKT_LEN(p));
1,360✔
235
            Flow *f = p->flow;
1,360✔
236
            FlowDeReference(&p->flow);
1,360✔
237
            FLOWLOCK_UNLOCK(f);
1,360✔
238
            return TM_ECODE_DONE;
1,360✔
239
        }
×
240
        default:
1,585,859✔
241
            return TM_ECODE_OK;
1,585,859✔
242
    }
1,587,334✔
243
}
1,587,334✔
244

245
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);
246

247
static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
248
{
7,956✔
249
    FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
7,956✔
250
    if (fw == NULL)
7,956✔
251
        return TM_ECODE_FAILED;
×
252

253
    SC_ATOMIC_INITPTR(fw->detect_thread);
7,956✔
254
    SC_ATOMIC_SET(fw->detect_thread, NULL);
7,956✔
255

256
    fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", &tv->stats);
7,956✔
257
    fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", &tv->stats);
7,956✔
258
    fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", &tv->stats);
7,956✔
259
    fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", &tv->stats);
7,956✔
260

261
    fw->cnt.flows_aside_needs_work =
7,956✔
262
            StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", &tv->stats);
7,956✔
263
    fw->cnt.flows_aside_pkt_inject =
7,956✔
264
            StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", &tv->stats);
7,956✔
265
    fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", &tv->stats);
7,956✔
266
    fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", &tv->stats);
7,956✔
267
    fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", &tv->stats);
7,956✔
268

269
    fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
7,956✔
270
    if (fw->dtv == NULL) {
7,956✔
271
        FlowWorkerThreadDeinit(tv, fw);
×
272
        return TM_ECODE_FAILED;
×
273
    }
×
274

275
    /* setup TCP */
276
    if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) {
7,956✔
277
        FlowWorkerThreadDeinit(tv, fw);
×
278
        return TM_ECODE_FAILED;
×
279
    }
×
280

281
    if (DetectEngineEnabled()) {
7,956✔
282
        /* setup DETECT */
283
        void *detect_thread = NULL;
6,444✔
284
        if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
6,444✔
285
            FlowWorkerThreadDeinit(tv, fw);
×
286
            return TM_ECODE_FAILED;
×
287
        }
×
288
        SC_ATOMIC_SET(fw->detect_thread, detect_thread);
6,444✔
289
    }
6,444✔
290

291
    /* Setup outputs for this thread. */
292
    if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
7,956✔
293
        FlowWorkerThreadDeinit(tv, fw);
×
294
        return TM_ECODE_FAILED;
×
295
    }
×
296
    if (OutputFlowLogThreadInit(tv, &fw->output_thread_flow) != TM_ECODE_OK) {
7,956✔
297
        SCLogError("initializing flow log API for thread failed");
×
298
        FlowWorkerThreadDeinit(tv, fw);
×
299
        return TM_ECODE_FAILED;
×
300
    }
×
301

302
    DecodeRegisterPerfCounters(fw->dtv, tv);
7,956✔
303
    AppLayerRegisterThreadCounters(tv);
7,956✔
304
    FlowEndCountersRegister(tv, &fw->fec);
7,956✔
305

306
    /* setup pq for stream end pkts */
307
    memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
7,956✔
308
    *data = fw;
7,956✔
309
    return TM_ECODE_OK;
7,956✔
310
}
7,956✔
311

312
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
313
{
7,956✔
314
    FlowWorkerThreadData *fw = data;
7,956✔
315

316
    DecodeThreadVarsFree(tv, fw->dtv);
7,956✔
317

318
    /* free TCP */
319
    StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);
7,956✔
320

321
    /* free DETECT */
322
    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
7,956✔
323
    if (detect_thread != NULL) {
7,956✔
324
        DetectEngineThreadCtxDeinit(tv, detect_thread);
6,444✔
325
        SC_ATOMIC_SET(fw->detect_thread, NULL);
6,444✔
326
    }
6,444✔
327

328
    /* Free output. */
329
    OutputLoggerThreadDeinit(tv, fw->output_thread);
7,956✔
330
    OutputFlowLogThreadDeinit(tv, fw->output_thread_flow);
7,956✔
331

332
    /* free pq */
333
    BUG_ON(fw->pq.len);
7,956✔
334

335
    Flow *f;
7,956✔
336
    while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) {
252,804✔
337
        FlowFree(f);
244,848✔
338
    }
244,848✔
339

340
    SCFree(fw);
7,956✔
341
    return TM_ECODE_OK;
7,956✔
342
}
7,956✔
343

344
TmEcode Detect(ThreadVars *tv, Packet *p, void *data);
345
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *pq);
346

347
static inline void UpdateCounters(ThreadVars *tv,
348
        FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters)
349
{
1,509✔
350
    if (counters->flows_aside_needs_work) {
1,509✔
351
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_aside_needs_work,
899✔
352
                (int64_t)counters->flows_aside_needs_work);
899✔
353
    }
899✔
354
    if (counters->flows_aside_pkt_inject) {
1,509✔
355
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_aside_pkt_inject,
899✔
356
                (int64_t)counters->flows_aside_pkt_inject);
899✔
357
    }
899✔
358
}
1,509✔
359

360
/** \brief update stream engine
361
 *
362
 *  We can be called from both the flow timeout path as well as from the
363
 *  "real" traffic path. If in the timeout path any additional packets we
364
 *  forge for flushing pipelines should not leave our scope. If the original
365
 *  packet is real (or related to a real packet) we need to push the packets
366
 *  on, so IPS logic stays valid.
367
 */
368
static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p,
369
        DetectEngineThreadCtx *det_ctx, const bool timeout)
370
{
1,128,163✔
371
    if (det_ctx != NULL && det_ctx->de_ctx->PreStreamHook != NULL) {
1,128,163✔
372
        const uint8_t action = det_ctx->de_ctx->PreStreamHook(tv, det_ctx, p);
26✔
373
        if (action & ACTION_DROP) {
26✔
374
            PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_STREAM_PRE_HOOK);
24✔
375
            return;
24✔
376
        }
24✔
377
    }
26✔
378

379
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM);
1,128,139✔
380
    StreamTcp(tv, p, fw->stream_thread, &fw->pq);
1,128,139✔
381
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM);
1,128,139✔
382

383
    // this is the first packet that sets no payload inspection
384
    bool setting_nopayload =
1,128,139✔
385
            p->flow->alparser &&
1,128,139✔
386
            SCAppLayerParserStateIssetFlag(p->flow->alparser, APP_LAYER_PARSER_NO_INSPECTION) &&
1,128,139✔
387
            !(p->flags & PKT_NOPAYLOAD_INSPECTION);
1,128,139✔
388
    if (FlowChangeProto(p->flow) || setting_nopayload) {
1,128,139✔
389
        StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq);
2,658✔
390
        if (setting_nopayload) {
2,658✔
391
            FlowSetNoPayloadInspectionFlag(p->flow);
23✔
392
        }
23✔
393
        SCAppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TS);
2,658✔
394
        SCAppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TC);
2,658✔
395
    }
2,658✔
396

397
    /* Packets here can safely access p->flow as it's locked */
398
    SCLogDebug("packet %" PRIu64 ": extra packets %u", PcapPacketCntGet(p), fw->pq.len);
1,128,139✔
399
    Packet *x;
1,128,139✔
400
    while ((x = PacketDequeueNoLock(&fw->pq))) {
1,133,455✔
401
        SCLogDebug("packet %" PRIu64 " extra packet %p", PcapPacketCntGet(p), x);
5,316✔
402

403
        if (det_ctx != NULL) {
5,316✔
404
            FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT);
2,244✔
405
            Detect(tv, x, det_ctx);
2,244✔
406
            FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
2,244✔
407
        }
2,244✔
408

409
        OutputLoggerLog(tv, x, fw->output_thread);
5,316✔
410

411
        FramesPrune(x->flow, x);
5,316✔
412
        /*  Release tcp segments. Done here after alerting can use them. */
413
        FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_TCPPRUNE);
5,316✔
414
        StreamTcpPruneSession(
5,316✔
415
                x->flow, x->flowflags & FLOW_PKT_TOSERVER ? STREAM_TOSERVER : STREAM_TOCLIENT);
5,316✔
416
        FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_TCPPRUNE);
5,316✔
417

418
        /* no need to keep a flow ref beyond this point */
419
        FlowDeReference(&x->flow);
5,316✔
420

421
        /* no further work to do for this pseudo packet, so we can return
422
         * it to the pool immediately. */
423
        if (timeout) {
5,316✔
424
            PacketPoolReturnPacket(x);
26✔
425
        } else {
5,290✔
426
            /* to support IPS verdict logic, in the non-timeout case we need to do a bit more */
427
            TmqhOutputPacketpool(tv, x);
5,290✔
428
        }
5,290✔
429
    }
5,316✔
430
    if (FlowChangeProto(p->flow) && p->flow->flags & FLOW_ACTION_DROP) {
1,128,139✔
431
        // in case f->flags & FLOW_ACTION_DROP was set by one of the dequeued packets
432
        PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_DROP);
1✔
433
    }
1✔
434
}
1,128,139✔
435

436
static void FlowWorkerFlowTimeout(
437
        ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, DetectEngineThreadCtx *det_ctx)
438
{
5,813✔
439
    DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
5,813✔
440

441
    SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", PcapPacketCntGet(p),
5,813✔
442
            PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
5,813✔
443
    DEBUG_VALIDATE_BUG_ON(!(p->flow && PacketIsTCP(p)));
5,813✔
444
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
5,813✔
445

446
    /* handle TCP and app layer */
447
    FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, true);
5,813✔
448

449
    PacketUpdateEngineEventCounters(tv, fw->dtv, p);
5,813✔
450

451
    /* handle Detect */
452
    SCLogDebug("packet %" PRIu64 " calling Detect", PcapPacketCntGet(p));
5,813✔
453
    if (det_ctx != NULL) {
5,813✔
454
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
3,398✔
455
        Detect(tv, p, det_ctx);
3,398✔
456
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
3,398✔
457
    }
3,398✔
458

459
    // Outputs.
460
    OutputLoggerLog(tv, p, fw->output_thread);
5,813✔
461

462
    FramesPrune(p->flow, p);
5,813✔
463

464
    /*  Release tcp segments. Done here after alerting can use them. */
465
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
5,813✔
466
    StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
5,813✔
467
            STREAM_TOSERVER : STREAM_TOCLIENT);
2,992✔
468
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);
5,813✔
469

470
    /* run tx cleanup last */
471
    AppLayerParserTransactionsCleanup(p->flow, STREAM_FLAGS_FOR_PACKET(p));
5,813✔
472

473
    FlowDeReference(&p->flow);
5,813✔
474
    /* flow is unlocked later in FlowFinish() */
475
}
5,813✔
476

477
/** \internal
478
 *  \brief process flows injected into our queue by other threads
479
 */
480
static inline void FlowWorkerProcessInjectedFlows(
481
        ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
482
{
1,669,265✔
483
    /* take injected flows and append to our work queue */
484
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
1,669,265✔
485
    FlowQueuePrivate injected = { NULL, NULL, 0 };
1,669,265✔
486
    if (SC_ATOMIC_GET(tv->flow_queue->non_empty))
1,669,265✔
487
        injected = FlowQueueExtractPrivate(tv->flow_queue);
938✔
488
    if (injected.len > 0) {
1,669,265✔
489
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_injected, (int64_t)injected.len);
938✔
490
        if (p->pkt_src == PKT_SRC_WIRE)
938✔
491
            StatsCounterMaxUpdateI64(&tv->stats, fw->cnt.flows_injected_max, (int64_t)injected.len);
123✔
492

493
        /* move to local queue so we can process over the course of multiple packets */
494
        FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected);
938✔
495
    }
938✔
496
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
1,669,265✔
497
}
1,669,265✔
498

499
/** \internal
500
 *  \brief process flows set aside locally during flow lookup
501
 */
502
static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
503
{
1,669,189✔
504
    uint32_t max_work = 2;
1,669,189✔
505
    if (p->pkt_src == PKT_SRC_SHUTDOWN_FLUSH || p->pkt_src == PKT_SRC_CAPTURE_TIMEOUT)
1,669,190✔
506
        max_work = 0;
841✔
507

508
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
1,669,189✔
509
    if (fw->fls.work_queue.len) {
1,669,189✔
510
        FlowTimeoutCounters counters = { 0, 0, };
1,509✔
511
        CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue, max_work);
1,509✔
512
        UpdateCounters(tv, fw, &counters);
1,509✔
513
    }
1,509✔
514
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
1,669,189✔
515
}
1,669,189✔
516

517
/** \internal
518
 *  \brief apply Packet::app_update_direction to the flow flags
519
 */
520
static void PacketAppUpdate2FlowFlags(Packet *p)
521
{
1,149,987✔
522
    switch ((enum StreamUpdateDir)p->app_update_direction) {
1,149,987✔
523
        case UPDATE_DIR_NONE: // NONE implies pseudo packet
967,614✔
524
            SCLogDebug("pcap_cnt %" PRIu64 ", UPDATE_DIR_NONE", PcapPacketCntGet(p));
967,614✔
525
            break;
967,614✔
526
        case UPDATE_DIR_PACKET:
29,618✔
527
            if (PKT_IS_TOSERVER(p)) {
29,618✔
528
                p->flow->flags |= FLOW_TS_APP_UPDATED;
23,308✔
529
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", PcapPacketCntGet(p));
23,308✔
530
            } else {
23,308✔
531
                p->flow->flags |= FLOW_TC_APP_UPDATED;
6,310✔
532
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", PcapPacketCntGet(p));
6,310✔
533
            }
6,310✔
534
            break;
29,618✔
535
        case UPDATE_DIR_BOTH:
2,199✔
536
            if (PKT_IS_TOSERVER(p)) {
2,199✔
537
                p->flow->flags |= FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATE_NEXT;
1,607✔
538
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
1,607✔
539
                        PcapPacketCntGet(p));
1,607✔
540
            } else {
1,607✔
541
                p->flow->flags |= FLOW_TC_APP_UPDATED | FLOW_TS_APP_UPDATE_NEXT;
592✔
542
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
592✔
543
                        PcapPacketCntGet(p));
592✔
544
            }
592✔
545
            /* fall through */
546
        case UPDATE_DIR_OPPOSING:
152,812✔
547
            if (PKT_IS_TOSERVER(p)) {
152,812✔
548
                p->flow->flags |= FLOW_TC_APP_UPDATED | FLOW_TS_APP_UPDATE_NEXT;
81,058✔
549
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
81,058✔
550
                        PcapPacketCntGet(p));
81,058✔
551
            } else {
81,058✔
552
                p->flow->flags |= FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATE_NEXT;
71,754✔
553
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
71,754✔
554
                        PcapPacketCntGet(p));
71,754✔
555
            }
71,754✔
556
            break;
152,812✔
557
    }
1,149,987✔
558
}
1,149,987✔
559

560
static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
561
{
1,669,293✔
562
    FlowWorkerThreadData *fw = data;
1,669,293✔
563
    DetectEngineThreadCtx *det_ctx = SC_ATOMIC_GET(fw->detect_thread);
1,669,293✔
564

565
    DEBUG_VALIDATE_BUG_ON(p == NULL);
1,669,293✔
566
    DEBUG_VALIDATE_BUG_ON(tv->flow_queue == NULL);
1,669,293✔
567

568
    SCLogDebug("packet %" PRIu64, PcapPacketCntGet(p));
1,669,293✔
569

570
    if ((PKT_IS_FLUSHPKT(p))) {
1,669,293✔
571
        SCLogDebug("thread %s flushing", tv->printable_name);
×
572
        OutputLoggerFlush(tv, p, fw->output_thread);
×
573
        /* Ack if a flush was requested */
574
        bool notset = false;
×
575
        SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
×
576
        return TM_ECODE_OK;
×
577
    }
×
578

579
    /* handle Flow */
580
    if (det_ctx != NULL && det_ctx->de_ctx->PreFlowHook != NULL) {
1,669,293✔
581
        const uint8_t action = det_ctx->de_ctx->PreFlowHook(tv, det_ctx, p);
26✔
582
        if (action & ACTION_DROP) {
26✔
583
            PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_PRE_HOOK);
13✔
584
            goto pre_flow_drop;
13✔
585
        }
13✔
586
    }
26✔
587

588
    if (p->flags & PKT_WANTS_FLOW) {
1,669,280✔
589
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
1,587,416✔
590

591
        FlowHandlePacket(tv, &fw->fls, p);
1,587,416✔
592
        if (likely(p->flow != NULL)) {
1,587,416✔
593
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
1,587,368✔
594
            if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
1,587,368✔
595
                /* update time */
596
                if (!(PKT_IS_PSEUDOPKT(p))) {
1,360✔
597
                    TimeSetByThread(tv->id, p->ts);
1,360✔
598
                }
1,360✔
599
                goto housekeeping;
1,360✔
600
            }
1,360✔
601
        }
1,587,368✔
602
        /* Flow is now LOCKED */
603

604
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);
1,586,056✔
605

606
    /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
607
     * pseudo packet created by the flow manager. */
608
    } else if (p->flags & PKT_HAS_FLOW) {
1,604,367✔
609
        FLOWLOCK_WRLOCK(p->flow);
×
610
        DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
×
611
    }
×
612

613
    /* update time */
614
    if (!(PKT_IS_PSEUDOPKT(p))) {
1,667,920✔
615
        TimeSetByThread(tv->id, p->ts);
1,666,954✔
616
    }
1,666,954✔
617

618
    SCLogDebug("packet %" PRIu64 " has flow? %s", PcapPacketCntGet(p), p->flow ? "yes" : "no");
1,667,920✔
619

620
    /* handle TCP and app layer */
621
    if (p->flow) {
1,667,920✔
622
        SCLogDebug("packet %" PRIu64
1,585,905✔
623
                   ": direction %s FLOW_TS_APP_UPDATE_NEXT %s FLOW_TC_APP_UPDATE_NEXT %s",
1,585,905✔
624
                PcapPacketCntGet(p), PKT_IS_TOSERVER(p) ? "toserver" : "toclient",
1,585,905✔
625
                BOOL2STR((p->flow->flags & FLOW_TS_APP_UPDATE_NEXT) != 0),
1,585,905✔
626
                BOOL2STR((p->flow->flags & FLOW_TC_APP_UPDATE_NEXT) != 0));
1,585,905✔
627
        /* see if need to consider flags set by prev packets */
628
        if (PKT_IS_TOSERVER(p) && (p->flow->flags & FLOW_TS_APP_UPDATE_NEXT)) {
1,585,905✔
629
            p->flow->flags |= FLOW_TS_APP_UPDATED;
78,326✔
630
            p->flow->flags &= ~FLOW_TS_APP_UPDATE_NEXT;
78,326✔
631
            SCLogDebug("FLOW_TS_APP_UPDATED");
78,326✔
632
        } else if (PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATE_NEXT)) {
1,507,579✔
633
            p->flow->flags |= FLOW_TC_APP_UPDATED;
70,791✔
634
            p->flow->flags &= ~FLOW_TC_APP_UPDATE_NEXT;
70,791✔
635
            SCLogDebug("FLOW_TC_APP_UPDATED");
70,791✔
636
        }
70,791✔
637

638
        if (PacketIsTCP(p)) {
1,585,905✔
639
            SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", PcapPacketCntGet(p),
1,122,361✔
640
                    PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
1,122,361✔
641
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
1,122,361✔
642

643
            /* if detect is disabled, we need to apply file flags to the flow
644
             * here on the first packet. */
645
            if (det_ctx == NULL &&
1,122,361✔
646
                    ((PKT_IS_TOSERVER(p) && (p->flowflags & FLOW_PKT_TOSERVER_FIRST)) ||
1,122,361✔
647
                            (PKT_IS_TOCLIENT(p) && (p->flowflags & FLOW_PKT_TOCLIENT_FIRST)))) {
266,298✔
648
                DisableDetectFlowFileFlags(p->flow);
4,539✔
649
            }
4,539✔
650

651
            FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, false);
1,122,361✔
652
            PacketAppUpdate2FlowFlags(p);
1,122,361✔
653

654
            /* handle the app layer part of the UDP packet payload */
655
        } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
1,122,361✔
656
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
27,728✔
657
            AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
27,728✔
658
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
27,728✔
659
            PacketAppUpdate2FlowFlags(p);
27,728✔
660
        }
27,728✔
661
    }
1,585,905✔
662

663
    PacketUpdateEngineEventCounters(tv, fw->dtv, p);
1,667,920✔
664

665
    /* handle Detect */
666
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
1,667,920✔
667
    SCLogDebug("packet %" PRIu64 " calling Detect", PcapPacketCntGet(p));
1,667,920✔
668
    if (det_ctx != NULL) {
1,667,920✔
669
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
1,397,128✔
670
        Detect(tv, p, det_ctx);
1,397,128✔
671
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
1,397,128✔
672
    }
1,397,128✔
673

674
pre_flow_drop:
1,667,920✔
675
    // Outputs.
676
    OutputLoggerLog(tv, p, fw->output_thread);
1,667,795✔
677

678
    /*  Release tcp segments. Done here after alerting can use them. */
679
    if (p->flow != NULL) {
1,667,795✔
680
        DEBUG_ASSERT_FLOW_LOCKED(p->flow);
1,585,966✔
681

682
        if (FlowIsBypassed(p->flow)) {
1,585,966✔
683
            FlowCleanupAppLayer(p->flow);
14✔
684
            if (p->proto == IPPROTO_TCP) {
14✔
685
                StreamTcpSessionCleanup(p->flow->protoctx);
12✔
686
            }
12✔
687
        } else if (p->proto == IPPROTO_TCP && p->flow->protoctx && p->flags & PKT_STREAM_EST) {
1,585,952✔
688
            if ((p->flow->flags & FLOW_TS_APP_UPDATED) && PKT_IS_TOSERVER(p)) {
1,097,978✔
689
                FramesPrune(p->flow, p);
139,299✔
690
            } else if ((p->flow->flags & FLOW_TC_APP_UPDATED) && PKT_IS_TOCLIENT(p)) {
958,679✔
691
                FramesPrune(p->flow, p);
118,307✔
692
            }
118,307✔
693
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
1,097,978✔
694
            StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
1,097,978✔
695
                    STREAM_TOSERVER : STREAM_TOCLIENT);
554,973✔
696
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);
1,097,978✔
697
        } else if (p->proto == IPPROTO_UDP) {
1,098,007✔
698
            FramesPrune(p->flow, p);
27,764✔
699
        }
27,764✔
700

701
        if ((PKT_IS_PSEUDOPKT(p)) ||
1,585,966✔
702
                (p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {
1,585,966✔
703
            if ((p->flags & PKT_STREAM_EST) || p->proto != IPPROTO_TCP) {
301,639✔
704
                if (PKT_IS_TOSERVER(p)) {
300,172✔
705
                    if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {
167,539✔
706
                        AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);
160,271✔
707
                        p->flow->flags &= ~FLOW_TS_APP_UPDATED;
160,271✔
708
                        SCLogDebug("~FLOW_TS_APP_UPDATED");
160,271✔
709
                    }
160,271✔
710
                } else {
167,520✔
711
                    if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {
132,656✔
712
                        AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);
121,855✔
713
                        p->flow->flags &= ~FLOW_TC_APP_UPDATED;
121,855✔
714
                        SCLogDebug("~FLOW_TC_APP_UPDATED");
121,855✔
715
                    }
121,855✔
716
                }
132,652✔
717
            }
300,172✔
718
        } else {
1,284,327✔
719
            SCLogDebug("not pseudo, no app update: skip");
1,284,327✔
720
        }
1,284,327✔
721

722
        if (p->flow->flags & FLOW_ACTION_DROP) {
1,585,966✔
723
            SCLogDebug("flow drop in place: remove app update flags");
2,670✔
724
            p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED);
2,670✔
725
        }
2,670✔
726

727
        Flow *f = p->flow;
1,585,966✔
728
        FlowDeReference(&p->flow);
1,585,966✔
729
        FLOWLOCK_UNLOCK(f);
1,585,966✔
730
    }
1,585,966✔
731

732
housekeeping:
1,669,301✔
733

734
    /* take injected flows and add them to our local queue */
735
    FlowWorkerProcessInjectedFlows(tv, fw, p);
1,669,296✔
736

737
    /* process local work queue */
738
    FlowWorkerProcessLocalFlows(tv, fw, p);
1,669,296✔
739

740
    return TM_ECODE_OK;
1,669,296✔
741
}
1,667,795✔
742

743
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
744
{
×
745
    FlowWorkerThreadData *fw = flow_worker;
×
746

747
    SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
×
748
}
×
749

750
void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
751
{
×
752
    FlowWorkerThreadData *fw = flow_worker;
×
753

754
    return SC_ATOMIC_GET(fw->detect_thread);
×
755
}
×
756

757
void *FlowWorkerGetThreadData(void *flow_worker)
758
{
×
759
    return (FlowWorkerThreadData *)flow_worker;
×
760
}
×
761

762
bool FlowWorkerGetFlushAck(void *flow_worker)
763
{
×
764
    FlowWorkerThreadData *fw = flow_worker;
×
765
    return SC_ATOMIC_GET(fw->flush_ack) == true;
×
766
}
×
767

768
void FlowWorkerSetFlushAck(void *flow_worker)
769
{
×
770
    FlowWorkerThreadData *fw = flow_worker;
×
771
    SC_ATOMIC_SET(fw->flush_ack, false);
×
772
}
×
773

774
const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
775
{
×
776
    switch (fwi) {
×
777
        case PROFILE_FLOWWORKER_FLOW:
×
778
            return "flow";
×
779
        case PROFILE_FLOWWORKER_STREAM:
×
780
            return "stream";
×
781
        case PROFILE_FLOWWORKER_APPLAYERUDP:
×
782
            return "app-layer";
×
783
        case PROFILE_FLOWWORKER_DETECT:
×
784
            return "detect";
×
785
        case PROFILE_FLOWWORKER_TCPPRUNE:
×
786
            return "tcp-prune";
×
787
        case PROFILE_FLOWWORKER_FLOW_INJECTED:
×
788
            return "flow-inject";
×
789
        case PROFILE_FLOWWORKER_FLOW_EVICTED:
×
790
            return "flow-evict";
×
791
        case PROFILE_FLOWWORKER_SIZE:
×
792
            return "size";
×
793
    }
×
794
    return "error";
×
795
}
×
796

797
static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
798
{
50,090✔
799
    FlowWorkerThreadData *fw = flow_worker;
50,090✔
800
    if (fw->pq.len)
50,090✔
801
        return true;
×
802
    if (fw->fls.work_queue.len)
50,090✔
803
        return true;
26✔
804

805
    if (tv->flow_queue) {
50,064✔
806
        FQLOCK_LOCK(tv->flow_queue);
50,064✔
807
        bool fq_done = (tv->flow_queue->qlen == 0);
50,064✔
808
        FQLOCK_UNLOCK(tv->flow_queue);
50,064✔
809
        if (!fq_done) {
50,064✔
810
            return true;
802✔
811
        }
802✔
812
    }
50,064✔
813

814
    return false;
49,262✔
815
}
50,064✔
816

817
void TmModuleFlowWorkerRegister (void)
818
{
2,191✔
819
    tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
2,191✔
820
    tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
2,191✔
821
    tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
2,191✔
822
    tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
2,191✔
823
    tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
2,191✔
824
    tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
2,191✔
825
    tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_FLOWWORKER_TM;
2,191✔
826
}
2,191✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc