• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 22618661228

02 Mar 2026 09:33PM UTC coverage: 42.258% (-34.4%) from 76.611%
22618661228

push

github

victorjulien
github-actions: bump actions/download-artifact from 7.0.0 to 8.0.0

Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 7.0.0 to 8.0.0.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/37930b1c2...70fc10c6e)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: 8.0.0
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

91511 of 216553 relevant lines covered (42.26%)

3416852.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.82
/src/flow-worker.c
1
/* Copyright (C) 2016-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 *
23
 * Flow Workers are single thread modules taking care of (almost)
24
 * everything related to packets with flows:
25
 *
26
 * - Lookup/creation
27
 * - Stream tracking, reassembly
28
 * - Applayer update
29
 * - Detection
30
 *
31
 * This all while holding the flow lock.
32
 */
33

34
#include "suricata-common.h"
35
#include "suricata.h"
36

37
#include "action-globals.h"
38
#include "packet.h"
39
#include "decode.h"
40
#include "detect.h"
41
#include "stream-tcp.h"
42
#include "app-layer.h"
43
#include "detect-engine.h"
44
#include "output.h"
45
#include "app-layer-parser.h"
46
#include "app-layer-frames.h"
47

48
#include "util-profiling.h"
49
#include "util-validate.h"
50
#include "util-time.h"
51
#include "tmqh-packetpool.h"
52

53
#include "flow-util.h"
54
#include "flow-manager.h"
55
#include "flow-timeout.h"
56
#include "flow-spare-pool.h"
57
#include "flow-worker.h"
58

59
/* pointer typedef so the detect thread ctx can be used as a single-token
 * type in SC_ATOMIC_DECLARE below */
typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr;

/* per-call counters for flows handled from the local work queue,
 * filled by CheckWorkQueue() and folded into stats by UpdateCounters() */
typedef struct FlowTimeoutCounters {
    uint32_t flows_aside_needs_work;    /* evicted flows that still needed stream/detect work */
    uint32_t flows_aside_pkt_inject;    /* pseudo packets injected to finish those flows */
} FlowTimeoutCounters;
65

66
/* per-thread state of the flow worker thread module */
typedef struct FlowWorkerThreadData_ {
    DecodeThreadVars *dtv;

    union {
        StreamTcpThread *stream_thread;
        void *stream_thread_ptr;    /* untyped alias handed to StreamTcpThreadInit() */
    };

    /* detect ctx; read/written atomically so it can be swapped live during a
     * rule reload, see FlowWorkerReplaceDetectCtx() */
    SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);

    /* set to true once a flush pseudo packet has been handled by FlowWorker();
     * queried/reset via FlowWorkerGetFlushAck()/FlowWorkerSetFlushAck() */
    SC_ATOMIC_DECLARE(bool, flush_ack);

    void *output_thread; /* OutputLogger thread data, see OutputLoggerThreadInit(). */
    void *output_thread_flow; /* OutputFlowLog thread data, see OutputFlowLogThreadInit(). */

    /* bypass accounting, updated from FlowUpdate() */
    StatsCounterId local_bypass_pkts;
    StatsCounterId local_bypass_bytes;
    StatsCounterId both_bypass_pkts;
    StatsCounterId both_bypass_bytes;
    /** Queue to put pseudo packets that have been created by the stream (RST response) and by the
     * flush logic following a protocol change. */
    PacketQueueNoLock pq;
    FlowLookupStruct fls;    /* flow lookup state: dtv, spare_queue, work_queue */

    /* flow eviction/injection counters, see CheckWorkQueue() and
     * FlowWorkerProcessInjectedFlows() */
    struct {
        StatsCounterId flows_injected;
        StatsCounterMaxId flows_injected_max;
        StatsCounterId flows_removed;
        StatsCounterId flows_aside_needs_work;
        StatsCounterId flows_aside_pkt_inject;
    } cnt;
    FlowEndCounters fec;

} FlowWorkerThreadData;
100

101
static void FlowWorkerFlowTimeout(
102
        ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, DetectEngineThreadCtx *det_ctx);
103

104
/**
 * \internal
 * \brief Forces reassembly for flow if it needs it.
 *
 *        The function requires flow to be locked beforehand.
 *
 * \param tv thread vars
 * \param f Pointer to the flow.
 * \param fw flow worker thread data
 * \param detect_thread detect thread ctx, NULL if detection is disabled
 *
 * \retval cnt number of packets injected
 */
static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread)
{
    /* ffr_tc/ffr_ts record per-direction "flow finish reassembly" state */
    const int server = f->ffr_tc;
    const int client = f->ffr_ts;
    int cnt = 0;

    /* Get the tcp session for the flow */
    const TcpSession *ssn = (TcpSession *)f->protoctx;

    /* insert a pseudo packet in the toserver direction */
    if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
        Packet *p = FlowPseudoPacketGet(0, f, ssn);
        if (p != NULL) {
            PKT_SET_SRC(p, PKT_SRC_FFR);
            /* if no toclient pseudo packet will follow, mark this one as the last */
            if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE) {
                p->flowflags |= FLOW_PKT_LAST_PSEUDO;
            }
            FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
            PacketPoolReturnPacket(p);
            cnt++;
        }
    }

    /* handle toclient */
    if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
        Packet *p = FlowPseudoPacketGet(1, f, ssn);
        if (p != NULL) {
            PKT_SET_SRC(p, PKT_SRC_FFR);
            /* toclient pseudo packet is always the last one */
            p->flowflags |= FLOW_PKT_LAST_PSEUDO;
            FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
            PacketPoolReturnPacket(p);
            f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
            cnt++;
        }
    }

    /* mark done as soon as at least one pseudo packet was processed */
    if (cnt > 0) {
        f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
    }
    return cnt;
}
1,390✔
155

156
/** \param[in] max_work Max flows to process. 0 if unlimited. */
157
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
158
        FlowQueuePrivate *fq, const uint32_t max_work)
159
{
687✔
160
    FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
687✔
161
    uint32_t i = 0;
687✔
162
    Flow *f;
687✔
163
    while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
2,180✔
164
        FLOWLOCK_WRLOCK(f);
1,493✔
165
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg
1,493✔
166

167
        if (f->proto == IPPROTO_TCP) {
1,493✔
168
            if (!(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
1,394✔
169
                    !FlowIsBypassed(f) && FlowNeedsReassembly(f) && f->ffr != 0) {
1,394✔
170
                /* read detect thread in case we're doing a reload */
171
                void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
1,390✔
172
                int cnt = FlowFinish(tv, f, fw, detect_thread);
1,390✔
173
                counters->flows_aside_pkt_inject += cnt;
1,390✔
174
                counters->flows_aside_needs_work++;
1,390✔
175
            }
1,390✔
176
        }
1,394✔
177

178
        /* no one is referring to this flow, removed from hash
179
         * so we can unlock it and pass it to the flow recycler */
180

181
        if (fw->output_thread_flow != NULL)
1,493✔
182
            (void)OutputFlowLog(tv, fw->output_thread_flow, f);
1,494✔
183

184
        FlowEndCountersUpdate(tv, &fw->fec, f);
1,493✔
185
        if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
1,493✔
186
            StatsCounterDecr(&tv->stats, fw->dtv->counter_tcp_active_sessions);
1,390✔
187
        }
1,390✔
188
        StatsCounterDecr(&tv->stats, fw->dtv->counter_flow_active);
1,493✔
189

190
        FlowClearMemory (f, f->protomap);
1,493✔
191
        FLOWLOCK_UNLOCK(f);
1,493✔
192

193
        if (fw->fls.spare_queue.len >= (FLOW_SPARE_POOL_BLOCK_SIZE * 2)) {
1,493✔
194
            FlowQueuePrivatePrependFlow(&ret_queue, f);
×
195
            if (ret_queue.len == FLOW_SPARE_POOL_BLOCK_SIZE) {
×
196
                FlowSparePoolReturnFlows(&ret_queue);
×
197
            }
×
198
        } else {
1,493✔
199
            FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
1,493✔
200
        }
1,493✔
201

202
        if (max_work != 0 && ++i == max_work)
1,493✔
203
            break;
×
204
    }
1,493✔
205
    if (ret_queue.len > 0) {
687✔
206
        FlowSparePoolReturnFlows(&ret_queue);
×
207
    }
×
208

209
    StatsCounterAddI64(&tv->stats, fw->cnt.flows_removed, (int64_t)i);
687✔
210
}
687✔
211

212
/** \brief handle flow for packet
 *
 *  Handle flow creation/lookup
 *
 *  \retval TM_ECODE_DONE packet belongs to a bypassed flow: the packet was
 *          accounted, the flow dereferenced and unlocked, no further
 *          processing is needed
 *  \retval TM_ECODE_OK normal case; the flow stays referenced and locked
 */
static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{
    FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);

    int state = p->flow->flow_state;
    switch (state) {
#ifdef CAPTURE_OFFLOAD
        /* flow bypassed both locally and in the capture method:
         * only account the packet/bytes */
        case FLOW_STATE_CAPTURE_BYPASSED: {
            StatsCounterAddI64(&tv->stats, fw->both_bypass_pkts, 1);
            StatsCounterAddI64(&tv->stats, fw->both_bypass_bytes, GET_PKT_LEN(p));
            Flow *f = p->flow;
            FlowDeReference(&p->flow);
            FLOWLOCK_UNLOCK(f);
            return TM_ECODE_DONE;
        }
#endif
        /* flow bypassed locally: only account the packet/bytes */
        case FLOW_STATE_LOCAL_BYPASSED: {
            StatsCounterAddI64(&tv->stats, fw->local_bypass_pkts, 1);
            StatsCounterAddI64(&tv->stats, fw->local_bypass_bytes, GET_PKT_LEN(p));
            Flow *f = p->flow;
            FlowDeReference(&p->flow);
            FLOWLOCK_UNLOCK(f);
            return TM_ECODE_DONE;
        }
        default:
            return TM_ECODE_OK;
    }
}
1,099,682✔
244

245
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);
246

247
/** \brief thread init: allocate and set up all flow worker state
 *
 *  Registers the stats counters and initializes decode, stream TCP,
 *  detect (when enabled) and output thread contexts, in that order.
 *  On any failure the partially built state is torn down through
 *  FlowWorkerThreadDeinit().
 *
 *  \param initdata passed through to OutputLoggerThreadInit()
 *  \param[out] data on success receives the new FlowWorkerThreadData
 *  \retval TM_ECODE_OK or TM_ECODE_FAILED
 */
static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
    if (fw == NULL)
        return TM_ECODE_FAILED;

    SC_ATOMIC_INITPTR(fw->detect_thread);
    SC_ATOMIC_SET(fw->detect_thread, NULL);

    fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", &tv->stats);
    fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", &tv->stats);
    fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", &tv->stats);
    fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", &tv->stats);

    fw->cnt.flows_aside_needs_work =
            StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", &tv->stats);
    fw->cnt.flows_aside_pkt_inject =
            StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", &tv->stats);
    fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", &tv->stats);
    fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", &tv->stats);
    fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", &tv->stats);

    /* decode thread vars are shared with the flow lookup state */
    fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
    if (fw->dtv == NULL) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    /* setup TCP */
    if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    if (DetectEngineEnabled()) {
        /* setup DETECT */
        void *detect_thread = NULL;
        if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
            FlowWorkerThreadDeinit(tv, fw);
            return TM_ECODE_FAILED;
        }
        SC_ATOMIC_SET(fw->detect_thread, detect_thread);
    }

    /* Setup outputs for this thread. */
    if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }
    if (OutputFlowLogThreadInit(tv, &fw->output_thread_flow) != TM_ECODE_OK) {
        SCLogError("initializing flow log API for thread failed");
        FlowWorkerThreadDeinit(tv, fw);
        return TM_ECODE_FAILED;
    }

    DecodeRegisterPerfCounters(fw->dtv, tv);
    AppLayerRegisterThreadCounters(tv);
    FlowEndCountersRegister(tv, &fw->fec);

    /* setup pq for stream end pkts */
    memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
    *data = fw;
    return TM_ECODE_OK;
}
5,593✔
311

312
/** \brief thread deinit: tear down everything set up by FlowWorkerThreadInit
 *
 *  Also called on partially initialized state from the init error paths,
 *  so members may still be NULL here.
 */
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
{
    FlowWorkerThreadData *fw = data;

    DecodeThreadVarsFree(tv, fw->dtv);

    /* free TCP */
    StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);

    /* free DETECT */
    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
    if (detect_thread != NULL) {
        DetectEngineThreadCtxDeinit(tv, detect_thread);
        SC_ATOMIC_SET(fw->detect_thread, NULL);
    }

    /* Free output. */
    OutputLoggerThreadDeinit(tv, fw->output_thread);
    OutputFlowLogThreadDeinit(tv, fw->output_thread_flow);

    /* free pq; it must be empty at this point */
    BUG_ON(fw->pq.len);

    /* free the thread's locally cached spare flows */
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) {
        FlowFree(f);
    }

    SCFree(fw);
    return TM_ECODE_OK;
}
343

344
TmEcode Detect(ThreadVars *tv, Packet *p, void *data);
345
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *pq);
346

347
static inline void UpdateCounters(ThreadVars *tv,
348
        FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters)
349
{
688✔
350
    if (counters->flows_aside_needs_work) {
688✔
351
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_aside_needs_work,
584✔
352
                (int64_t)counters->flows_aside_needs_work);
584✔
353
    }
584✔
354
    if (counters->flows_aside_pkt_inject) {
688✔
355
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_aside_pkt_inject,
584✔
356
                (int64_t)counters->flows_aside_pkt_inject);
584✔
357
    }
584✔
358
}
688✔
359

360
/** \brief update stream engine
 *
 *  We can be called from both the flow timeout path as well as from the
 *  "real" traffic path. If in the timeout path any additional packets we
 *  forge for flushing pipelines should not leave our scope. If the original
 *  packet is real (or related to a real packet) we need to push the packets
 *  on, so IPS logic stays valid.
 */
static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p,
        DetectEngineThreadCtx *det_ctx, const bool timeout)
{
    /* give the pre-stream hook a chance to drop the packet before the
     * stream engine sees it */
    if (det_ctx != NULL && det_ctx->de_ctx->PreStreamHook != NULL) {
        const uint8_t action = det_ctx->de_ctx->PreStreamHook(tv, det_ctx, p);
        if (action & ACTION_DROP) {
            PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_STREAM_PRE_HOOK);
            return;
        }
    }

    /* stream engine may enqueue extra (pseudo) packets into fw->pq */
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM);
    StreamTcp(tv, p, fw->stream_thread, &fw->pq);
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM);

    // this is the first packet that sets no payload inspection
    bool setting_nopayload =
            p->flow->alparser &&
            SCAppLayerParserStateIssetFlag(p->flow->alparser, APP_LAYER_PARSER_NO_INSPECTION) &&
            !(p->flags & PKT_NOPAYLOAD_INSPECTION);
    if (FlowChangeProto(p->flow) || setting_nopayload) {
        /* on a protocol change or when inspection is being disabled, flush
         * pending detect/log state and signal EOF to the app-layer parser */
        StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq);
        if (setting_nopayload) {
            FlowSetNoPayloadInspectionFlag(p->flow);
        }
        SCAppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TS);
        SCAppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TC);
    }

    /* Packets here can safely access p->flow as it's locked */
    SCLogDebug("packet %" PRIu64 ": extra packets %u", PcapPacketCntGet(p), fw->pq.len);
    Packet *x;
    while ((x = PacketDequeueNoLock(&fw->pq))) {
        SCLogDebug("packet %" PRIu64 " extra packet %p", PcapPacketCntGet(p), x);

        if (det_ctx != NULL) {
            FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT);
            Detect(tv, x, det_ctx);
            FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
        }

        OutputLoggerLog(tv, x, fw->output_thread);

        FramesPrune(x->flow, x);
        /*  Release tcp segments. Done here after alerting can use them. */
        FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_TCPPRUNE);
        StreamTcpPruneSession(
                x->flow, x->flowflags & FLOW_PKT_TOSERVER ? STREAM_TOSERVER : STREAM_TOCLIENT);
        FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_TCPPRUNE);

        /* no need to keep a flow ref beyond this point */
        FlowDeReference(&x->flow);

        /* no further work to do for this pseudo packet, so we can return
         * it to the pool immediately. */
        if (timeout) {
            PacketPoolReturnPacket(x);
        } else {
            /* to support IPS verdict logic, in the non-timeout case we need to do a bit more */
            TmqhOutputPacketpool(tv, x);
        }
    }
    if (FlowChangeProto(p->flow) && p->flow->flags & FLOW_ACTION_DROP) {
        // in case f->flags & FLOW_ACTION_DROP was set by one of the dequeued packets
        PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_DROP);
    }
}
650,537✔
435

436
/** \internal
 *  \brief process a timeout pseudo packet: stream update, detect, outputs
 *         and tx cleanup
 *
 *  Called with the flow locked (from FlowFinish()); the flow stays locked
 *  when we return, only the packet's flow reference is released here.
 */
static void FlowWorkerFlowTimeout(
        ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, DetectEngineThreadCtx *det_ctx)
{
    /* only flow-file-removal pseudo packets are expected here */
    DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);

    SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", PcapPacketCntGet(p),
            PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
    DEBUG_VALIDATE_BUG_ON(!(p->flow && PacketIsTCP(p)));
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);

    /* handle TCP and app layer */
    FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, true);

    PacketUpdateEngineEventCounters(tv, fw->dtv, p);

    /* handle Detect */
    SCLogDebug("packet %" PRIu64 " calling Detect", PcapPacketCntGet(p));
    if (det_ctx != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        Detect(tv, p, det_ctx);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }

    // Outputs.
    OutputLoggerLog(tv, p, fw->output_thread);

    FramesPrune(p->flow, p);

    /*  Release tcp segments. Done here after alerting can use them. */
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
    StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
            STREAM_TOSERVER : STREAM_TOCLIENT);
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);

    /* run tx cleanup last */
    AppLayerParserTransactionsCleanup(p->flow, STREAM_FLAGS_FOR_PACKET(p));

    FlowDeReference(&p->flow);
    /* flow is unlocked later in FlowFinish() */
}
2,295✔
476

477
/** \internal
 *  \brief process flows injected into our queue by other threads
 */
static inline void FlowWorkerProcessInjectedFlows(
        ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{
    /* take injected flows and append to our work queue */
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
    FlowQueuePrivate injected = { NULL, NULL, 0 };
    /* cheap atomic check first; only grab the queue when it's non-empty */
    if (SC_ATOMIC_GET(tv->flow_queue->non_empty))
        injected = FlowQueueExtractPrivate(tv->flow_queue);
    if (injected.len > 0) {
        StatsCounterAddI64(&tv->stats, fw->cnt.flows_injected, (int64_t)injected.len);
        /* the max counter is only tracked for real wire packets */
        if (p->pkt_src == PKT_SRC_WIRE)
            StatsCounterMaxUpdateI64(&tv->stats, fw->cnt.flows_injected_max, (int64_t)injected.len);

        /* move to local queue so we can process over the course of multiple packets */
        FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected);
    }
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
}
1,180,293✔
498

499
/** \internal
500
 *  \brief process flows set aside locally during flow lookup
501
 */
502
static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
503
{
1,180,296✔
504
    uint32_t max_work = 2;
1,180,296✔
505
    if (p->pkt_src == PKT_SRC_SHUTDOWN_FLUSH || p->pkt_src == PKT_SRC_CAPTURE_TIMEOUT)
1,180,296✔
506
        max_work = 0;
592✔
507

508
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
1,180,296✔
509
    if (fw->fls.work_queue.len) {
1,180,296✔
510
        FlowTimeoutCounters counters = { 0, 0, };
688✔
511
        CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue, max_work);
688✔
512
        UpdateCounters(tv, fw, &counters);
688✔
513
    }
688✔
514
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
1,180,296✔
515
}
1,180,296✔
516

517
/** \internal
 *  \brief apply Packet::app_update_direction to the flow flags
 *
 *  Sets FLOW_T[SC]_APP_UPDATED for the direction(s) the app-layer was
 *  updated in, and FLOW_T[SC]_APP_UPDATE_NEXT when the opposing direction
 *  needs its update flag set on the next packet in that direction.
 *  Note: UPDATE_DIR_BOTH deliberately falls through into the
 *  UPDATE_DIR_OPPOSING case to also flag the opposing direction.
 */
static void PacketAppUpdate2FlowFlags(Packet *p)
{
    switch ((enum StreamUpdateDir)p->app_update_direction) {
        case UPDATE_DIR_NONE: // NONE implies pseudo packet
            SCLogDebug("pcap_cnt %" PRIu64 ", UPDATE_DIR_NONE", PcapPacketCntGet(p));
            break;
        case UPDATE_DIR_PACKET:
            /* update in the packet's own direction only */
            if (PKT_IS_TOSERVER(p)) {
                p->flow->flags |= FLOW_TS_APP_UPDATED;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", PcapPacketCntGet(p));
            } else {
                p->flow->flags |= FLOW_TC_APP_UPDATED;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", PcapPacketCntGet(p));
            }
            break;
        case UPDATE_DIR_BOTH:
            /* packet's own direction here, opposing direction handled by
             * the fallthrough below */
            if (PKT_IS_TOSERVER(p)) {
                p->flow->flags |= FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATE_NEXT;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
                        PcapPacketCntGet(p));
            } else {
                p->flow->flags |= FLOW_TC_APP_UPDATED | FLOW_TS_APP_UPDATE_NEXT;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
                        PcapPacketCntGet(p));
            }
            /* fall through */
        case UPDATE_DIR_OPPOSING:
            if (PKT_IS_TOSERVER(p)) {
                p->flow->flags |= FLOW_TC_APP_UPDATED | FLOW_TS_APP_UPDATE_NEXT;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
                        PcapPacketCntGet(p));
            } else {
                p->flow->flags |= FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATE_NEXT;
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
                        PcapPacketCntGet(p));
            }
            break;
    }
}
664,876✔
559

560
/** \brief flow worker: per-packet main path
 *
 *  Handles flush pseudo packets, flow lookup/update, stream and app-layer
 *  updates, detection, outputs, per-packet flow cleanup and finally the
 *  housekeeping of injected and locally evicted flows.
 *
 *  \retval TM_ECODE_OK always
 */
static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
{
    FlowWorkerThreadData *fw = data;
    DetectEngineThreadCtx *det_ctx = SC_ATOMIC_GET(fw->detect_thread);

    DEBUG_VALIDATE_BUG_ON(p == NULL);
    DEBUG_VALIDATE_BUG_ON(tv->flow_queue == NULL);

    SCLogDebug("packet %" PRIu64, PcapPacketCntGet(p));

    /* flush pseudo packet: flush outputs, ack and bail out */
    if ((PKT_IS_FLUSHPKT(p))) {
        SCLogDebug("thread %s flushing", tv->printable_name);
        OutputLoggerFlush(tv, p, fw->output_thread);
        /* Ack if a flush was requested */
        bool notset = false;
        SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
        return TM_ECODE_OK;
    }

    /* handle Flow */
    if (det_ctx != NULL && det_ctx->de_ctx->PreFlowHook != NULL) {
        const uint8_t action = det_ctx->de_ctx->PreFlowHook(tv, det_ctx, p);
        if (action & ACTION_DROP) {
            /* dropped before flow setup: skip straight to outputs */
            PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_PRE_HOOK);
            goto pre_flow_drop;
        }
    }

    if (p->flags & PKT_WANTS_FLOW) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);

        FlowHandlePacket(tv, &fw->fls, p);
        if (likely(p->flow != NULL)) {
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
            /* DONE means the flow is bypassed and was already unlocked */
            if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
                /* update time */
                if (!(PKT_IS_PSEUDOPKT(p))) {
                    TimeSetByThread(tv->id, p->ts);
                }
                goto housekeeping;
            }
        }
        /* Flow is now LOCKED */

        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);

    /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
     * pseudo packet created by the flow manager. */
    } else if (p->flags & PKT_HAS_FLOW) {
        FLOWLOCK_WRLOCK(p->flow);
        DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
    }

    /* update time */
    if (!(PKT_IS_PSEUDOPKT(p))) {
        TimeSetByThread(tv->id, p->ts);
    }

    SCLogDebug("packet %" PRIu64 " has flow? %s", PcapPacketCntGet(p), p->flow ? "yes" : "no");

    /* handle TCP and app layer */
    if (p->flow) {
        SCLogDebug("packet %" PRIu64
                   ": direction %s FLOW_TS_APP_UPDATE_NEXT %s FLOW_TC_APP_UPDATE_NEXT %s",
                PcapPacketCntGet(p), PKT_IS_TOSERVER(p) ? "toserver" : "toclient",
                BOOL2STR((p->flow->flags & FLOW_TS_APP_UPDATE_NEXT) != 0),
                BOOL2STR((p->flow->flags & FLOW_TC_APP_UPDATE_NEXT) != 0));
        /* see if need to consider flags set by prev packets */
        if (PKT_IS_TOSERVER(p) && (p->flow->flags & FLOW_TS_APP_UPDATE_NEXT)) {
            p->flow->flags |= FLOW_TS_APP_UPDATED;
            p->flow->flags &= ~FLOW_TS_APP_UPDATE_NEXT;
            SCLogDebug("FLOW_TS_APP_UPDATED");
        } else if (PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATE_NEXT)) {
            p->flow->flags |= FLOW_TC_APP_UPDATED;
            p->flow->flags &= ~FLOW_TC_APP_UPDATE_NEXT;
            SCLogDebug("FLOW_TC_APP_UPDATED");
        }

        if (PacketIsTCP(p)) {
            SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", PcapPacketCntGet(p),
                    PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);

            /* if detect is disabled, we need to apply file flags to the flow
             * here on the first packet. */
            if (det_ctx == NULL &&
                    ((PKT_IS_TOSERVER(p) && (p->flowflags & FLOW_PKT_TOSERVER_FIRST)) ||
                            (PKT_IS_TOCLIENT(p) && (p->flowflags & FLOW_PKT_TOCLIENT_FIRST)))) {
                DisableDetectFlowFileFlags(p->flow);
            }

            FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, false);
            PacketAppUpdate2FlowFlags(p);

            /* handle the app layer part of the UDP packet payload */
        } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
            AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
            PacketAppUpdate2FlowFlags(p);
        }
    }

    PacketUpdateEngineEventCounters(tv, fw->dtv, p);

    /* handle Detect */
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
    SCLogDebug("packet %" PRIu64 " calling Detect", PcapPacketCntGet(p));
    if (det_ctx != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        Detect(tv, p, det_ctx);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }

pre_flow_drop:
    // Outputs.
    OutputLoggerLog(tv, p, fw->output_thread);

    /*  Release tcp segments. Done here after alerting can use them. */
    if (p->flow != NULL) {
        DEBUG_ASSERT_FLOW_LOCKED(p->flow);

        if (FlowIsBypassed(p->flow)) {
            /* bypassed flow: drop app-layer and TCP session state */
            FlowCleanupAppLayer(p->flow);
            if (p->proto == IPPROTO_TCP) {
                StreamTcpSessionCleanup(p->flow->protoctx);
            }
        } else if (p->proto == IPPROTO_TCP && p->flow->protoctx && p->flags & PKT_STREAM_EST) {
            /* only prune frames in the direction the app layer was updated */
            if ((p->flow->flags & FLOW_TS_APP_UPDATED) && PKT_IS_TOSERVER(p)) {
                FramesPrune(p->flow, p);
            } else if ((p->flow->flags & FLOW_TC_APP_UPDATED) && PKT_IS_TOCLIENT(p)) {
                FramesPrune(p->flow, p);
            }
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
            StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
                    STREAM_TOSERVER : STREAM_TOCLIENT);
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);
        } else if (p->proto == IPPROTO_UDP) {
            FramesPrune(p->flow, p);
        }

        /* transaction cleanup: only when the app layer was actually updated
         * (or for pseudo packets), and for TCP only once established */
        if ((PKT_IS_PSEUDOPKT(p)) ||
                (p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {
            if ((p->flags & PKT_STREAM_EST) || p->proto != IPPROTO_TCP) {
                if (PKT_IS_TOSERVER(p)) {
                    if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {
                        AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);
                        p->flow->flags &= ~FLOW_TS_APP_UPDATED;
                        SCLogDebug("~FLOW_TS_APP_UPDATED");
                    }
                } else {
                    if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {
                        AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);
                        p->flow->flags &= ~FLOW_TC_APP_UPDATED;
                        SCLogDebug("~FLOW_TC_APP_UPDATED");
                    }
                }
            }
        } else {
            SCLogDebug("not pseudo, no app update: skip");
        }

        if (p->flow->flags & FLOW_ACTION_DROP) {
            SCLogDebug("flow drop in place: remove app update flags");
            p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED);
        }

        /* release the packet's flow reference and unlock */
        Flow *f = p->flow;
        FlowDeReference(&p->flow);
        FLOWLOCK_UNLOCK(f);
    }

housekeeping:

    /* take injected flows and add them to our local queue */
    FlowWorkerProcessInjectedFlows(tv, fw, p);

    /* process local work queue */
    FlowWorkerProcessLocalFlows(tv, fw, p);

    return TM_ECODE_OK;
}
1,180,124✔
742

743
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
744
{
×
745
    FlowWorkerThreadData *fw = flow_worker;
×
746

747
    SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
×
748
}
×
749

750
void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
751
{
×
752
    FlowWorkerThreadData *fw = flow_worker;
×
753

754
    return SC_ATOMIC_GET(fw->detect_thread);
×
755
}
×
756

757
void *FlowWorkerGetThreadData(void *flow_worker)
758
{
×
759
    return (FlowWorkerThreadData *)flow_worker;
×
760
}
×
761

762
bool FlowWorkerGetFlushAck(void *flow_worker)
763
{
×
764
    FlowWorkerThreadData *fw = flow_worker;
×
765
    return SC_ATOMIC_GET(fw->flush_ack) == true;
×
766
}
×
767

768
void FlowWorkerSetFlushAck(void *flow_worker)
769
{
×
770
    FlowWorkerThreadData *fw = flow_worker;
×
771
    SC_ATOMIC_SET(fw->flush_ack, false);
×
772
}
×
773

774
const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
775
{
×
776
    switch (fwi) {
×
777
        case PROFILE_FLOWWORKER_FLOW:
×
778
            return "flow";
×
779
        case PROFILE_FLOWWORKER_STREAM:
×
780
            return "stream";
×
781
        case PROFILE_FLOWWORKER_APPLAYERUDP:
×
782
            return "app-layer";
×
783
        case PROFILE_FLOWWORKER_DETECT:
×
784
            return "detect";
×
785
        case PROFILE_FLOWWORKER_TCPPRUNE:
×
786
            return "tcp-prune";
×
787
        case PROFILE_FLOWWORKER_FLOW_INJECTED:
×
788
            return "flow-inject";
×
789
        case PROFILE_FLOWWORKER_FLOW_EVICTED:
×
790
            return "flow-evict";
×
791
        case PROFILE_FLOWWORKER_SIZE:
×
792
            return "size";
×
793
    }
×
794
    return "error";
×
795
}
×
796

797
/**
 * \brief Report whether the flow worker still has outstanding work.
 *
 * Considered busy when the decode packet queue or the local flow work
 * queue is non-empty, or when flows are still pending in the thread's
 * injected flow queue (checked under its lock).
 *
 * \param tv          thread vars owning the optional flow_queue
 * \param flow_worker opaque FlowWorkerThreadData pointer for the worker
 * \return true when work remains, false when fully drained
 */
static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
{
    FlowWorkerThreadData *fwd = (FlowWorkerThreadData *)flow_worker;

    /* locally queued packets or flows mean there is still work to do */
    if (fwd->pq.len)
        return true;
    if (fwd->fls.work_queue.len)
        return true;

    /* injected flows live in a shared queue; peek its length under lock */
    if (tv->flow_queue) {
        FQLOCK_LOCK(tv->flow_queue);
        const bool pending = (tv->flow_queue->qlen != 0);
        FQLOCK_UNLOCK(tv->flow_queue);
        if (pending) {
            return true;
        }
    }

    return false;
}
816

817
void TmModuleFlowWorkerRegister (void)
818
{
7✔
819
    tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
7✔
820
    tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
7✔
821
    tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
7✔
822
    tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
7✔
823
    tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
7✔
824
    tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
7✔
825
    tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_FLOWWORKER_TM;
7✔
826
}
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc