• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 22553697083

01 Mar 2026 09:58PM UTC coverage: 75.673% (+2.0%) from 73.687%
22553697083

Pull #14925

github

web-flow
Merge 288827f07 into 90823fa90
Pull Request #14925: hs: false positive coverity warning in a reference string v1

241615 of 319288 relevant lines covered (75.67%)

1333554.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.61
/src/tm-threads.h
1
/* Copyright (C) 2007-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23
 */
24

25
#ifndef SURICATA_TM_THREADS_H
26
#define SURICATA_TM_THREADS_H
27

28
#include "tmqh-packetpool.h"
29
#include "tm-threads-common.h"
30
#include "tm-modules.h"
31
#include "flow.h" // for the FlowQueue
32

33
#ifdef OS_WIN32
/**
 * \brief Sleep for approximately \a usec microseconds.
 *
 * The request is converted to whole milliseconds by integer division
 * before being handed to Sleep(). Requests of 1000 usec or less sleep
 * for 1 msec.
 *
 * \param usec requested sleep time in microseconds
 */
static inline void SleepUsec(uint64_t usec)
{
    const uint64_t msec = (usec > 1000) ? (usec / 1000) : 1;
    Sleep(msec);
}
#define SleepMsec(msec) Sleep((msec))
#else
/* both helpers map onto usleep(); the msec variant scales its argument
 * by 1000 first */
#define SleepUsec(usec) usleep((usec))
#define SleepMsec(msec) usleep((msec) * 1000)
#endif
47

48
#define TM_QUEUE_NAME_MAX 16
49
#define TM_THREAD_NAME_MAX 16
50

51
typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *);
52

53
typedef struct TmSlot_ {
54
    /* function pointers */
55
    union {
56
        TmSlotFunc SlotFunc;
57
        TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
58
        TmEcode (*Management)(ThreadVars *, void *);
59
    };
60
    /** linked list of slots, used when a pipeline has multiple slots
61
     *  in a single thread. */
62
    struct TmSlot_ *slot_next;
63

64
    SC_ATOMIC_DECLARE(void *, slot_data);
65

66
    /** copy of the TmModule::flags */
67
    uint8_t tm_flags;
68

69
    /* store the thread module id */
70
    int tm_id;
71

72
    TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
73
    void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
74
    TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);
75

76
    /* data storage */
77
    const void *slot_initdata;
78

79
} TmSlot;
80

81
extern ThreadVars *tv_root[TVT_MAX];
82

83
extern SCMutex tv_root_lock;
84

85
void TmSlotSetFuncAppend(ThreadVars *, TmModule *, const void *);
86

87
ThreadVars *TmThreadCreate(const char *, const char *, const char *, const char *, const char *, const char *,
88
                           void *(fn_p)(void *), int);
89
ThreadVars *TmThreadCreatePacketHandler(const char *, const char *, const char *, const char *, const char *,
90
                                        const char *);
91
ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int);
92
ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
93
                                     int mucond);
94
ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
95
                                     int mucond);
96
TmEcode TmThreadSpawn(ThreadVars *);
97
TmEcode TmThreadLibSpawn(ThreadVars *);
98
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s);
99
void TmThreadKillThreadsFamily(int family);
100
void TmThreadKillThreads(void);
101
void TmThreadClearThreadsFamily(int family);
102
void TmThreadAppend(ThreadVars *, int);
103

104
TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t);
105
TmEcode TmThreadSetThreadPriority(ThreadVars *, int);
106
TmEcode TmThreadSetCPU(ThreadVars *, uint8_t);
107
TmEcode TmThreadSetupOptions(ThreadVars *);
108
void TmThreadSetPrio(ThreadVars *);
109
int TmThreadGetNbThreads(uint8_t type);
110

111
void TmThreadInitMC(ThreadVars *);
112
void TmThreadContinue(ThreadVars *);
113
void TmThreadContinueThreads(void);
114
void TmThreadCheckThreadState(void);
115
TmEcode TmThreadWaitOnThreadInit(void);
116

117
int TmThreadsCheckFlag(ThreadVars *, uint32_t);
118
void TmThreadsSetFlag(ThreadVars *, uint32_t);
119
void TmThreadsUnsetFlag(ThreadVars *, uint32_t);
120
void TmThreadWaitForFlag(ThreadVars *, uint32_t);
121

122
TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
123

124
void TmThreadDisablePacketThreads(
125
        const uint16_t set, const uint16_t check, const uint8_t module_flags);
126
void TmThreadDisableReceiveThreads(void);
127

128
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
129

130
TmEcode TmThreadWaitOnThreadRunning(void);
131

132
TmEcode TmThreadsProcessDecodePseudoPackets(
133
        ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot);
134

135
/**
 * \brief Empty a lock-free packet queue.
 *
 * Packets are dequeued one by one until the queue yields NULL; each
 * one is handed to TmqhOutputPacketpool() with a NULL thread pointer.
 *
 * \param pq queue to drain
 */
static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
{
    Packet *p;
    while ((p = PacketDequeueNoLock(pq)) != NULL) {
        TmqhOutputPacketpool(NULL, p);
    }
}
×
144

145
/**
 * \brief Clean up after a slot failed while processing a packet.
 *
 * In order: hands \a p (when not NULL) to TmqhOutputPacketpool(),
 * drains the thread's decode_pq, releases the packets held in
 * stream_pq_local (when the thread has one) while holding that queue's
 * mutex, and finally sets THV_FAILED on the thread.
 *
 * \param tv thread in which the failure occurred
 * \param p  packet that was being processed, may be NULL
 */
static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, Packet *p)
{
    if (p != NULL)
        TmqhOutputPacketpool(tv, p);

    TmThreadsCleanDecodePQ(&tv->decode_pq);

    PacketQueue *stream_pq = tv->stream_pq_local;
    if (stream_pq != NULL) {
        SCMutexLock(&stream_pq->mutex_q);
        TmqhReleasePacketsToPacketPool(stream_pq);
        SCMutexUnlock(&stream_pq->mutex_q);
    }

    TmThreadsSetFlag(tv, THV_FAILED);
}
×
158

159
/**
160
 *  \brief Handle timeout from the capture layer. Checks
161
 *         stream_pq which may have been filled by the flow
162
 *         manager.
163
 *  \param s pipeline to run on these packets.
164
 */
165
static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
166
{
110,823,578✔
167
    PacketQueue *pq = tv->stream_pq_local;
110,823,578✔
168
    if (pq && pq->len > 0) {
113,348,901✔
169
        while (1) {
×
170
            SCMutexLock(&pq->mutex_q);
×
171
            Packet *extra_p = PacketDequeue(pq);
×
172
            SCMutexUnlock(&pq->mutex_q);
×
173
            if (extra_p == NULL)
×
174
                break;
×
175
#ifdef DEBUG_VALIDATION
176
            BUG_ON(extra_p->flow != NULL);
177
#endif
178
            TmEcode r = TmThreadsSlotVarRun(tv, extra_p, tv->tm_flowworker);
×
179
            if (r == TM_ECODE_FAILED) {
×
180
                TmThreadsSlotProcessPktFail(tv, extra_p);
×
181
                break;
×
182
            }
×
183
            tv->tmqh_out(tv, extra_p);
×
184
        }
×
185
        return true;
×
186
    } else {
110,823,578✔
187
        return false;
110,823,578✔
188
    }
110,823,578✔
189
}
110,823,578✔
190

191
/**
192
 *  \brief Process the rest of the functions (if any) and queue.
193
 */
194
static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
195
{
118,776,976✔
196
    if (s == NULL) {
118,776,976✔
197
        tv->tmqh_out(tv, p);
31,708✔
198
        return TM_ECODE_OK;
31,708✔
199
    }
31,708✔
200

201
    TmEcode r = TmThreadsSlotVarRun(tv, p, s);
118,745,268✔
202
    if (unlikely(r == TM_ECODE_FAILED)) {
118,745,268✔
203
        TmThreadsSlotProcessPktFail(tv, p);
×
204
        return TM_ECODE_FAILED;
×
205
    }
×
206

207
    tv->tmqh_out(tv, p);
118,745,268✔
208

209
    TmThreadsHandleInjectedPackets(tv);
118,745,268✔
210

211
    return TM_ECODE_OK;
118,745,268✔
212
}
118,745,268✔
213

214
/** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
215
 *  Allow caller to supply their own packet
216
 *
217
 *  Meant for detect reload process that interrupts an sleeping capture thread
218
 *  to force a packet through the engine to complete a reload */
219
static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
220
{
9✔
221
    TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
9✔
222
    if (p == NULL)
9✔
223
        p = PacketGetFromQueueOrAlloc();
9✔
224
    if (p != NULL) {
9✔
225
        p->flags |= PKT_PSEUDO_STREAM_END;
9✔
226
        PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
9✔
227
        if (TmThreadsSlotProcessPkt(tv, tv->tm_flowworker, p) != TM_ECODE_OK) {
9✔
228
            TmqhOutputPacketpool(tv, p);
×
229
        }
×
230
    }
9✔
231
}
9✔
232

233
/** \brief handle capture timeout
234
 *  When a capture method times out we check for house keeping
235
 *  tasks in the capture thread.
236
 *
237
 *  \param p packet. Capture method may have taken a packet from
238
 *           the pool prior to the timing out call. We will then
239
 *           use that packet. Otherwise we can get our own.
240
 */
241
static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
242
{
7,934✔
243
    if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
7,934✔
244
        TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
9✔
245
        return;
9✔
246

247
    } else {
7,925✔
248
        if (!TmThreadsHandleInjectedPackets(tv)) {
7,925✔
249
            /* see if we have to do some house keeping */
250
            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
7,925✔
251
                TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
×
252
                return;
×
253
            }
×
254
        }
7,925✔
255
    }
7,925✔
256

257
    /* packet could have been passed to us that we won't use
258
     * return it to the pool. */
259
    if (p != NULL)
7,925✔
260
        tv->tmqh_out(tv, p);
×
261
}
7,925✔
262

263
/**
 * \brief Request a packet injection on a receive thread, breaking its
 *        capture loop when the capture method supports that.
 *
 * Does nothing unless both the thread's tmm_flags and the module of
 * its first slot carry TM_FLAG_RECEIVE_TM. If the module provides both
 * PktAcqLoop and PktAcqBreakLoop, the latter is called with the slot's
 * data. In either case THV_CAPTURE_INJECT_PKT is set on the thread.
 *
 * \param tv thread to signal
 */
static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
{
    if (!(tv->tmm_flags & TM_FLAG_RECEIVE_TM))
        return;

    /* look up the module behind the thread's first slot */
    TmSlot *first = tv->tm_slots;
    TmModule *tm = TmModuleGetById(first->tm_id);
    if (!(tm->flags & TM_FLAG_RECEIVE_TM))
        return;

    /* if the method supports it, BreakLoop. Otherwise we rely on
     * the capture method's recv timeout */
    if (tm->PktAcqLoop != NULL && tm->PktAcqBreakLoop != NULL) {
        tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(first->slot_data));
    }
    TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
}
9✔
280

281
void TmThreadsSealThreads(void);
282
void TmThreadsUnsealThreads(void);
283
void TmThreadsListThreads(void);
284
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
285
void TmThreadsUnregisterThread(const int id);
286
void TmThreadsInjectFlowById(Flow *f, const int id);
287

288
void TmThreadsInitThreadsTimestamp(const SCTime_t ts);
289
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts);
290
void TmThreadsGetMinimalTimestamp(struct timeval *ts);
291
SCTime_t TmThreadsGetThreadTime(const int idx);
292
uint16_t TmThreadsGetWorkerThreadMax(void);
293
bool TmThreadsTimeSubsysIsReady(void);
294
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv);
295

296
/** \brief Wait for a thread to become unpaused.
297
 *
298
 * Check if a thread should wait to be unpaused and wait if so, or
299
 * until the thread kill flag is set.
300
 *
301
 * \returns true if the thread was unpaused, false if killed.
302
 */
303
bool TmThreadsWaitForUnpause(ThreadVars *tv);
304

305
#endif /* SURICATA_TM_THREADS_H */
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc