• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 23350122333

20 Mar 2026 03:33PM UTC coverage: 76.492% (-2.8%) from 79.315%
23350122333

Pull #15053

github

web-flow
Merge f5bf69f97 into 6587e363a
Pull Request #15053: Flow queue/v3

113 of 129 new or added lines in 9 files covered. (87.6%)

9534 existing lines in 453 files now uncovered.

256601 of 335461 relevant lines covered (76.49%)

4680806.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.03
/src/tm-threads.h
1
/* Copyright (C) 2007-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23
 */
24

25
#ifndef SURICATA_TM_THREADS_H
26
#define SURICATA_TM_THREADS_H
27

28
#include "tmqh-packetpool.h"
29
#include "tm-threads-common.h"
30
#include "tm-queuehandlers.h"
31
#include "tm-modules.h"
32
#include "flow.h" // for the FlowQueue
33

34
#ifdef OS_WIN32
35
static inline void SleepUsec(uint64_t usec)
36
{
37
    uint64_t msec = 1;
38
    if (usec > 1000) {
39
        msec = usec / 1000;
40
    }
41
    Sleep(msec);
42
}
43
#define SleepMsec(msec) Sleep((msec))
44
#else
45
#define SleepUsec(usec) usleep((usec))
8,024,490✔
46
#define SleepMsec(msec) usleep((msec) * 1000)
83,230✔
47
#endif
48

49
#define TM_QUEUE_NAME_MAX 16
50
#define TM_THREAD_NAME_MAX 16
51

52
typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *);
53

54
typedef struct TmSlot_ {
55
    /* function pointers */
56
    union {
57
        TmSlotFunc SlotFunc;
58
        TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
59
        TmEcode (*Management)(ThreadVars *, void *);
60
    };
61
    /** linked list of slots, used when a pipeline has multiple slots
62
     *  in a single thread. */
63
    struct TmSlot_ *slot_next;
64

65
    SC_ATOMIC_DECLARE(void *, slot_data);
66

67
    /** copy of the TmModule::flags */
68
    uint8_t tm_flags;
69

70
    /* store the thread module id */
71
    int tm_id;
72

73
    TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
74
    void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
75
    TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);
76

77
    /* data storage */
78
    const void *slot_initdata;
79

80
} TmSlot;
81

82
extern ThreadVars *tv_root[TVT_MAX];
83

84
extern SCMutex tv_root_lock;
85

86
void TmSlotSetFuncAppend(ThreadVars *, TmModule *, const void *);
87

88
ThreadVars *TmThreadCreate(const char *, const char *, const char *, const char *, const char *, const char *,
89
                           void *(fn_p)(void *), int);
90
ThreadVars *TmThreadCreatePacketHandler(const char *, const char *, const char *, const char *, const char *,
91
                                        const char *);
92
ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int);
93
ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
94
                                     int mucond);
95
ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
96
                                     int mucond);
97
TmEcode TmThreadSpawn(ThreadVars *);
98
TmEcode TmThreadLibSpawn(ThreadVars *);
99
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s);
100
void TmThreadKillThreadsFamily(int family);
101
void TmThreadKillThreads(void);
102
void TmThreadClearThreadsFamily(int family);
103
void TmThreadAppend(ThreadVars *, int);
104

105
TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t);
106
TmEcode TmThreadSetThreadPriority(ThreadVars *, int);
107
TmEcode TmThreadSetCPU(ThreadVars *, uint8_t);
108
TmEcode TmThreadSetupOptions(ThreadVars *);
109
void TmThreadSetPrio(ThreadVars *);
110
int TmThreadGetNbThreads(uint8_t type);
111

112
void TmThreadInitMC(ThreadVars *);
113
void TmThreadContinue(ThreadVars *);
114
void TmThreadContinueThreads(void);
115
void TmThreadCheckThreadState(void);
116
TmEcode TmThreadWaitOnThreadInit(void);
117

118
int TmThreadsCheckFlag(ThreadVars *, uint32_t);
119
void TmThreadsSetFlag(ThreadVars *, uint32_t);
120
void TmThreadsUnsetFlag(ThreadVars *, uint32_t);
121
void TmThreadWaitForFlag(ThreadVars *, uint32_t);
122

123
TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
124

125
void TmThreadDisablePacketThreads(
126
        const uint16_t set, const uint16_t check, const uint8_t module_flags);
127
void TmThreadDisableReceiveThreads(void);
128

129
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
130

131
TmEcode TmThreadWaitOnThreadRunning(void);
132

133
TmEcode TmThreadsProcessDecodePseudoPackets(
134
        ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot);
135

136
static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
137
{
×
138
    while (1) {
×
139
        Packet *p = PacketDequeueNoLock(pq);
×
140
        if (unlikely(p == NULL))
×
141
            break;
×
142
        TmqhOutputPacketpool(NULL, p);
×
143
    }
×
144
}
×
145

146
static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, Packet *p)
147
{
×
148
    if (p != NULL) {
×
149
        TmqhOutputPacketpool(tv, p);
×
150
    }
×
151
    TmThreadsCleanDecodePQ(&tv->decode_pq);
×
152
    if (tv->stream_pq_local) {
×
153
        SCMutexLock(&tv->stream_pq_local->mutex_q);
×
154
        TmqhReleasePacketsToPacketPool(tv->stream_pq_local);
×
155
        SCMutexUnlock(&tv->stream_pq_local->mutex_q);
×
156
    }
×
157
    TmThreadsSetFlag(tv, THV_FAILED);
×
158
}
×
159

160
/**
161
 *  \brief Handle timeout from the capture layer. Checks
162
 *         stream_pq which may have been filled by the flow
163
 *         manager.
164
 *  \param s pipeline to run on these packets.
165
 */
166
static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
167
{
127,706,143✔
168
    PacketQueue *pq = tv->stream_pq_local;
127,706,143✔
169
    if (pq && pq->len > 0) {
129,548,355✔
170
        while (1) {
6✔
171
            SCMutexLock(&pq->mutex_q);
6✔
172
            Packet *extra_p = PacketDequeue(pq);
6✔
173
            SCMutexUnlock(&pq->mutex_q);
6✔
174
            if (extra_p == NULL)
6✔
175
                break;
3✔
176
#ifdef DEBUG_VALIDATION
177
            BUG_ON(extra_p->flow != NULL);
178
#endif
179
            TmEcode r = TmThreadsSlotVarRun(tv, extra_p, tv->tm_flowworker);
3✔
180
            if (r == TM_ECODE_FAILED) {
3✔
181
                TmThreadsSlotProcessPktFail(tv, extra_p);
×
182
                break;
×
183
            }
×
184
            tv->TmqhOutFn(tv, extra_p);
3✔
185
        }
3✔
186
        return true;
3✔
187
    } else {
127,706,140✔
188
        return false;
127,706,140✔
189
    }
127,706,140✔
190
}
127,706,143✔
191

192
/**
193
 *  \brief Process the rest of the functions (if any) and queue.
194
 */
195
static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
196
{
128,143,569✔
197
    if (s == NULL) {
128,143,569✔
198
        tv->TmqhOutFn(tv, p);
3,096✔
199
        return TM_ECODE_OK;
3,096✔
200
    }
3,096✔
201

202
    TmEcode r = TmThreadsSlotVarRun(tv, p, s);
128,140,473✔
203
    if (unlikely(r == TM_ECODE_FAILED)) {
128,140,473✔
204
        TmThreadsSlotProcessPktFail(tv, p);
×
205
        return TM_ECODE_FAILED;
×
206
    }
×
207

208
    tv->TmqhOutFn(tv, p);
128,140,473✔
209

210
    TmThreadsHandleInjectedPackets(tv);
128,140,473✔
211

212
    return TM_ECODE_OK;
128,140,473✔
213
}
128,140,473✔
214

215
/** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
216
 *  Allow caller to supply their own packet
217
 *
218
 *  Meant for detect reload process that interrupts an sleeping capture thread
219
 *  to force a packet through the engine to complete a reload */
220
static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
221
{
16✔
222
    TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
16✔
223
    if (p == NULL)
16✔
224
        p = PacketGetFromQueueOrAlloc();
16✔
225
    if (p != NULL) {
16✔
226
        p->flags |= PKT_PSEUDO_STREAM_END;
16✔
227
        PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
16✔
228
        if (TmThreadsSlotProcessPkt(tv, tv->tm_flowworker, p) != TM_ECODE_OK) {
16✔
229
            TmqhOutputPacketpool(tv, p);
×
230
        }
×
231
    }
16✔
232
}
16✔
233

234
/** \brief handle capture timeout
235
 *  When a capture method times out we check for house keeping
236
 *  tasks in the capture thread.
237
 *
238
 *  \param p packet. Capture method may have taken a packet from
239
 *           the pool prior to the timing out call. We will then
240
 *           use that packet. Otherwise we can get our own.
241
 */
242
static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
243
{
22,942✔
244
    if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
22,942✔
245
        TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
9✔
246
        return;
9✔
247

248
    } else {
22,933✔
249
        if (!TmThreadsHandleInjectedPackets(tv)) {
22,967✔
250
            /* see if we have to do some house keeping */
251
            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
22,967✔
252
                TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
7✔
253
                return;
7✔
254
            }
7✔
255
        }
22,967✔
256
    }
22,933✔
257

258
    /* packet could have been passed to us that we won't use
259
     * return it to the pool. */
260
    if (p != NULL)
22,926✔
NEW
261
        tv->TmqhOutFn(tv, p);
×
262
}
22,926✔
263

264
static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
265
{
9✔
266
    if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
9✔
267
        return;
×
268
    }
×
269
    /* find the correct slot */
270
    TmSlot *s = tv->tm_slots;
9✔
271
    TmModule *tm = TmModuleGetById(s->tm_id);
9✔
272
    if (tm->flags & TM_FLAG_RECEIVE_TM) {
9✔
273
        /* if the method supports it, BreakLoop. Otherwise we rely on
274
         * the capture method's recv timeout */
275
        if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
9✔
276
            tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
1✔
277
        }
1✔
278
        TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
9✔
279
    }
9✔
280
}
9✔
281

282
static inline void TmThreadFlushOutQueue(ThreadVars *tv)
283
{
61,403✔
284
    if (tv->outctx != NULL && tv->outq_id != TMQH_NOT_SET) {
61,403✔
285
        Tmqh *qh = TmqhGetQueueHandlerByID(tv->outq_id);
59,368✔
286
        if (qh != NULL) {
59,388✔
287
            if (qh->OutFlush != NULL) {
59,405✔
288
                qh->OutFlush(tv);
59,405✔
289
            }
59,405✔
290
        }
59,388✔
291
    }
59,368✔
292
}
61,403✔
293

294
void TmThreadsSealThreads(void);
295
void TmThreadsUnsealThreads(void);
296
void TmThreadsListThreads(void);
297
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
298
void TmThreadsUnregisterThread(const int id);
299
void TmThreadsInjectFlowById(Flow *f, const int id);
300

301
void TmThreadsInitThreadsTimestamp(const SCTime_t ts);
302
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts);
303
void TmThreadsGetMinimalTimestamp(struct timeval *ts);
304
SCTime_t TmThreadsGetThreadTime(const int idx);
305
uint16_t TmThreadsGetWorkerThreadMax(void);
306
bool TmThreadsTimeSubsysIsReady(void);
307
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv);
308

309
/** \brief Wait for a thread to become unpaused.
310
 *
311
 * Check if a thread should wait to be unpaused and wait if so, or
312
 * until the thread kill flag is set.
313
 *
314
 * \returns true if the thread was unpaused, false if killed.
315
 */
316
bool TmThreadsWaitForUnpause(ThreadVars *tv);
317

318
#endif /* SURICATA_TM_THREADS_H */
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc