• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 23350122333

20 Mar 2026 03:33PM UTC coverage: 76.492% (-2.8%) from 79.315%
23350122333

Pull #15053

github

web-flow
Merge f5bf69f97 into 6587e363a
Pull Request #15053: Flow queue/v3

113 of 129 new or added lines in 9 files covered. (87.6%)

9534 existing lines in 453 files now uncovered.

256601 of 335461 relevant lines covered (76.49%)

4680806.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.19
/src/tm-threads.c
1
/* Copyright (C) 2007-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23
 * \author Eric Leblond <eric@regit.org>
24
 *
25
 * Thread management functions.
26
 */
27

28
#include "suricata-common.h"
29
#include "suricata.h"
30
#include "stream.h"
31
#include "runmodes.h"
32
#include "thread-callbacks.h"
33
#include "threadvars.h"
34
#include "thread-storage.h"
35
#include "tm-queues.h"
36
#include "tm-queuehandlers.h"
37
#include "tm-threads.h"
38
#include "capture-hooks.h"
39
#include "tmqh-packetpool.h"
40
#include "threads.h"
41
#include "util-affinity.h"
42
#include "util-debug.h"
43
#include "util-privs.h"
44
#include "util-cpu.h"
45
#include "util-optimize.h"
46
#include "util-profiling.h"
47
#include "util-signal.h"
48
#include "queue.h"
49
#include "util-validate.h"
50
#include "source-pcap-file-helper.h"
51

52
#ifdef PROFILE_LOCKING
53
thread_local uint64_t mutex_lock_contention;
54
thread_local uint64_t mutex_lock_wait_ticks;
55
thread_local uint64_t mutex_lock_cnt;
56

57
thread_local uint64_t spin_lock_contention;
58
thread_local uint64_t spin_lock_wait_ticks;
59
thread_local uint64_t spin_lock_cnt;
60

61
thread_local uint64_t rww_lock_contention;
62
thread_local uint64_t rww_lock_wait_ticks;
63
thread_local uint64_t rww_lock_cnt;
64

65
thread_local uint64_t rwr_lock_contention;
66
thread_local uint64_t rwr_lock_wait_ticks;
67
thread_local uint64_t rwr_lock_cnt;
68
#endif
69

70
#ifdef OS_FREEBSD
71
#include <sched.h>
72
#include <sys/param.h>
73
#include <sys/resource.h>
74
#include <sys/cpuset.h>
75
#include <sys/thr.h>
76
#define cpu_set_t cpuset_t
77
#endif /* OS_FREEBSD */
78

79
/* prototypes */
80
static int SetCPUAffinity(uint16_t cpu);
81
static void TmThreadDeinitMC(ThreadVars *tv);
82

83
/* root of the threadvars list */
84
ThreadVars *tv_root[TVT_MAX] = { NULL };
85

86
/* lock to protect tv_root */
87
SCMutex tv_root_lock = SCMUTEX_INITIALIZER;
88

89
/**
90
 * \brief Check if a thread flag is set.
91
 *
92
 * \retval 1 flag is set.
93
 * \retval 0 flag is not set.
94
 */
95
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
96
{
3,043,109,206✔
97
    return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
3,043,109,206✔
98
}
3,043,109,206✔
99

100
/**
101
 * \brief Set a thread flag.
102
 */
103
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
104
{
693,062✔
105
    SC_ATOMIC_OR(tv->flags, flag);
693,062✔
106
}
693,062✔
107

108
/**
109
 * \brief Unset a thread flag.
110
 */
111
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
112
{
58,866✔
113
    SC_ATOMIC_AND(tv->flags, ~flag);
58,866✔
114
}
58,866✔
115

116
TmEcode TmThreadsProcessDecodePseudoPackets(
117
        ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
118
{
120,942,966✔
119
    while (decode_pq->top != NULL) {
120,946,113✔
120
        Packet *extra_p = PacketDequeueNoLock(decode_pq);
3,147✔
121
        if (unlikely(extra_p == NULL))
3,147✔
122
            continue;
×
123
        DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
3,147✔
124

125
        if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
3,147✔
126
            SCReturnInt(TM_ECODE_FAILED);
×
127
        }
×
128
    }
3,147✔
129
    SCReturnInt(TM_ECODE_OK);
120,942,966✔
130
}
120,942,966✔
131

132
/**
133
 * \brief Separate run function so we can call it recursively.
134
 */
135
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
136
{
129,428,903✔
137
    for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
493,221,372✔
138
        PACKET_PROFILING_TMM_START(p, s->tm_id);
363,792,469✔
139
        TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
363,792,469✔
140
        PACKET_PROFILING_TMM_END(p, s->tm_id);
363,792,469✔
141
        DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
363,792,469✔
142

143
        /* handle error */
144
        if (unlikely(r == TM_ECODE_FAILED)) {
363,792,469✔
145
            /* Encountered error.  Return packets to packetpool and return */
146
            TmThreadsSlotProcessPktFail(tv, NULL);
×
147
            return TM_ECODE_FAILED;
×
148
        }
×
149
        if (s->tm_flags & TM_FLAG_DECODE_TM) {
363,792,469✔
150
            if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
121,342,308✔
151
                    TM_ECODE_OK) {
121,342,308✔
152
                return TM_ECODE_FAILED;
×
153
            }
×
154
        }
121,342,308✔
155
    }
363,792,469✔
156

157
    return TM_ECODE_OK;
129,428,903✔
158
}
129,428,903✔
159

160
/** \internal
161
 *
162
 *  \brief Process flow timeout packets
163
 *
164
 *  Process flow timeout pseudo packets. During shutdown this loop
165
 *  is run until the flow engine kills the thread and the queue is
166
 *  empty.
167
 */
168
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
169
{
16,977✔
170
    TmSlot *fw_slot = tv->tm_flowworker;
16,977✔
171
    int r = TM_ECODE_OK;
16,977✔
172

173
    if (tv->stream_pq == NULL || fw_slot == NULL) {
16,977✔
174
        SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
3,380✔
175
        return r;
3,380✔
176
    }
3,380✔
177

178
    SCLogDebug("flow end loop starting");
13,597✔
179
    while (1) {
2,117,539✔
180
        SCMutexLock(&tv->stream_pq->mutex_q);
2,117,539✔
181
        uint32_t len = tv->stream_pq->len;
2,117,539✔
182
        SCMutexUnlock(&tv->stream_pq->mutex_q);
2,117,539✔
183
        if (len > 0) {
2,117,539✔
184
            while (len--) {
2,816✔
185
                SCMutexLock(&tv->stream_pq->mutex_q);
1,408✔
186
                Packet *p = PacketDequeue(tv->stream_pq);
1,408✔
187
                SCMutexUnlock(&tv->stream_pq->mutex_q);
1,408✔
188
                if (likely(p)) {
1,408✔
189
                    DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
1,408✔
190
                    r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
1,408✔
191
                    if (r == TM_ECODE_FAILED) {
1,408✔
192
                        break;
×
193
                    }
×
194
                }
1,408✔
195
            }
1,408✔
196
        } else {
2,116,131✔
197
            if (TmThreadsCheckFlag(tv, THV_KILL)) {
2,116,131✔
198
                break;
13,597✔
199
            }
13,597✔
200
            SleepUsec(1);
2,102,534✔
201
        }
2,102,534✔
202
    }
2,117,539✔
203
    SCLogDebug("flow end loop complete");
13,597✔
204
    StatsSyncCounters(&tv->stats);
13,597✔
205

206
    return r;
13,597✔
207
}
16,977✔
208

209
static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
210
{
3,472✔
211
    TmSlot *s = tv->tm_slots;
3,472✔
212

213
    SCSetThreadName(tv->name);
3,472✔
214

215
    if (tv->thread_setup_flags != 0)
3,472✔
216
        TmThreadSetupOptions(tv);
24✔
217

218
    CaptureStatsSetup(tv);
3,472✔
219
    PacketPoolInit();
3,472✔
220

221
    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
10,564✔
222
        if (slot->SlotThreadInit != NULL) {
7,092✔
223
            void *slot_data = NULL;
7,038✔
224
            TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
7,038✔
225
            if (r != TM_ECODE_OK) {
7,038✔
226
                if (r == TM_ECODE_DONE) {
×
227
                    EngineDone();
×
228
                    TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
×
229
                    goto error;
×
230
                } else {
×
231
                    TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
232
                    goto error;
×
233
                }
×
234
            }
×
235
            (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
7,038✔
236
        }
7,038✔
237

238
        /* if the flowworker module is the first, get the threads input queue */
239
        if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
7,092✔
240
            tv->stream_pq = tv->inq->pq;
×
241
            tv->tm_flowworker = slot;
×
242
            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
×
243
            tv->flow_queue = FlowQueueNew();
×
244
            if (tv->flow_queue == NULL) {
×
245
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
246
                goto error;
×
247
            }
×
248
        /* setup a queue */
249
        } else if (slot->tm_id == TMM_FLOWWORKER) {
7,092✔
250
            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
93✔
251
            if (tv->stream_pq_local == NULL)
93✔
252
                FatalError("failed to alloc PacketQueue");
×
253
            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
93✔
254
            tv->stream_pq = tv->stream_pq_local;
93✔
255
            tv->tm_flowworker = slot;
93✔
256
            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
93✔
257
            tv->flow_queue = FlowQueueNew();
93✔
258
            if (tv->flow_queue == NULL) {
93✔
259
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
260
                goto error;
×
261
            }
×
262
        }
93✔
263
    }
7,092✔
264

265
    StatsSetupPrivate(&tv->stats, tv->printable_name ? tv->printable_name : tv->name);
3,472✔
266

267
    TmThreadsSetFlag(tv, THV_INIT_DONE);
3,472✔
268

269
    return true;
3,472✔
270

271
error:
×
272
    return false;
×
273
}
3,472✔
274

275
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
276
{
16,977✔
277
    TmSlot *s = tv->tm_slots;
16,977✔
278
    bool rc = true;
16,977✔
279

280
    StatsSyncCounters(&tv->stats);
16,977✔
281

282
    TmThreadsSetFlag(tv, THV_FLOW_LOOP);
16,977✔
283

284
    /* process all pseudo packets the flow timeout may throw at us */
285
    TmThreadTimeoutLoop(tv, s);
16,977✔
286

287
    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
16,977✔
288
    TmThreadWaitForFlag(tv, THV_DEINIT);
16,977✔
289

290
    PacketPoolDestroy();
16,977✔
291

292
    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
37,595✔
293
        if (slot->SlotThreadExitPrintStats != NULL) {
20,618✔
294
            slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
3,448✔
295
        }
3,448✔
296

297
        if (slot->SlotThreadDeinit != NULL) {
20,618✔
298
            TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
20,618✔
299
            if (r != TM_ECODE_OK) {
20,618✔
300
                TmThreadsSetFlag(tv, THV_CLOSED);
×
301
                rc = false;
×
302
                break;
×
303
            }
×
304
        }
20,618✔
305
    }
20,618✔
306

307
    tv->stream_pq = NULL;
16,977✔
308
    TmThreadsSetFlag(tv, THV_CLOSED);
16,977✔
309
    return rc;
16,977✔
310
}
16,977✔
311

312
static void *TmThreadsSlotPktAcqLoop(void *td)
313
{
3,472✔
314
    ThreadVars *tv = (ThreadVars *)td;
3,472✔
315
    TmSlot *s = tv->tm_slots;
3,472✔
316
    TmEcode r = TM_ECODE_OK;
3,472✔
317

318
    /* check if we are setup properly */
319
    if (s == NULL || s->PktAcqLoop == NULL || tv->TmqhInFn == NULL || tv->TmqhOutFn == NULL) {
3,472✔
320
        SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
×
NEW
321
                   " PktAcqLoop=%p, TmqhInFn=%p,"
×
NEW
322
                   " TmqhOutFn=%p",
×
NEW
323
                s, s ? s->PktAcqLoop : NULL, tv->TmqhInFn, tv->TmqhOutFn);
×
324
        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
325
        pthread_exit(NULL);
×
326
        return NULL;
×
327
    }
×
328

329
    if (!TmThreadsSlotPktAcqLoopInit(td)) {
3,472✔
330
        goto error;
×
331
    }
×
332

333
    bool run = TmThreadsWaitForUnpause(tv);
3,472✔
334

335
    while (run) {
6,944✔
336
        r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
3,472✔
337

338
        if (r == TM_ECODE_FAILED) {
3,472✔
339
            TmThreadsSetFlag(tv, THV_FAILED);
×
340
            run = false;
×
341
        }
×
342
        if (TmThreadsCheckFlag(tv, THV_REQ_FLOW_LOOP) || suricata_ctl_flags) {
3,472✔
343
            run = false;
2,076✔
344
        }
2,076✔
345
        if (r == TM_ECODE_DONE) {
3,472✔
346
            run = false;
3,374✔
347
        }
3,374✔
348
    }
3,472✔
349
    if (!SCTmThreadsSlotPacketLoopFinish(tv)) {
3,472✔
350
        goto error;
×
351
    }
×
352

353
    SCLogDebug("%s ending", tv->name);
3,472✔
354
    pthread_exit((void *) 0);
3,472✔
355
    return NULL;
×
356

357
error:
×
358
    pthread_exit(NULL);
×
359
    return NULL;
×
360
}
3,472✔
361

362
/**
363
 * Also returns if the kill flag is set.
364
 */
365
bool TmThreadsWaitForUnpause(ThreadVars *tv)
366
{
29,716✔
367
    if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
29,716✔
368
        TmThreadsSetFlag(tv, THV_PAUSED);
27,739✔
369

370
        while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1,539,228✔
371
            SleepUsec(100);
1,511,489✔
372

373
            if (TmThreadsCheckFlag(tv, THV_KILL))
1,511,489✔
UNCOV
374
                return false;
×
375
        }
1,511,489✔
376

377
        TmThreadsUnsetFlag(tv, THV_PAUSED);
27,739✔
378
    }
27,739✔
379

380
    return true;
29,716✔
381
}
29,716✔
382

383
static void *TmThreadsLib(void *td)
384
{
×
385
    ThreadVars *tv = (ThreadVars *)td;
×
386
    TmSlot *s = tv->tm_slots;
×
387

388
    /* check if we are setup properly */
NEW
389
    if (s == NULL || tv->TmqhInFn == NULL || tv->TmqhOutFn == NULL) {
×
NEW
390
        SCLogError("TmSlot or ThreadVars badly setup: s=%p, TmqhInFn=%p,"
×
NEW
391
                   " TmqhOutFn=%p",
×
NEW
392
                s, tv->TmqhInFn, tv->TmqhOutFn);
×
393
        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
394
        return NULL;
×
395
    }
×
396

397
    if (!TmThreadsSlotPktAcqLoopInit(tv)) {
×
398
        goto error;
×
399
    }
×
400

401
    if (!TmThreadsWaitForUnpause(tv)) {
×
402
        goto error;
×
403
    }
×
404

405
    return NULL;
×
406

407
error:
×
408
    tv->stream_pq = NULL;
×
409
    return (void *)-1;
×
410
}
×
411

412
static void *TmThreadsSlotVar(void *td)
413
{
13,505✔
414
    ThreadVars *tv = (ThreadVars *)td;
13,505✔
415
    TmSlot *s = (TmSlot *)tv->tm_slots;
13,505✔
416
    Packet *p = NULL;
13,505✔
417
    TmEcode r = TM_ECODE_OK;
13,505✔
418

419
    CaptureStatsSetup(tv);
13,505✔
420
    PacketPoolInit();//Empty();
13,505✔
421

422
    SCSetThreadName(tv->name);
13,505✔
423

424
    if (tv->thread_setup_flags != 0)
13,505✔
425
        TmThreadSetupOptions(tv);
×
426

427
    /* Drop the capabilities for this thread */
428
    SCDropCaps(tv);
13,505✔
429

430
    /* check if we are setup properly */
431
    if (s == NULL || tv->TmqhInFn == NULL || tv->TmqhOutFn == NULL) {
13,505✔
432
        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
433
        pthread_exit(NULL);
×
434
        return NULL;
×
435
    }
×
436

437
    for (; s != NULL; s = s->slot_next) {
27,031✔
438
        if (s->SlotThreadInit != NULL) {
13,526✔
439
            void *slot_data = NULL;
13,505✔
440
            r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
13,505✔
441
            if (r != TM_ECODE_OK) {
13,505✔
442
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
443
                goto error;
×
444
            }
×
445
            (void)SC_ATOMIC_SET(s->slot_data, slot_data);
13,505✔
446
        }
13,505✔
447

448
        /* special case: we need to access the stream queue
449
         * from the flow timeout code */
450

451
        /* if the flowworker module is the first, get the threads input queue */
452
        if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
13,526✔
453
            tv->stream_pq = tv->inq->pq;
13,504✔
454
            tv->tm_flowworker = s;
13,504✔
455
            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
13,504✔
456
            tv->flow_queue = FlowQueueNew();
13,504✔
457
            if (tv->flow_queue == NULL) {
13,504✔
458
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
459
                pthread_exit(NULL);
×
460
                return NULL;
×
461
            }
×
462
        /* setup a queue */
463
        } else if (s->tm_id == TMM_FLOWWORKER) {
13,504✔
464
            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
×
465
            if (tv->stream_pq_local == NULL)
×
466
                FatalError("failed to alloc PacketQueue");
×
467
            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
×
468
            tv->stream_pq = tv->stream_pq_local;
×
469
            tv->tm_flowworker = s;
×
470
            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
×
471
            tv->flow_queue = FlowQueueNew();
×
472
            if (tv->flow_queue == NULL) {
×
473
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
474
                pthread_exit(NULL);
×
475
                return NULL;
×
476
            }
×
477
        }
×
478
    }
13,526✔
479

480
    StatsSetupPrivate(&tv->stats, tv->printable_name ? tv->printable_name : tv->name);
13,505✔
481

482
    // Each 'worker' thread uses this func to process/decode the packet read.
483
    // Each decode method is different to receive methods in that they do not
484
    // enter infinite loops. They use this as the core loop. As a result, at this
485
    // point the worker threads can be considered both initialized and running.
486
    TmThreadsSetFlag(tv, THV_INIT_DONE | THV_RUNNING);
13,505✔
487
    bool run = TmThreadsWaitForUnpause(tv);
13,505✔
488

489
    s = (TmSlot *)tv->tm_slots;
13,505✔
490

491
    while (run) {
77,229✔
492
        /* get packet(s) from the queue. Can get more than one so should loop
493
         * over it. */
494
        PacketQueueNoLock q = tv->TmqhInFn(tv);
63,724✔
495
        do {
2,724,851✔
496
            p = PacketDequeueNoLock(&q);
2,724,851✔
497

498
            /* if we didn't get a packet see if we need to do some housekeeping */
499
            if (unlikely(p == NULL)) {
2,724,851✔
500
                if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
13,893✔
501
                    p = PacketGetFromQueueOrAlloc();
5✔
502
                    if (p != NULL) {
5✔
503
                        p->flags |= PKT_PSEUDO_STREAM_END;
5✔
504
                        PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
5✔
505
                    }
5✔
506
                }
5✔
507
            }
13,893✔
508

509
            if (p != NULL) {
2,724,851✔
510
                /* run the thread module(s) */
511
                r = TmThreadsSlotVarRun(tv, p, s);
2,710,306✔
512
                if (r == TM_ECODE_FAILED) {
2,710,306✔
NEW
513
                    TmqhOutputPacketpool(tv, p);
×
NEW
514
                    TmThreadsSetFlag(tv, THV_FAILED);
×
NEW
515
                    break;
×
NEW
516
                }
×
517

518
                /* output the packet */
519
                tv->TmqhOutFn(tv, p);
2,710,306✔
520

521
                /* now handle the stream pq packets */
522
                TmThreadsHandleInjectedPackets(tv);
2,710,306✔
523
            }
2,710,306✔
524

525
            if (TmThreadsCheckFlag(tv, (THV_KILL | THV_REQ_FLOW_LOOP))) {
2,724,851✔
526
                run = false;
166,604✔
527
            }
166,604✔
528
        } while (q.len != 0);
2,724,851✔
529
    }
63,724✔
530
    if (!SCTmThreadsSlotPacketLoopFinish(tv)) {
13,505✔
531
        goto error;
×
532
    }
×
533
    StatsSyncCounters(&tv->stats);
13,505✔
534

535
    pthread_exit(NULL);
13,505✔
536
    return NULL;
×
537

538
error:
×
539
    tv->stream_pq = NULL;
×
540
    pthread_exit(NULL);
×
541
    return NULL;
×
542
}
13,505✔
543

544
static void *TmThreadsManagement(void *td)
545
{
6,912✔
546
    ThreadVars *tv = (ThreadVars *)td;
6,912✔
547
    TmSlot *s = (TmSlot *)tv->tm_slots;
6,912✔
548
    TmEcode r = TM_ECODE_OK;
6,912✔
549

550
    BUG_ON(s == NULL);
6,912✔
551

552
    SCSetThreadName(tv->name);
6,912✔
553

554
    if (tv->thread_setup_flags != 0)
6,912✔
555
        TmThreadSetupOptions(tv);
50✔
556

557
    /* Drop the capabilities for this thread */
558
    SCDropCaps(tv);
6,912✔
559

560
    SCLogDebug("%s starting", tv->name);
6,912✔
561

562
    if (s->SlotThreadInit != NULL) {
6,912✔
563
        void *slot_data = NULL;
6,912✔
564
        r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
6,912✔
565
        if (r != TM_ECODE_OK) {
6,912✔
566
            TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
567
            pthread_exit(NULL);
×
568
            return NULL;
×
569
        }
×
570
        (void)SC_ATOMIC_SET(s->slot_data, slot_data);
6,912✔
571
    }
6,912✔
572

573
    StatsSetupPrivate(&tv->stats, tv->printable_name ? tv->printable_name : tv->name);
6,912✔
574

575
    TmThreadsSetFlag(tv, THV_INIT_DONE);
6,912✔
576

577
    r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
6,912✔
578
    /* handle error */
579
    if (r == TM_ECODE_FAILED) {
6,912✔
580
        TmThreadsSetFlag(tv, THV_FAILED);
×
581
    }
×
582

583
    if (TmThreadsCheckFlag(tv, THV_KILL)) {
6,912✔
584
        StatsSyncCounters(&tv->stats);
6,872✔
585
    }
6,872✔
586

587
    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
6,912✔
588
    TmThreadWaitForFlag(tv, THV_DEINIT);
6,912✔
589

590
    if (s->SlotThreadExitPrintStats != NULL) {
6,912✔
591
        s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
×
592
    }
×
593

594
    if (s->SlotThreadDeinit != NULL) {
6,912✔
595
        r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
6,899✔
596
        if (r != TM_ECODE_OK) {
6,899✔
597
            TmThreadsSetFlag(tv, THV_CLOSED);
×
598
            pthread_exit(NULL);
×
599
            return NULL;
×
600
        }
×
601
    }
6,899✔
602

603
    TmThreadsSetFlag(tv, THV_CLOSED);
6,912✔
604
    pthread_exit((void *) 0);
6,912✔
605
    return NULL;
×
606
}
6,912✔
607

608
/**
609
 * \brief We set the slot functions.
610
 *
611
 * \param tv   Pointer to the TV to set the slot function for.
612
 * \param name Name of the slot variant.
613
 * \param fn_p Pointer to a custom slot function.  Used only if slot variant
614
 *             "name" is "custom".
615
 *
616
 * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
617
 */
618
static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
619
{
29,781✔
620
    if (name == NULL) {
29,781✔
621
        if (fn_p == NULL) {
×
622
            printf("Both slot name and function pointer can't be NULL inside "
×
623
                   "TmThreadSetSlots\n");
×
624
            goto error;
×
625
        } else {
×
626
            name = "custom";
×
627
        }
×
628
    }
×
629

630
    if (strcmp(name, "varslot") == 0) {
29,781✔
631
        tv->tm_func = TmThreadsSlotVar;
13,505✔
632
    } else if (strcmp(name, "pktacqloop") == 0) {
16,276✔
633
        tv->tm_func = TmThreadsSlotPktAcqLoop;
3,472✔
634
    } else if (strcmp(name, "management") == 0) {
12,804✔
635
        tv->tm_func = TmThreadsManagement;
6,862✔
636
    } else if (strcmp(name, "command") == 0) {
6,899✔
637
        tv->tm_func = TmThreadsManagement;
50✔
638
    } else if (strcmp(name, "lib") == 0) {
5,892✔
639
        tv->tm_func = TmThreadsLib;
×
640
    } else if (strcmp(name, "custom") == 0) {
5,892✔
641
        if (fn_p == NULL)
5,892✔
642
            goto error;
×
643
        tv->tm_func = fn_p;
5,892✔
644
    } else {
5,892✔
645
        printf("Error: Slot \"%s\" not supported\n", name);
×
646
        goto error;
×
647
    }
×
648

649
    return TM_ECODE_OK;
29,781✔
650

651
error:
×
652
    return TM_ECODE_FAILED;
×
653
}
29,781✔
654

655
/**
656
 * \brief Appends a new entry to the slots.
657
 *
658
 * \param tv   TV the slot is attached to.
659
 * \param tm   TM to append.
660
 * \param data Data to be passed on to the slot init function.
661
 *
662
 * \retval The allocated TmSlot or NULL if there is an error
663
 */
664
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
665
{
27,530✔
666
    TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
27,530✔
667
    if (unlikely(slot == NULL))
27,530✔
668
        return;
×
669
    SC_ATOMIC_INITPTR(slot->slot_data);
27,530✔
670
    slot->SlotThreadInit = tm->ThreadInit;
27,530✔
671
    slot->slot_initdata = data;
27,530✔
672
    if (tm->Func) {
27,530✔
673
        slot->SlotFunc = tm->Func;
17,146✔
674
    } else if (tm->PktAcqLoop) {
17,146✔
675
        slot->PktAcqLoop = tm->PktAcqLoop;
3,472✔
676
        if (tm->PktAcqBreakLoop) {
3,472✔
677
            tv->break_loop = true;
3✔
678
        }
3✔
679
    } else if (tm->Management) {
6,920✔
680
        slot->Management = tm->Management;
6,912✔
681
    }
6,912✔
682
    slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
27,530✔
683
    slot->SlotThreadDeinit = tm->ThreadDeinit;
27,530✔
684
    /* we don't have to check for the return value "-1".  We wouldn't have
685
     * received a TM as arg, if it didn't exist */
686
    slot->tm_id = TmModuleGetIDForTM(tm);
27,530✔
687
    slot->tm_flags |= tm->flags;
27,530✔
688

689
    tv->tmm_flags |= tm->flags;
27,530✔
690
    tv->cap_flags |= tm->cap_flags;
27,530✔
691

692
    if (tv->tm_slots == NULL) {
27,530✔
693
        tv->tm_slots = slot;
23,889✔
694
    } else {
23,910✔
695
        TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
3,641✔
696

697
        /* get the last slot */
698
        for ( ; a != NULL; a = a->slot_next) {
7,486✔
699
             b = a;
3,845✔
700
        }
3,845✔
701
        /* append the new slot */
702
        if (b != NULL) {
3,641✔
703
            b->slot_next = slot;
3,641✔
704
        }
3,641✔
705
    }
3,641✔
706
}
27,530✔
707

708
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
709
static int SetCPUAffinitySet(cpu_set_t *cs)
710
{
104✔
711
#if defined OS_FREEBSD
712
    int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
713
                               SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
714
#elif OS_DARWIN
715
    int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
716
                              (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
717
#else
718
    pid_t tid = (pid_t)syscall(SYS_gettid);
104✔
719
    int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
104✔
720
#endif /* OS_FREEBSD */
104✔
721

722
    if (r != 0) {
104✔
723
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
×
724
               strerror(errno));
×
725
        return -1;
×
726
    }
×
727

728
    return 0;
104✔
729
}
104✔
730
#endif
731

732

733
/**
734
 * \brief Set the thread affinity on the calling thread.
735
 *
736
 * \param cpuid Id of the core/cpu to setup the affinity.
737
 *
738
 * \retval 0 If all goes well; -1 if something is wrong.
739
 */
740
static int SetCPUAffinity(uint16_t cpuid)
741
{
24✔
742
#if defined __OpenBSD__ || defined sun
743
    return 0;
744
#else
745
    int cpu = (int)cpuid;
24✔
746

747
#if defined OS_WIN32 || defined __CYGWIN__
748
    DWORD cs = 1 << cpu;
749

750
    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
751
    if (r != 0) {
752
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
753
               strerror(errno));
754
        return -1;
755
    }
756
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
757
               SCGetThreadIdLong(), cpu);
758

759
    return 0;
760

761
#else
762
    cpu_set_t cs;
24✔
763
    memset(&cs, 0, sizeof(cs));
24✔
764

765
    CPU_ZERO(&cs);
24✔
766
    CPU_SET(cpu, &cs);
24✔
767
    return SetCPUAffinitySet(&cs);
24✔
768
#endif /* windows */
24✔
769
#endif /* not supported */
24✔
770
}
24✔
771

772

773
/**
774
 * \brief Set the thread options (thread priority).
775
 *
776
 * \param tv Pointer to the ThreadVars to setup the thread priority.
777
 *
778
 * \retval TM_ECODE_OK.
779
 */
780
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
781
{
×
782
    tv->thread_setup_flags |= THREAD_SET_PRIORITY;
×
783
    tv->thread_priority = prio;
×
784

785
    return TM_ECODE_OK;
×
786
}
×
787

788
/**
789
 * \brief Adjusting nice value for threads.
790
 */
791
void TmThreadSetPrio(ThreadVars *tv)
792
{
104✔
793
    SCEnter();
104✔
794
#ifndef __CYGWIN__
104✔
795
#ifdef OS_WIN32
796
        if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
797
            SCLogError("Error setting priority for "
798
                       "thread %s: %s",
799
                    tv->name, strerror(errno));
800
    } else {
801
        SCLogDebug("Priority set to %"PRId32" for thread %s",
802
                   tv->thread_priority, tv->name);
803
    }
804
#else
805
    int ret = nice(tv->thread_priority);
104✔
806
    if (ret == -1) {
104✔
807
        SCLogError("Error setting nice value %d "
×
808
                   "for thread %s: %s",
×
809
                tv->thread_priority, tv->name, strerror(errno));
×
810
    } else {
104✔
811
        SCLogDebug("Nice value set to %"PRId32" for thread %s",
104✔
812
                   tv->thread_priority, tv->name);
104✔
813
    }
104✔
814
#endif /* OS_WIN32 */
104✔
815
#endif
104✔
816
    SCReturn;
104✔
817
}
104✔
818

819

820
/**
821
 * \brief Set the thread options (cpu affinity).
822
 *
823
 * \param tv pointer to the ThreadVars to setup the affinity.
824
 * \param cpu cpu on which affinity is set.
825
 *
826
 * \retval TM_ECODE_OK
827
 */
828
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
829
{
×
830
    tv->thread_setup_flags |= THREAD_SET_AFFINITY;
×
831
    tv->cpu_affinity = cpu;
×
832

833
    return TM_ECODE_OK;
×
834
}
×
835

836

837
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
838
{
29,781✔
839
    if (!threading_set_cpu_affinity)
29,781✔
840
        return TM_ECODE_OK;
29,677✔
841

842
    if (type > MAX_CPU_SET) {
104✔
843
        SCLogError("invalid cpu type family");
×
844
        return TM_ECODE_FAILED;
×
845
    }
×
846

847
    tv->thread_setup_flags |= THREAD_SET_AFFTYPE;
104✔
848
    tv->cpu_affinity = type;
104✔
849

850
    return TM_ECODE_OK;
104✔
851
}
104✔
852

853
int TmThreadGetNbThreads(uint8_t type)
854
{
3,397✔
855
    if (type >= MAX_CPU_SET) {
3,397✔
856
        SCLogError("invalid cpu type family");
×
857
        return 0;
×
858
    }
×
859

860
    return thread_affinity[type].nb_threads;
3,397✔
861
}
3,397✔
862

863
/**
864
 * \brief Set the thread options (cpu affinitythread).
865
 *        Priority should be already set by pthread_create.
866
 *
867
 * \param tv pointer to the ThreadVars of the calling thread.
868
 */
869
TmEcode TmThreadSetupOptions(ThreadVars *tv)
870
{
104✔
871
    if (tv->thread_setup_flags & THREAD_SET_AFFINITY) {
104✔
872
        SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
×
873
                  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
×
874
                  SCGetThreadIdLong());
×
875
        SetCPUAffinity(tv->cpu_affinity);
×
876
    }
×
877

878
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
104✔
879
    if (tv->thread_setup_flags & THREAD_SET_PRIORITY)
104✔
880
        TmThreadSetPrio(tv);
×
881
    if (tv->thread_setup_flags & THREAD_SET_AFFTYPE) {
104✔
882
        ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity];
104✔
883
        bool use_iface_affinity = RunmodeIsAutofp() && tv->cpu_affinity == RECEIVE_CPU_SET &&
104✔
884
                                  FindAffinityByInterface(taf, tv->iface_name) != NULL;
104✔
885
        use_iface_affinity |= RunmodeIsWorkers() && tv->cpu_affinity == WORKER_CPU_SET &&
104✔
886
                              FindAffinityByInterface(taf, tv->iface_name) != NULL;
104✔
887

888
        if (use_iface_affinity) {
104✔
889
            taf = FindAffinityByInterface(taf, tv->iface_name);
×
890
        }
×
891

892
        if (UtilAffinityGetAffinedCPUNum(taf) == 0) {
104✔
893
            if (!taf->nocpu_warned) {
×
894
                SCLogWarning("No CPU affinity set for %s", AffinityGetYamlPath(taf));
×
895
                taf->nocpu_warned = true;
×
896
            }
×
897
        }
×
898

899
        if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
104✔
900
            uint16_t cpu = AffinityGetNextCPU(tv, taf);
24✔
901
            SetCPUAffinity(cpu);
24✔
902
            /* If CPU is in a set overwrite the default thread prio */
903
            if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
24✔
904
                tv->thread_priority = PRIO_LOW;
×
905
            } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
24✔
906
                tv->thread_priority = PRIO_MEDIUM;
×
907
            } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
24✔
908
                tv->thread_priority = PRIO_HIGH;
×
909
            } else {
24✔
910
                tv->thread_priority = taf->prio;
24✔
911
            }
24✔
912
            SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
24✔
913
                      "%d, thread id %lu", tv->thread_priority,
24✔
914
                      tv->name, cpu, SCGetThreadIdLong());
24✔
915
        } else {
80✔
916
            SetCPUAffinitySet(&taf->cpu_set);
80✔
917
            tv->thread_priority = taf->prio;
80✔
918
            SCLogPerf("Setting prio %d for thread \"%s\", "
80✔
919
                      "thread id %lu", tv->thread_priority,
80✔
920
                      tv->name, SCGetThreadIdLong());
80✔
921
        }
80✔
922
        TmThreadSetPrio(tv);
104✔
923
    }
104✔
924
#endif
104✔
925

926
    return TM_ECODE_OK;
104✔
927
}
104✔
928

929
/**
930
 * \brief Creates and returns the TV instance for a new thread.
931
 *
932
 * \param name       Name of this TV instance
933
 * \param inq_name   Incoming queue name
934
 * \param inqh_name  Incoming queue handler name as set by TmqhSetup()
935
 * \param outq_name  Outgoing queue name
936
 * \param outqh_name Outgoing queue handler as set by TmqhSetup()
937
 * \param slots      String representation for the slot function to be used
938
 * \param fn_p       Pointer to function when \"slots\" is of type \"custom\"
939
 * \param mucond     Flag to indicate whether to initialize the condition
940
 *                   and the mutex variables for this newly created TV.
941
 *
942
 * \retval the newly created TV instance, or NULL on error
943
 */
944
ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
945
                           const char *outq_name, const char *outqh_name, const char *slots,
946
                           void * (*fn_p)(void *), int mucond)
947
{
29,781✔
948
    ThreadVars *tv = NULL;
29,781✔
949
    Tmq *tmq = NULL;
29,781✔
950
    Tmqh *tmqh = NULL;
29,781✔
951

952
    SCLogDebug("creating thread \"%s\"...", name);
29,781✔
953

954
    /* XXX create separate function for this: allocate a thread container */
955
    tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
29,781✔
956
    if (unlikely(tv == NULL))
29,781✔
957
        goto error;
×
958

959
    SC_ATOMIC_INIT(tv->flags);
29,781✔
960
    StatsThreadInit(&tv->stats);
29,781✔
961

962
    strlcpy(tv->name, name, sizeof(tv->name));
29,781✔
963

964
    /* default state for every newly created thread */
965
    TmThreadsSetFlag(tv, THV_PAUSE);
29,781✔
966

967
    /* set the incoming queue */
968
    if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
29,781✔
969
        SCLogDebug("inq_name \"%s\"", inq_name);
13,505✔
970

971
        tmq = TmqGetQueueByName(inq_name);
13,505✔
972
        if (tmq == NULL) {
13,505✔
973
            tmq = TmqCreateQueue(inq_name);
×
974
            if (tmq == NULL)
×
975
                goto error;
×
976
        }
×
977
        SCLogDebug("tmq %p", tmq);
13,505✔
978

979
        tv->inq = tmq;
13,505✔
980
        tv->inq->reader_cnt++;
13,505✔
981
        SCLogDebug("tv->inq %p", tv->inq);
13,505✔
982
    }
13,505✔
983
    if (inqh_name != NULL) {
29,781✔
984
        SCLogDebug("inqh_name \"%s\"", inqh_name);
16,977✔
985

986
        int id = TmqhNameToID(inqh_name);
16,977✔
987
        if (id <= 0) {
16,977✔
988
            goto error;
×
989
        }
×
990
        tmqh = TmqhGetQueueHandlerByName(inqh_name);
16,977✔
991
        if (tmqh == NULL)
16,977✔
992
            goto error;
×
993

994
        tv->TmqhInFn = tmqh->InHandler;
16,977✔
995
        tv->inq_id = (uint8_t)id;
16,977✔
996
        SCLogDebug("tv->TmqhInFn %p", tv->TmqhInFn);
16,977✔
997
    }
16,977✔
998

999
    /* set the outgoing queue */
1000
    if (outqh_name != NULL) {
29,781✔
1001
        SCLogDebug("outqh_name \"%s\"", outqh_name);
16,977✔
1002

1003
        int id = TmqhNameToID(outqh_name);
16,977✔
1004
        if (id <= 0) {
16,977✔
1005
            goto error;
×
1006
        }
×
1007

1008
        tmqh = TmqhGetQueueHandlerByName(outqh_name);
16,977✔
1009
        if (tmqh == NULL)
16,977✔
1010
            goto error;
×
1011

1012
        tv->TmqhOutFn = tmqh->OutHandler;
16,977✔
1013
        tv->outq_id = (uint8_t)id;
16,977✔
1014

1015
        if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
16,977✔
1016
            SCLogDebug("outq_name \"%s\"", outq_name);
3,383✔
1017

1018
            if (tmqh->OutHandlerCtxSetup != NULL) {
3,383✔
1019
                tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
3,379✔
1020
                if (tv->outctx == NULL)
3,379✔
1021
                    goto error;
×
1022
                tv->outq = NULL;
3,379✔
1023
            } else {
3,379✔
1024
                tmq = TmqGetQueueByName(outq_name);
4✔
1025
                if (tmq == NULL) {
4✔
1026
                    tmq = TmqCreateQueue(outq_name);
1✔
1027
                    if (tmq == NULL)
1✔
1028
                        goto error;
×
1029
                }
1✔
1030
                SCLogDebug("tmq %p", tmq);
4✔
1031

1032
                tv->outq = tmq;
4✔
1033
                tv->outctx = NULL;
4✔
1034
                tv->outq->writer_cnt++;
4✔
1035
            }
4✔
1036
        }
3,383✔
1037
    }
16,977✔
1038

1039
    if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
29,781✔
1040
        goto error;
×
1041
    }
×
1042

1043
    if (mucond != 0)
29,781✔
1044
        TmThreadInitMC(tv);
5,907✔
1045

1046
    SCThreadRunInitCallbacks(tv);
29,781✔
1047

1048
    return tv;
29,781✔
1049

1050
error:
×
1051
    SCLogError("failed to setup a thread");
×
1052

1053
    if (tv != NULL)
×
1054
        SCFree(tv);
×
1055
    return NULL;
×
1056
}
29,781✔
1057

1058
/**
1059
 * \brief Creates and returns a TV instance for a Packet Processing Thread.
1060
 *        This function doesn't support custom slots, and hence shouldn't be
1061
 *        supplied \"custom\" as its slot type.  All PPT threads are created
1062
 *        with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1063
 *        conditional variables are not used to kill the thread.
1064
 *
1065
 * \param name       Name of this TV instance
1066
 * \param inq_name   Incoming queue name
1067
 * \param inqh_name  Incoming queue handler name as set by TmqhSetup()
1068
 * \param outq_name  Outgoing queue name
1069
 * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1070
 * \param slots      String representation for the slot function to be used
1071
 *
1072
 * \retval the newly created TV instance, or NULL on error
1073
 */
1074
ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1075
                                        const char *inqh_name, const char *outq_name,
1076
                                        const char *outqh_name, const char *slots)
1077
{
16,977✔
1078
    ThreadVars *tv = NULL;
16,977✔
1079

1080
    tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
16,977✔
1081
                        slots, NULL, 0);
16,977✔
1082

1083
    if (tv != NULL) {
16,977✔
1084
        tv->type = TVT_PPT;
16,977✔
1085
        tv->id = TmThreadsRegisterThread(tv, tv->type);
16,977✔
1086
    }
16,977✔
1087

1088
    return tv;
16,977✔
1089
}
16,977✔
1090

1091
/**
1092
 * \brief Creates and returns the TV instance for a Management thread(MGMT).
1093
 *        This function supports only custom slot functions and hence a
1094
 *        function pointer should be sent as an argument.
1095
 *
1096
 * \param name       Name of this TV instance
1097
 * \param fn_p       Pointer to function when \"slots\" is of type \"custom\"
1098
 * \param mucond     Flag to indicate whether to initialize the condition
1099
 *                   and the mutex variables for this newly created TV.
1100
 *
1101
 * \retval the newly created TV instance, or NULL on error
1102
 */
1103
ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1104
                                     int mucond)
1105
{
5,892✔
1106
    ThreadVars *tv = NULL;
5,892✔
1107

1108
    tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
5,892✔
1109

1110
    if (tv != NULL) {
5,892✔
1111
        tv->type = TVT_MGMT;
5,892✔
1112
        tv->id = TmThreadsRegisterThread(tv, tv->type);
5,892✔
1113
        TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
5,892✔
1114
    }
5,892✔
1115

1116
    return tv;
5,892✔
1117
}
5,892✔
1118

1119
/**
1120
 * \brief Creates and returns the TV instance for a Management thread(MGMT).
1121
 *        This function supports only custom slot functions and hence a
1122
 *        function pointer should be sent as an argument.
1123
 *
1124
 * \param name       Name of this TV instance
1125
 * \param module     Name of TmModule with MANAGEMENT flag set.
1126
 * \param mucond     Flag to indicate whether to initialize the condition
1127
 *                   and the mutex variables for this newly created TV.
1128
 *
1129
 * \retval the newly created TV instance, or NULL on error
1130
 */
1131
ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1132
                                     int mucond)
1133
{
6,862✔
1134
    ThreadVars *tv = NULL;
6,862✔
1135

1136
    tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
6,862✔
1137

1138
    if (tv != NULL) {
6,862✔
1139
        tv->type = TVT_MGMT;
6,862✔
1140
        tv->id = TmThreadsRegisterThread(tv, tv->type);
6,862✔
1141
        TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
6,862✔
1142

1143
        TmModule *m = TmModuleGetByName(module);
6,862✔
1144
        if (m) {
6,862✔
1145
            TmSlotSetFuncAppend(tv, m, NULL);
6,862✔
1146
        }
6,862✔
1147
    }
6,862✔
1148

1149
    return tv;
6,862✔
1150
}
6,862✔
1151

1152
/**
1153
 * \brief Creates and returns the TV instance for a Command thread (CMD).
1154
 *        This function supports only custom slot functions and hence a
1155
 *        function pointer should be sent as an argument.
1156
 *
1157
 * \param name       Name of this TV instance
1158
 * \param module     Name of TmModule with COMMAND flag set.
1159
 * \param mucond     Flag to indicate whether to initialize the condition
1160
 *                   and the mutex variables for this newly created TV.
1161
 *
1162
 * \retval the newly created TV instance, or NULL on error
1163
 */
1164
ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1165
                                     int mucond)
1166
{
50✔
1167
    ThreadVars *tv = NULL;
50✔
1168

1169
    tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
50✔
1170

1171
    if (tv != NULL) {
50✔
1172
        tv->type = TVT_CMD;
50✔
1173
        tv->id = TmThreadsRegisterThread(tv, tv->type);
50✔
1174
        TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
50✔
1175

1176
        TmModule *m = TmModuleGetByName(module);
50✔
1177
        if (m) {
50✔
1178
            TmSlotSetFuncAppend(tv, m, NULL);
50✔
1179
        }
50✔
1180
    }
50✔
1181

1182
    return tv;
50✔
1183
}
50✔
1184

1185
/**
1186
 * \brief Appends this TV to tv_root based on its type
1187
 *
1188
 * \param type holds the type this TV belongs to.
1189
 */
1190
void TmThreadAppend(ThreadVars *tv, int type)
1191
{
29,781✔
1192
    SCMutexLock(&tv_root_lock);
29,781✔
1193

1194
    if (tv_root[type] == NULL) {
29,781✔
1195
        tv_root[type] = tv;
6,900✔
1196
        tv->next = NULL;
6,900✔
1197

1198
        SCMutexUnlock(&tv_root_lock);
6,900✔
1199

1200
        return;
6,900✔
1201
    }
6,900✔
1202

1203
    ThreadVars *t = tv_root[type];
22,881✔
1204

1205
    while (t) {
52,176✔
1206
        if (t->next == NULL) {
52,176✔
1207
            t->next = tv;
22,881✔
1208
            tv->next = NULL;
22,881✔
1209
            break;
22,881✔
1210
        }
22,881✔
1211

1212
        t = t->next;
29,295✔
1213
    }
29,295✔
1214

1215
    SCMutexUnlock(&tv_root_lock);
22,881✔
1216
}
22,881✔
1217

1218
static bool ThreadStillHasPackets(ThreadVars *tv)
1219
{
79,084✔
1220
    if (tv->inq != NULL && !tv->inq->is_packet_pool) {
79,084✔
1221
        /* we wait till we dry out all the inq packets, before we
1222
         * kill this thread.  Do note that you should have disabled
1223
         * packet acquire by now using TmThreadDisableReceiveThreads()*/
1224
        PacketQueue *q = tv->inq->pq;
58,129✔
1225
        SCMutexLock(&q->mutex_q);
58,129✔
1226
        uint32_t len = q->len;
58,129✔
1227
        SCMutexUnlock(&q->mutex_q);
58,129✔
1228
        if (len != 0) {
58,129✔
1229
            return true;
5,120✔
1230
        }
5,120✔
1231
    }
58,129✔
1232

1233
    if (tv->stream_pq != NULL) {
73,964✔
1234
        SCMutexLock(&tv->stream_pq->mutex_q);
53,710✔
1235
        uint32_t len = tv->stream_pq->len;
53,710✔
1236
        SCMutexUnlock(&tv->stream_pq->mutex_q);
53,710✔
1237

1238
        if (len != 0) {
53,710✔
1239
            return true;
×
1240
        }
×
1241
    }
53,710✔
1242
    return false;
73,964✔
1243
}
73,964✔
1244

1245
/**
1246
 * \brief Kill a thread.
1247
 *
1248
 * \param tv A ThreadVars instance corresponding to the thread that has to be
1249
 *           killed.
1250
 *
1251
 * \retval r 1 killed successfully
1252
 *           0 not yet ready, needs another look
1253
 */
1254
static int TmThreadKillThread(ThreadVars *tv)
1255
{
387,594✔
1256
    BUG_ON(tv == NULL);
387,594✔
1257

1258
    /* kill only once :) */
1259
    if (TmThreadsCheckFlag(tv, THV_DEAD)) {
387,594✔
1260
        return 1;
239,370✔
1261
    }
239,370✔
1262

1263
    /* set the thread flag informing the thread that it needs to be
1264
     * terminated */
1265
    TmThreadsSetFlag(tv, THV_KILL);
148,224✔
1266
    TmThreadsSetFlag(tv, THV_DEINIT);
148,224✔
1267

1268
    /* to be sure, signal more */
1269
    if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
148,224✔
1270
        if (tv->inq_id != TMQH_NOT_SET) {
118,456✔
1271
            Tmqh *qh = TmqhGetQueueHandlerByID(tv->inq_id);
87,260✔
1272
            if (qh != NULL && qh->InShutdownHandler != NULL) {
87,260✔
1273
                qh->InShutdownHandler(tv);
4✔
1274
            }
4✔
1275
        }
87,260✔
1276
        if (tv->inq != NULL) {
118,456✔
1277
            for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
232,173✔
1278
                SCMutexLock(&tv->inq->pq->mutex_q);
154,836✔
1279
                SCCondSignal(&tv->inq->pq->cond_q);
154,836✔
1280
                SCMutexUnlock(&tv->inq->pq->mutex_q);
154,836✔
1281
            }
154,836✔
1282
            SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
77,337✔
1283
        }
77,337✔
1284

1285
        if (tv->ctrl_cond != NULL ) {
118,456✔
1286
            SCCtrlMutexLock(tv->ctrl_mutex);
17,521✔
1287
            pthread_cond_broadcast(tv->ctrl_cond);
17,521✔
1288
            SCCtrlMutexUnlock(tv->ctrl_mutex);
17,521✔
1289
        }
17,521✔
1290
        return 0;
118,456✔
1291
    }
118,456✔
1292

1293
    if (tv->outctx != NULL) {
29,768✔
1294
        if (tv->outq_id != TMQH_NOT_SET) {
3,379✔
1295
            Tmqh *qh = TmqhGetQueueHandlerByID(tv->outq_id);
3,379✔
1296
            if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
3,379✔
1297
                qh->OutHandlerCtxFree(tv->outctx);
3,379✔
1298
                tv->outctx = NULL;
3,379✔
1299
            }
3,379✔
1300
        }
3,379✔
1301
    }
3,379✔
1302

1303
    /* Join the thread and flag as dead, unless the thread ID is 0 as
1304
     * its not a thread created by Suricata. */
1305
    if (tv->t) {
29,768✔
1306
        pthread_join(tv->t, NULL);
29,768✔
1307
        SCLogDebug("thread %s stopped", tv->name);
29,768✔
1308
    }
29,768✔
1309
    TmThreadsSetFlag(tv, THV_DEAD);
29,768✔
1310
    return 1;
29,768✔
1311
}
148,224✔
1312

1313
static bool ThreadBusy(ThreadVars *tv)
1314
{
73,964✔
1315
    for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
169,065✔
1316
        TmModule *tm = TmModuleGetById(s->tm_id);
96,512✔
1317
        if (tm && tm->ThreadBusy != NULL) {
96,512✔
1318
            if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
53,710✔
1319
                return true;
1,411✔
1320
        }
53,710✔
1321
    }
96,512✔
1322
    return false;
72,553✔
1323
}
73,964✔
1324

1325
/** \internal
1326
 *
1327
 *  \brief make sure that all packet threads are done processing their
1328
 *         in-flight packets, including 'injected' flow packets.
1329
 */
1330
static void TmThreadDrainPacketThreads(void)
1331
{
10,293✔
1332
    ThreadVars *tv = NULL;
10,293✔
1333
    struct timeval start_ts;
10,293✔
1334
    struct timeval cur_ts;
10,293✔
1335
    gettimeofday(&start_ts, NULL);
10,293✔
1336

1337
again:
16,824✔
1338
    gettimeofday(&cur_ts, NULL);
16,824✔
1339
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
16,824✔
1340
        SCLogWarning("unable to get all packet threads "
×
1341
                     "to process their packets in time");
×
1342
        return;
×
1343
    }
×
1344

1345
    SCMutexLock(&tv_root_lock);
16,824✔
1346

1347
    /* all receive threads are part of packet processing threads */
1348
    tv = tv_root[TVT_PPT];
16,824✔
1349
    while (tv) {
85,369✔
1350
        if (ThreadStillHasPackets(tv)) {
75,076✔
1351
            /* we wait till we dry out all the inq packets, before we
1352
             * kill this thread.  Do note that you should have disabled
1353
             * packet acquire by now using TmThreadDisableReceiveThreads()*/
1354
            SCMutexUnlock(&tv_root_lock);
5,120✔
1355

1356
            /* sleep outside lock */
1357
            SleepMsec(1);
5,120✔
1358
            goto again;
5,120✔
1359
        }
5,120✔
1360
        if (ThreadBusy(tv)) {
69,956✔
1361
            SCMutexUnlock(&tv_root_lock);
1,411✔
1362

1363
            Packet *p = PacketGetFromAlloc();
1,411✔
1364
            if (p != NULL) {
1,411✔
1365
                p->flags |= PKT_PSEUDO_STREAM_END;
1,411✔
1366
                PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
1,411✔
1367
                CaptureHooksOnPseudoPacketCreated(p);
1,411✔
1368
                PacketQueue *q = tv->stream_pq;
1,411✔
1369
                SCMutexLock(&q->mutex_q);
1,411✔
1370
                PacketEnqueue(q, p);
1,411✔
1371
                SCCondSignal(&q->cond_q);
1,411✔
1372
                SCMutexUnlock(&q->mutex_q);
1,411✔
1373
            }
1,411✔
1374

1375
            /* don't sleep while holding a lock */
1376
            SleepMsec(1);
1,411✔
1377
            goto again;
1,411✔
1378
        }
1,411✔
1379
        tv = tv->next;
68,545✔
1380
    }
68,545✔
1381

1382
    SCMutexUnlock(&tv_root_lock);
10,293✔
1383
}
10,293✔
1384

1385
/**
1386
 *  \brief Disable all threads having the specified TMs.
1387
 *
1388
 *  Breaks out of the packet acquisition loop, and bumps
1389
 *  into the 'flow loop', where it will process packets
1390
 *  from the flow engine's shutdown handling.
1391
 */
1392
void TmThreadDisableReceiveThreads(void)
1393
{
3,431✔
1394
    ThreadVars *tv = NULL;
3,431✔
1395
    struct timeval start_ts;
3,431✔
1396
    struct timeval cur_ts;
3,431✔
1397
    gettimeofday(&start_ts, NULL);
3,431✔
1398

1399
again:
3,967✔
1400
    gettimeofday(&cur_ts, NULL);
3,967✔
1401
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
3,967✔
1402
        FatalError("Engine unable to disable detect "
×
1403
                   "thread - \"%s\". Killing engine",
×
1404
                tv->name);
×
1405
    }
×
1406

1407
    SCMutexLock(&tv_root_lock);
3,967✔
1408

1409
    /* all receive threads are part of packet processing threads */
1410
    tv = tv_root[TVT_PPT];
3,967✔
1411

1412
    /* we do have to keep in mind that TVs are arranged in the order
1413
     * right from receive to log.  The moment we fail to find a
1414
     * receive TM amongst the slots in a tv, it indicates we are done
1415
     * with all receive threads */
1416
    while (tv) {
20,944✔
1417
        int disable = 0;
17,513✔
1418
        TmModule *tm = NULL;
17,513✔
1419
        /* obtain the slots for this TV */
1420
        TmSlot *slots = tv->tm_slots;
17,513✔
1421
        while (slots != NULL) {
31,039✔
1422
            tm = TmModuleGetById(slots->tm_id);
17,534✔
1423

1424
            if (tm->flags & TM_FLAG_RECEIVE_TM) {
17,534✔
1425
                disable = 1;
4,008✔
1426
                break;
4,008✔
1427
            }
4,008✔
1428

1429
            slots = slots->slot_next;
13,526✔
1430
            continue;
13,526✔
1431
        }
17,534✔
1432

1433
        if (disable) {
17,513✔
1434
            if (ThreadStillHasPackets(tv)) {
4,008✔
1435
                /* we wait till we dry out all the inq packets, before we
1436
                 * kill this thread.  Do note that you should have disabled
1437
                 * packet acquire by now using TmThreadDisableReceiveThreads()*/
1438
                SCMutexUnlock(&tv_root_lock);
×
1439
                /* don't sleep while holding a lock */
1440
                SleepMsec(1);
×
1441
                goto again;
×
1442
            }
×
1443

1444
            if (ThreadBusy(tv)) {
4,008✔
UNCOV
1445
                SCMutexUnlock(&tv_root_lock);
×
1446

UNCOV
1447
                Packet *p = PacketGetFromAlloc();
×
UNCOV
1448
                if (p != NULL) {
×
UNCOV
1449
                    p->flags |= PKT_PSEUDO_STREAM_END;
×
UNCOV
1450
                    PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
×
UNCOV
1451
                    CaptureHooksOnPseudoPacketCreated(p);
×
UNCOV
1452
                    PacketQueue *q = tv->stream_pq;
×
UNCOV
1453
                    SCMutexLock(&q->mutex_q);
×
UNCOV
1454
                    PacketEnqueue(q, p);
×
UNCOV
1455
                    SCCondSignal(&q->cond_q);
×
UNCOV
1456
                    SCMutexUnlock(&q->mutex_q);
×
UNCOV
1457
                }
×
1458

1459
                /* don't sleep while holding a lock */
UNCOV
1460
                SleepMsec(1);
×
UNCOV
1461
                goto again;
×
UNCOV
1462
            }
×
1463

1464
            /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1465
            if (tm && tm->PktAcqBreakLoop != NULL) {
4,008✔
1466
                tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
3✔
1467
            }
3✔
1468
            TmThreadsSetFlag(tv, THV_REQ_FLOW_LOOP);
4,008✔
1469

1470
            if (tv->inq != NULL) {
4,008✔
1471
                for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
×
1472
                    SCMutexLock(&tv->inq->pq->mutex_q);
×
1473
                    SCCondSignal(&tv->inq->pq->cond_q);
×
1474
                    SCMutexUnlock(&tv->inq->pq->mutex_q);
×
1475
                }
×
1476
                SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
×
1477
            }
×
1478

1479
            /* wait for it to enter the 'flow loop' stage */
1480
            while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
4,008✔
1481
                SCMutexUnlock(&tv_root_lock);
536✔
1482

1483
                SleepMsec(1);
536✔
1484
                goto again;
536✔
1485
            }
536✔
1486
        }
4,008✔
1487

1488
        tv = tv->next;
16,977✔
1489
    }
16,977✔
1490

1491
    SCMutexUnlock(&tv_root_lock);
3,431✔
1492

1493
    /* finally wait for all packet threads to have
1494
     * processed all of their 'live' packets so we
1495
     * don't process the last live packets together
1496
     * with FFR packets */
1497
    TmThreadDrainPacketThreads();
3,431✔
1498
}
3,431✔
1499

1500
#ifdef DEBUG_VALIDATION
1501
static void TmThreadDumpThreads(void);
1502
#endif
1503

1504
static void TmThreadDebugValidateNoMorePackets(void)
1505
{
6,862✔
1506
#ifdef DEBUG_VALIDATION
1507
    SCMutexLock(&tv_root_lock);
1508
    for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1509
        if (ThreadStillHasPackets(tv)) {
1510
            SCMutexUnlock(&tv_root_lock);
1511
            TmThreadDumpThreads();
1512
            DEBUG_VALIDATE_BUG_ON(1);
1513
        }
1514
    }
1515
    SCMutexUnlock(&tv_root_lock);
1516
#endif
1517
}
6,862✔
1518

1519
/** \internal
1520
 *  \brief check if a thread has any of the modules indicated by TM_FLAG_*
1521
 *  \param tv thread
1522
 *  \param flags TM_FLAG_*'s
1523
 *  \retval bool true if at least on of the flags is present */
1524
static inline bool CheckModuleFlags(const ThreadVars *tv, const uint8_t flags)
1525
{
181,759✔
1526
    return (tv->tmm_flags & flags) != 0;
181,759✔
1527
}
181,759✔
1528

1529
/**
1530
 * \brief Disable all packet threads
1531
 * \param set flag to set
1532
 * \param check flag to check
1533
 * \param module_flags bitflags of TmModule's to apply the `set` flag to.
1534
 *
1535
 * Support 2 stages in shutting down the packet threads:
1536
 * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
1537
 * 2. set THV_KILL and wait for THV_RUNNING_DONE
1538
 *
1539
 * During step 1 the main loop is exited, and the flow loop logic is entered.
1540
 * During step 2, the flow loop logic is done and the thread closes.
1541
 *
1542
 * `module_flags` limits which threads are disabled
1543
 */
1544
void TmThreadDisablePacketThreads(
1545
        const uint16_t set, const uint16_t check, const uint8_t module_flags)
1546
{
6,862✔
1547
    struct timeval start_ts;
6,862✔
1548
    struct timeval cur_ts;
6,862✔
1549

1550
    /* first drain all packet threads of their packets */
1551
    TmThreadDrainPacketThreads();
6,862✔
1552

1553
    /* since all the threads possibly able to produce more packets
1554
     * are now gone or inactive, we should see no packets anywhere
1555
     * anymore. */
1556
    TmThreadDebugValidateNoMorePackets();
6,862✔
1557

1558
    gettimeofday(&start_ts, NULL);
6,862✔
1559
again:
45,603✔
1560
    gettimeofday(&cur_ts, NULL);
45,603✔
1561
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
45,603✔
1562
        FatalError("Engine unable to disable packet  "
×
1563
                   "threads. Killing engine");
×
1564
    }
×
1565

1566
    /* loop through the packet threads and kill them */
1567
    SCMutexLock(&tv_root_lock);
45,603✔
1568
    for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
188,621✔
1569
        /* only set flow worker threads to THV_REQ_FLOW_LOOP */
1570
        if (!CheckModuleFlags(tv, module_flags)) {
181,759✔
1571
            SCLogDebug("%s does not have any of the modules %02x, skip", tv->name, module_flags);
1✔
1572
            continue;
1✔
1573
        }
1✔
1574
        TmThreadsSetFlag(tv, set);
181,758✔
1575

1576
        /* separate worker threads (autofp) will still wait at their
1577
         * input queues. So nudge them here so they will observe the
1578
         * THV_KILL flag. */
1579
        if (tv->inq != NULL) {
181,758✔
1580
            for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
407,961✔
1581
                SCMutexLock(&tv->inq->pq->mutex_q);
272,060✔
1582
                SCCondSignal(&tv->inq->pq->cond_q);
272,060✔
1583
                SCMutexUnlock(&tv->inq->pq->mutex_q);
272,060✔
1584
            }
272,060✔
1585
            SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
135,901✔
1586
        }
135,901✔
1587

1588
        /* wait for it to reach the expected state */
1589
        if (!TmThreadsCheckFlag(tv, check)) {
181,758✔
1590
            SCMutexUnlock(&tv_root_lock);
38,741✔
1591
            SCLogDebug("%s did not reach state %u, again", tv->name, check);
38,741✔
1592

1593
            SleepMsec(1);
38,741✔
1594
            goto again;
38,741✔
1595
        }
38,741✔
1596
    }
181,758✔
1597
    SCMutexUnlock(&tv_root_lock);
6,862✔
1598
}
6,862✔
1599

1600
#define MIN_WAIT_TIME 100
282,111✔
1601
#define MAX_WAIT_TIME 999999
1602
void TmThreadKillThreadsFamily(int family)
1603
{
12,973✔
1604
    ThreadVars *tv = NULL;
12,973✔
1605
    unsigned int sleep_usec = MIN_WAIT_TIME;
12,973✔
1606

1607
    BUG_ON((family < 0) || (family >= TVT_MAX));
12,973✔
1608

1609
again:
131,429✔
1610
    SCMutexLock(&tv_root_lock);
131,429✔
1611
    tv = tv_root[family];
131,429✔
1612

1613
    while (tv) {
400,567✔
1614
        int r = TmThreadKillThread(tv);
387,594✔
1615
        if (r == 0) {
387,594✔
1616
            SCMutexUnlock(&tv_root_lock);
118,456✔
1617
            SleepUsec(sleep_usec);
118,456✔
1618
            sleep_usec *= 2; /* slowly back off */
118,456✔
1619
            sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
118,456✔
1620
            goto again;
118,456✔
1621
        }
118,456✔
1622
        sleep_usec = MIN_WAIT_TIME; /* reset */
269,138✔
1623

1624
        tv = tv->next;
269,138✔
1625
    }
269,138✔
1626
    SCMutexUnlock(&tv_root_lock);
12,973✔
1627
}
12,973✔
1628
#undef MIN_WAIT_TIME
1629
#undef MAX_WAIT_TIME
1630

1631
void TmThreadKillThreads(void)
1632
{
2,037✔
1633
    int i = 0;
2,037✔
1634

1635
    for (i = 0; i < TVT_MAX; i++) {
8,148✔
1636
        TmThreadKillThreadsFamily(i);
6,111✔
1637
    }
6,111✔
1638
}
2,037✔
1639

1640
static void TmThreadFree(ThreadVars *tv)
1641
{
29,731✔
1642
    TmSlot *s;
29,731✔
1643
    TmSlot *ps;
29,731✔
1644
    if (tv == NULL)
29,731✔
1645
        return;
×
1646

1647
    SCLogDebug("Freeing thread '%s'.", tv->name);
29,731✔
1648

1649
    ThreadFreeStorage(tv);
29,731✔
1650

1651
    if (tv->flow_queue) {
29,731✔
1652
        BUG_ON(tv->flow_queue->qlen != 0);
13,597✔
1653
        SCFree(tv->flow_queue);
13,597✔
1654
    }
13,597✔
1655

1656
    StatsThreadCleanup(&tv->stats);
29,731✔
1657

1658
    TmThreadDeinitMC(tv);
29,731✔
1659

1660
    if (tv->printable_name) {
29,731✔
1661
        SCFree(tv->printable_name);
59✔
1662
    }
59✔
1663

1664
    if (tv->iface_name) {
29,731✔
1665
        SCFree(tv->iface_name);
59✔
1666
    }
59✔
1667

1668
    if (tv->stream_pq_local) {
29,731✔
1669
        BUG_ON(tv->stream_pq_local->len);
93✔
1670
        SCMutexDestroy(&tv->stream_pq_local->mutex_q);
93✔
1671
        SCFree(tv->stream_pq_local);
93✔
1672
    }
93✔
1673

1674
    s = (TmSlot *)tv->tm_slots;
29,731✔
1675
    while (s) {
57,211✔
1676
        ps = s;
27,480✔
1677
        s = s->slot_next;
27,480✔
1678
        SCFree(ps);
27,480✔
1679
    }
27,480✔
1680

1681
    TmThreadsUnregisterThread(tv->id);
29,731✔
1682
    SCFree(tv);
29,731✔
1683
}
29,731✔
1684

1685
void TmThreadClearThreadsFamily(int family)
1686
{
6,862✔
1687
    ThreadVars *tv = NULL;
6,862✔
1688
    ThreadVars *ptv = NULL;
6,862✔
1689

1690
    if ((family < 0) || (family >= TVT_MAX))
6,862✔
1691
        return;
×
1692

1693
    SCMutexLock(&tv_root_lock);
6,862✔
1694
    tv = tv_root[family];
6,862✔
1695

1696
    while (tv) {
36,593✔
1697
        ptv = tv;
29,731✔
1698
        tv = tv->next;
29,731✔
1699
        TmThreadFree(ptv);
29,731✔
1700
    }
29,731✔
1701
    tv_root[family] = NULL;
6,862✔
1702
    SCMutexUnlock(&tv_root_lock);
6,862✔
1703
}
6,862✔
1704

1705
/**
1706
 * \brief Spawns a thread associated with the ThreadVars instance tv
1707
 *
1708
 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1709
 */
1710
TmEcode TmThreadSpawn(ThreadVars *tv)
1711
{
29,781✔
1712
    pthread_attr_t attr;
29,781✔
1713
    if (tv->tm_func == NULL) {
29,781✔
1714
        FatalError("No thread function set");
×
1715
    }
×
1716

1717
    /* Initialize and set thread detached attribute */
1718
    pthread_attr_init(&attr);
29,781✔
1719

1720
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
29,781✔
1721

1722
    /* Adjust thread stack size if configured */
1723
    if (threading_set_stack_size) {
29,781✔
1724
        SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
×
1725
        if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
×
1726
            FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
×
1727
                    threading_set_stack_size);
×
1728
        }
×
1729
    }
×
1730

1731
    int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
29,781✔
1732
    if (rc) {
29,781✔
1733
        FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
×
1734
                strerror(errno));
×
1735
    }
×
1736

1737
#if DEBUG && HAVE_PTHREAD_GETATTR_NP
1738
    if (threading_set_stack_size) {
1739
        if (pthread_getattr_np(tv->t, &attr) == 0) {
1740
            size_t stack_size;
1741
            void *stack_addr;
1742
            pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1743
            SCLogDebug("stack: %p;  size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1744
        } else {
1745
            SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1746
                       "pthread_getattr_np() is %" PRId32,
1747
                    rc);
1748
        }
1749
    }
1750
#endif
1751

1752
    TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
29,781✔
1753

1754
    TmThreadAppend(tv, tv->type);
29,781✔
1755
    return TM_ECODE_OK;
29,781✔
1756
}
29,781✔
1757

1758
/**
1759
 * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1760
 *
1761
 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1762
 */
1763
TmEcode TmThreadLibSpawn(ThreadVars *tv)
1764
{
×
1765
    if (tv->tm_func == NULL) {
×
1766
        printf("ERROR: no thread function set\n");
×
1767
        return TM_ECODE_FAILED;
×
1768
    }
×
1769

1770
    if (tv->tm_func((void *)tv) == (void *)-1) {
×
1771
        return TM_ECODE_FAILED;
×
1772
    }
×
1773

1774
    TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
×
1775

1776
    return TM_ECODE_OK;
×
1777
}
×
1778

1779
/**
1780
 * \brief Initializes the mutex and condition variables for this TV
1781
 *
1782
 * It can be used by a thread to control a wait loop that can also be
1783
 * influenced by other threads.
1784
 *
1785
 * \param tv Pointer to a TV instance
1786
 */
1787
void TmThreadInitMC(ThreadVars *tv)
1788
{
5,907✔
1789
    if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
5,907✔
1790
        FatalError("Fatal error encountered in TmThreadInitMC.  "
×
1791
                   "Exiting...");
×
1792
    }
×
1793

1794
    if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
5,907✔
1795
        printf("Error initializing the tv->m mutex\n");
×
1796
        exit(EXIT_FAILURE);
×
1797
    }
×
1798

1799
    if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
5,907✔
1800
        FatalError("Fatal error encountered in TmThreadInitMC.  "
×
1801
                   "Exiting...");
×
1802
    }
×
1803

1804
    if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
5,907✔
1805
        FatalError("Error initializing the tv->cond condition "
×
1806
                   "variable");
×
1807
    }
×
1808
}
5,907✔
1809

1810
static void TmThreadDeinitMC(ThreadVars *tv)
1811
{
29,731✔
1812
    if (tv->ctrl_mutex) {
29,731✔
1813
        SCCtrlMutexDestroy(tv->ctrl_mutex);
5,892✔
1814
        SCFree(tv->ctrl_mutex);
5,892✔
1815
    }
5,892✔
1816
    if (tv->ctrl_cond) {
29,731✔
1817
        SCCtrlCondDestroy(tv->ctrl_cond);
5,892✔
1818
        SCFree(tv->ctrl_cond);
5,892✔
1819
    }
5,892✔
1820
}
29,731✔
1821

1822
/**
1823
 * \brief Waits till the specified flag(s) is(are) set.  We don't bother if
1824
 *        the kill flag has been set or not on the thread.
1825
 *
1826
 * \param tv Pointer to the TV instance.
1827
 */
1828
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
1829
{
59,556✔
1830
    while (!TmThreadsCheckFlag(tv, flags)) {
3,376,381✔
1831
        SleepUsec(100);
3,316,825✔
1832
    }
3,316,825✔
1833
}
59,556✔
1834

1835
/**
1836
 * \brief Unpauses a thread
1837
 *
1838
 * \param tv Pointer to a TV instance that has to be unpaused
1839
 */
1840
void TmThreadContinue(ThreadVars *tv)
1841
{
31,179✔
1842
    TmThreadsUnsetFlag(tv, THV_PAUSE);
31,179✔
1843
}
31,179✔
1844

1845
static TmEcode WaitOnThreadsRunningByType(const int t)
1846
{
6,111✔
1847
    struct timeval start_ts;
6,111✔
1848
    struct timeval cur_ts;
6,111✔
1849
    uint32_t thread_cnt = 0;
6,111✔
1850

1851
    /* on retries, this will init to the last thread that started up already */
1852
    ThreadVars *tv_start = tv_root[t];
6,111✔
1853
    SCMutexLock(&tv_root_lock);
6,111✔
1854
    for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
23,315✔
1855
        thread_cnt++;
17,204✔
1856
    }
17,204✔
1857
    SCMutexUnlock(&tv_root_lock);
6,111✔
1858

1859
    /* give threads a second each to start up, plus a margin of a minute. */
1860
    uint32_t time_budget = 60 + thread_cnt;
6,111✔
1861

1862
    gettimeofday(&start_ts, NULL);
6,111✔
1863
again:
16,731✔
1864
    SCMutexLock(&tv_root_lock);
16,731✔
1865
    ThreadVars *tv = tv_start;
16,731✔
1866
    while (tv != NULL) {
35,991✔
1867
        if (TmThreadsCheckFlag(tv, (THV_FAILED | THV_CLOSED | THV_DEAD))) {
29,880✔
1868
            SCMutexUnlock(&tv_root_lock);
×
1869

1870
            SCLogError("thread \"%s\" failed to "
×
1871
                       "start: flags %04x",
×
1872
                    tv->name, SC_ATOMIC_GET(tv->flags));
×
1873
            return TM_ECODE_FAILED;
×
1874
        }
×
1875

1876
        if (!(TmThreadsCheckFlag(tv, THV_RUNNING | THV_RUNNING_DONE))) {
29,880✔
1877
            SCMutexUnlock(&tv_root_lock);
10,620✔
1878

1879
            /* 60 seconds provided for the thread to transition from
1880
             * THV_INIT_DONE to THV_RUNNING */
1881
            gettimeofday(&cur_ts, NULL);
10,620✔
1882
            if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
10,620✔
1883
                SCLogError("thread \"%s\" failed to "
×
1884
                           "start in time: flags %04x. Total threads: %u. Time budget %us",
×
1885
                        tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
×
1886
                return TM_ECODE_FAILED;
×
1887
            }
×
1888

1889
            /* sleep a little to give the thread some
1890
             * time to start running */
1891
            SleepUsec(100);
10,620✔
1892
            goto again;
10,620✔
1893
        }
10,620✔
1894
        tv_start = tv;
19,260✔
1895

1896
        tv = tv->next;
19,260✔
1897
    }
19,260✔
1898
    SCMutexUnlock(&tv_root_lock);
6,111✔
1899
    return TM_ECODE_OK;
6,111✔
1900
}
16,731✔
1901

1902
/**
1903
 * \brief Waits for all threads to be in a running state
1904
 *
1905
 * \retval TM_ECODE_OK if all are running or error if a thread failed
1906
 */
1907
TmEcode TmThreadWaitOnThreadRunning(void)
1908
{
2,037✔
1909
    uint16_t RX_num = 0;
2,037✔
1910
    uint16_t W_num = 0;
2,037✔
1911
    uint16_t FM_num = 0;
2,037✔
1912
    uint16_t FR_num = 0;
2,037✔
1913
    uint16_t TX_num = 0;
2,037✔
1914

1915
    for (int i = 0; i < TVT_MAX; i++) {
8,148✔
1916
        if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
6,111✔
1917
            return TM_ECODE_FAILED;
×
1918
    }
6,111✔
1919

1920
    SCMutexLock(&tv_root_lock);
2,037✔
1921
    for (int i = 0; i < TVT_MAX; i++) {
8,148✔
1922
        for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
23,315✔
1923
            if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
17,204✔
1924
                RX_num++;
1,983✔
1925
            else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
15,221✔
1926
                W_num++;
8,013✔
1927
            else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
7,208✔
1928
                TX_num++;
1✔
1929
            else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
7,207✔
1930
                FM_num++;
2,035✔
1931
            else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
5,172✔
1932
                FR_num++;
2,035✔
1933
        }
17,204✔
1934
    }
6,111✔
1935
    SCMutexUnlock(&tv_root_lock);
2,037✔
1936

1937
    /* Construct a welcome string displaying
1938
     * initialized thread types and counts */
1939
    uint16_t app_len = 32;
2,037✔
1940
    uint16_t buf_len = 256;
2,037✔
1941

1942
    char append_str[app_len];
2,037✔
1943
    char thread_counts[buf_len];
2,037✔
1944

1945
    strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
2,037✔
1946
    if (RX_num > 0) {
2,037✔
1947
        snprintf(append_str, app_len, "RX: %u ", RX_num);
1,974✔
1948
        strlcat(thread_counts, append_str, buf_len);
1,974✔
1949
    }
1,974✔
1950
    if (W_num > 0) {
2,037✔
1951
        snprintf(append_str, app_len, "W: %u ", W_num);
2,035✔
1952
        strlcat(thread_counts, append_str, buf_len);
2,035✔
1953
    }
2,035✔
1954
    if (TX_num > 0) {
2,037✔
1955
        snprintf(append_str, app_len, "TX: %u ", TX_num);
1✔
1956
        strlcat(thread_counts, append_str, buf_len);
1✔
1957
    }
1✔
1958
    if (FM_num > 0) {
2,037✔
1959
        snprintf(append_str, app_len, "FM: %u ", FM_num);
2,035✔
1960
        strlcat(thread_counts, append_str, buf_len);
2,035✔
1961
    }
2,035✔
1962
    if (FR_num > 0) {
2,037✔
1963
        snprintf(append_str, app_len, "FR: %u ", FR_num);
2,035✔
1964
        strlcat(thread_counts, append_str, buf_len);
2,035✔
1965
    }
2,035✔
1966
    snprintf(append_str, app_len, "  Engine started.");
2,037✔
1967
    strlcat(thread_counts, append_str, buf_len);
2,037✔
1968
    SCLogNotice("%s", thread_counts);
2,037✔
1969

1970
    return TM_ECODE_OK;
2,037✔
1971
}
2,037✔
1972

1973
/**
1974
 * \brief Unpauses all threads present in tv_root
1975
 */
1976
void TmThreadContinueThreads(void)
1977
{
3,433✔
1978
    SCMutexLock(&tv_root_lock);
3,433✔
1979
    for (int i = 0; i < TVT_MAX; i++) {
13,732✔
1980
        ThreadVars *tv = tv_root[i];
10,299✔
1981
        while (tv != NULL) {
41,463✔
1982
            TmThreadContinue(tv);
31,164✔
1983
            tv = tv->next;
31,164✔
1984
        }
31,164✔
1985
    }
10,299✔
1986
    SCMutexUnlock(&tv_root_lock);
3,433✔
1987
}
3,433✔
1988

1989
/**
1990
 * \brief Used to check the thread for certain conditions of failure.
1991
 */
1992
void TmThreadCheckThreadState(void)
1993
{
96,133✔
1994
    SCMutexLock(&tv_root_lock);
96,133✔
1995
    for (int i = 0; i < TVT_MAX; i++) {
384,532✔
1996
        ThreadVars *tv = tv_root[i];
288,399✔
1997
        while (tv) {
1,189,671✔
1998
            if (TmThreadsCheckFlag(tv, THV_FAILED)) {
901,272✔
1999
                FatalError("thread %s failed", tv->name);
×
2000
            }
×
2001
            tv = tv->next;
901,272✔
2002
        }
901,272✔
2003
    }
288,399✔
2004
    SCMutexUnlock(&tv_root_lock);
96,133✔
2005
}
96,133✔
2006

2007
/**
2008
 *  \brief Used to check if all threads have finished their initialization.  On
2009
 *         finding an un-initialized thread, it waits till that thread completes
2010
 *         its initialization, before proceeding to the next thread.
2011
 *
2012
 *  \retval TM_ECODE_OK all initialized properly
2013
 *  \retval TM_ECODE_FAILED failure
2014
 */
2015
TmEcode TmThreadWaitOnThreadInit(void)
2016
{
3,433✔
2017
    struct timeval start_ts;
3,433✔
2018
    struct timeval cur_ts;
3,433✔
2019
    gettimeofday(&start_ts, NULL);
3,433✔
2020

2021
again:
3,433✔
2022
    SCMutexLock(&tv_root_lock);
3,433✔
2023
    for (int i = 0; i < TVT_MAX; i++) {
13,732✔
2024
        ThreadVars *tv = tv_root[i];
10,299✔
2025
        while (tv != NULL) {
41,463✔
2026
            if (TmThreadsCheckFlag(tv, (THV_CLOSED|THV_DEAD))) {
31,164✔
2027
                SCMutexUnlock(&tv_root_lock);
×
2028

2029
                SCLogError("thread \"%s\" failed to "
×
2030
                           "initialize: flags %04x",
×
2031
                        tv->name, SC_ATOMIC_GET(tv->flags));
×
2032
                return TM_ECODE_FAILED;
×
2033
            }
×
2034

2035
            if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
31,164✔
2036
                SCMutexUnlock(&tv_root_lock);
×
2037

2038
                gettimeofday(&cur_ts, NULL);
×
2039
                if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
×
2040
                    SCLogError("thread \"%s\" failed to "
×
2041
                               "initialize in time: flags %04x",
×
2042
                            tv->name, SC_ATOMIC_GET(tv->flags));
×
2043
                    return TM_ECODE_FAILED;
×
2044
                }
×
2045

2046
                /* sleep a little to give the thread some
2047
                 * time to finish initialization */
2048
                SleepUsec(100);
×
2049
                goto again;
×
2050
            }
×
2051

2052
            if (TmThreadsCheckFlag(tv, THV_FAILED)) {
31,164✔
2053
                SCMutexUnlock(&tv_root_lock);
×
2054
                SCLogError("thread \"%s\" failed to "
×
2055
                           "initialize.",
×
2056
                        tv->name);
×
2057
                return TM_ECODE_FAILED;
×
2058
            }
×
2059
            if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
31,164✔
2060
                SCMutexUnlock(&tv_root_lock);
×
2061
                SCLogError("thread \"%s\" closed on "
×
2062
                           "initialization.",
×
2063
                        tv->name);
×
2064
                return TM_ECODE_FAILED;
×
2065
            }
×
2066

2067
            tv = tv->next;
31,164✔
2068
        }
31,164✔
2069
    }
10,299✔
2070
    SCMutexUnlock(&tv_root_lock);
3,433✔
2071

2072
    return TM_ECODE_OK;
3,433✔
2073
}
3,433✔
2074

2075
/**
2076
 * \brief returns a count of all the threads that match the flag
2077
 */
2078
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
2079
{
3,448✔
2080
    uint32_t cnt = 0;
3,448✔
2081
    SCMutexLock(&tv_root_lock);
3,448✔
2082
    for (int i = 0; i < TVT_MAX; i++) {
13,792✔
2083
        ThreadVars *tv = tv_root[i];
10,344✔
2084
        while (tv != NULL) {
41,674✔
2085
            if ((tv->tmm_flags & flags) == flags)
31,330✔
2086
                cnt++;
13,651✔
2087

2088
            tv = tv->next;
31,330✔
2089
        }
31,330✔
2090
    }
10,344✔
2091
    SCMutexUnlock(&tv_root_lock);
3,448✔
2092
    return cnt;
3,448✔
2093
}
3,448✔
2094

2095
#ifdef DEBUG_VALIDATION
2096
static void TmThreadDoDumpSlots(const ThreadVars *tv)
2097
{
2098
    for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2099
        TmModule *m = TmModuleGetById(s->tm_id);
2100
        SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2101
            tv, s, s->tm_id, m->name);
2102
    }
2103
}
2104

2105
static void TmThreadDumpThreads(void)
2106
{
2107
    SCMutexLock(&tv_root_lock);
2108
    for (int i = 0; i < TVT_MAX; i++) {
2109
        ThreadVars *tv = tv_root[i];
2110
        while (tv != NULL) {
2111
            const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2112
            SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2113
                    tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2114
            if (tv->inq && tv->stream_pq == tv->inq->pq) {
2115
                SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2116
            } else if (tv->stream_pq_local != NULL) {
2117
                for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2118
                    SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2119
                            tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2120
                }
2121
            }
2122
            for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2123
                SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2124
                        tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2125
            }
2126
            TmThreadDoDumpSlots(tv);
2127
            tv = tv->next;
2128
        }
2129
    }
2130
    SCMutexUnlock(&tv_root_lock);
2131
    TmThreadsListThreads();
2132
}
2133
#endif
2134

2135
/* Aligned to CLS to avoid false sharing between atomic ops. */
2136
typedef struct Thread_ {
2137
    ThreadVars *tv;     /**< threadvars structure */
2138
    const char *name;
2139
    int type;
2140
    int in_use;         /**< bool to indicate this is in use */
2141

2142
    SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2143
                                         *   (offline mode) */
2144
    SCTime_t sys_sec_stamp; /**< timestamp in real system
2145
                             *   time when the pktts was last updated. */
2146
    SCSpinlock spin;
2147
} __attribute__((aligned(CLS))) Thread;
2148

2149
typedef struct Threads_ {
2150
    Thread *threads;
2151
    size_t threads_size;
2152
    int threads_cnt;
2153
} Threads;
2154

2155
static bool thread_store_sealed = false;
2156
static Threads thread_store = { NULL, 0, 0 };
2157
static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2158

2159
void TmThreadsSealThreads(void)
2160
{
3,431✔
2161
    SCMutexLock(&thread_store_lock);
3,431✔
2162
    DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
3,431✔
2163
    thread_store_sealed = true;
3,431✔
2164
    SCMutexUnlock(&thread_store_lock);
3,431✔
2165
}
3,431✔
2166

2167
void TmThreadsUnsealThreads(void)
2168
{
3,431✔
2169
    SCMutexLock(&thread_store_lock);
3,431✔
2170
    DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
3,431✔
2171
    thread_store_sealed = false;
3,431✔
2172
    SCMutexUnlock(&thread_store_lock);
3,431✔
2173
}
3,431✔
2174

2175
void TmThreadsListThreads(void)
2176
{
×
2177
    SCMutexLock(&thread_store_lock);
×
2178
    for (size_t s = 0; s < thread_store.threads_size; s++) {
×
2179
        Thread *t = &thread_store.threads[s];
×
2180
        if (t == NULL || t->in_use == 0)
×
2181
            continue;
×
2182

2183
        SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
×
2184
                (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
×
2185
        if (t->tv) {
×
2186
            ThreadVars *tv = t->tv;
×
2187
            const uint32_t flags = SC_ATOMIC_GET(tv->flags);
×
2188
            SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
×
2189
                    tv, tv->type, tv->name, tv->tmm_flags, flags);
×
2190
        }
×
2191
    }
×
2192
    SCMutexUnlock(&thread_store_lock);
×
2193
}
×
2194

2195
#define STEP 32
2,044✔
2196
/**
2197
 *  \retval id thread id, or 0 if not found
2198
 */
2199
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
2200
{
29,781✔
2201
    SCMutexLock(&thread_store_lock);
29,781✔
2202
    DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
29,781✔
2203
    if (thread_store.threads == NULL) {
29,781✔
2204
        thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2,044✔
2205
        BUG_ON(thread_store.threads == NULL);
2,044✔
2206
        thread_store.threads_size = STEP;
2,044✔
2207
    }
2,044✔
2208

2209
    size_t s;
29,781✔
2210
    for (s = 0; s < thread_store.threads_size; s++) {
157,984✔
2211
        if (thread_store.threads[s].in_use == 0) {
157,984✔
2212
            Thread *t = &thread_store.threads[s];
29,781✔
2213
            SCSpinInit(&t->spin, 0);
29,781✔
2214
            SCSpinLock(&t->spin);
29,781✔
2215
            t->name = tv->name;
29,781✔
2216
            t->type = type;
29,781✔
2217
            t->tv = tv;
29,781✔
2218
            t->in_use = 1;
29,781✔
2219
            SCSpinUnlock(&t->spin);
29,781✔
2220

2221
            SCMutexUnlock(&thread_store_lock);
29,781✔
2222
            return (int)(s+1);
29,781✔
2223
        }
29,781✔
2224
    }
157,984✔
2225

2226
    /* if we get here the array is completely filled */
2227
    void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
×
2228
    BUG_ON(newmem == NULL);
×
2229
    thread_store.threads = newmem;
×
2230
    memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
×
2231

2232
    Thread *t = &thread_store.threads[thread_store.threads_size];
×
2233
    SCSpinInit(&t->spin, 0);
×
2234
    SCSpinLock(&t->spin);
×
2235
    t->name = tv->name;
×
2236
    t->type = type;
×
2237
    t->tv = tv;
×
2238
    t->in_use = 1;
×
2239
    SCSpinUnlock(&t->spin);
×
2240

2241
    s = thread_store.threads_size;
×
2242
    thread_store.threads_size += STEP;
×
2243

2244
    SCMutexUnlock(&thread_store_lock);
×
2245
    return (int)(s+1);
×
2246
}
×
2247
#undef STEP
2248

2249
void TmThreadsUnregisterThread(const int id)
2250
{
29,731✔
2251
    SCMutexLock(&thread_store_lock);
29,731✔
2252
    DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
29,731✔
2253
    if (id <= 0 || id > (int)thread_store.threads_size) {
29,731✔
2254
        SCMutexUnlock(&thread_store_lock);
×
2255
        return;
×
2256
    }
×
2257

2258
    /* id is one higher than index */
2259
    int idx = id - 1;
29,731✔
2260

2261
    /* reset thread_id, which serves as clearing the record */
2262
    thread_store.threads[idx].in_use = 0;
29,731✔
2263

2264
    /* check if we have at least one registered thread left */
2265
    size_t s;
29,731✔
2266
    for (s = 0; s < thread_store.threads_size; s++) {
113,725✔
2267
        Thread *t = &thread_store.threads[s];
111,719✔
2268
        if (t->in_use == 1) {
111,719✔
2269
            goto end;
27,725✔
2270
        }
27,725✔
2271
    }
111,719✔
2272

2273
    /* if we get here no threads are registered */
2274
    SCFree(thread_store.threads);
2,006✔
2275
    thread_store.threads = NULL;
2,006✔
2276
    thread_store.threads_size = 0;
2,006✔
2277
    thread_store.threads_cnt = 0;
2,006✔
2278

2279
end:
29,731✔
2280
    SCMutexUnlock(&thread_store_lock);
29,731✔
2281
}
29,731✔
2282

2283
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2284
{
2,809,447✔
2285
    SCTime_t now = SCTimeGetTime();
2,809,447✔
2286
    int idx = id - 1;
2,809,447✔
2287
    Thread *t = &thread_store.threads[idx];
2,809,447✔
2288
    SCSpinLock(&t->spin);
2,809,447✔
2289
    SC_ATOMIC_SET(t->pktts, ts);
2,809,447✔
2290

2291
#ifdef DEBUG
2292
    if (t->sys_sec_stamp.secs != 0) {
2293
        SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2294
        if (SCTIME_CMP_LT(tmpts, now)) {
2295
            SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2296
        }
2297
    }
2298
#endif
2299

2300
    t->sys_sec_stamp = now;
2,809,447✔
2301
    SCSpinUnlock(&t->spin);
2,809,447✔
2302
}
2,809,447✔
2303

2304
bool TmThreadsTimeSubsysIsReady(void)
2305
{
28,140✔
2306
    static SCTime_t nullts = SCTIME_INITIALIZER;
28,140✔
2307
    bool ready = true;
28,140✔
2308
    for (size_t s = 0; s < thread_store.threads_size; s++) {
54,711✔
2309
        Thread *t = &thread_store.threads[s];
54,711✔
2310
        if (!t->in_use) {
54,711✔
2311
            break;
3,373✔
2312
        }
3,373✔
2313
        SCSpinLock(&t->spin);
51,338✔
2314
        if (t->type != TVT_PPT) {
51,338✔
2315
            SCSpinUnlock(&t->spin);
9,838✔
2316
            continue;
9,838✔
2317
        }
9,838✔
2318
        if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
41,500✔
2319
            ready = false;
24,767✔
2320
            SCSpinUnlock(&t->spin);
24,767✔
2321
            break;
24,767✔
2322
        }
24,767✔
2323
        SCSpinUnlock(&t->spin);
16,733✔
2324
    }
16,733✔
2325
    return ready;
28,140✔
2326
}
28,140✔
2327

2328
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
2329
{
3,372✔
2330
    SCTime_t now = SCTimeGetTime();
3,372✔
2331
    for (size_t s = 0; s < thread_store.threads_size; s++) {
34,075✔
2332
        Thread *t = &thread_store.threads[s];
34,075✔
2333
        if (!t->in_use) {
34,075✔
2334
            break;
3,372✔
2335
        }
3,372✔
2336
        SCSpinLock(&t->spin);
30,703✔
2337
        if (t->type != TVT_PPT) {
30,703✔
2338
            SCSpinUnlock(&t->spin);
13,975✔
2339
            continue;
13,975✔
2340
        }
13,975✔
2341
        SC_ATOMIC_SET(t->pktts, ts);
16,728✔
2342
        t->sys_sec_stamp = now;
16,728✔
2343
        SCSpinUnlock(&t->spin);
16,728✔
2344
    }
16,728✔
2345
}
3,372✔
2346

2347
SCTime_t TmThreadsGetThreadTime(const int idx)
2348
{
2,558✔
2349
    DEBUG_VALIDATE_BUG_ON(idx == 0);
2,558✔
2350
    const int i = idx - 1;
2,558✔
2351
    Thread *t = &thread_store.threads[i];
2,558✔
2352
    return SC_ATOMIC_GET(t->pktts);
2,558✔
2353
}
2,558✔
2354

2355
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2356
{
387,046✔
2357
    struct timeval local = { 0 };
387,046✔
2358
    static SCTime_t nullts = SCTIME_INITIALIZER;
387,046✔
2359
    bool set = false;
387,046✔
2360
    SCTime_t now = SCTimeGetTime();
387,046✔
2361

2362
    for (size_t s = 0; s < thread_store.threads_size; s++) {
4,095,564✔
2363
        Thread *t = &thread_store.threads[s];
4,095,564✔
2364
        if (t->in_use == 0) {
4,095,564✔
2365
            break;
387,046✔
2366
        }
387,046✔
2367
        SCSpinLock(&t->spin);
3,708,518✔
2368
        /* only packet threads set timestamps based on packets */
2369
        if (t->type != TVT_PPT) {
3,708,518✔
2370
            SCSpinUnlock(&t->spin);
1,800,020✔
2371
            continue;
1,800,020✔
2372
        }
1,800,020✔
2373
        SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
1,908,498✔
2374
        if (SCTIME_CMP_NEQ(pktts, nullts)) {
1,908,498✔
2375
            SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
1,890,398✔
2376
            /* ignore sleeping threads */
2377
            if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
1,890,398✔
2378
                SCSpinUnlock(&t->spin);
2,234✔
2379
                continue;
2,234✔
2380
            }
2,234✔
2381
            if (!set) {
1,888,164✔
2382
                SCTIME_TO_TIMEVAL(&local, pktts);
384,355✔
2383
                set = true;
384,355✔
2384
            } else {
1,503,809✔
2385
                if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
1,503,809✔
2386
                    SCTIME_TO_TIMEVAL(&local, pktts);
5,042✔
2387
                }
5,042✔
2388
            }
1,503,809✔
2389
        }
1,888,164✔
2390
        SCSpinUnlock(&t->spin);
1,906,264✔
2391
    }
1,906,264✔
2392
    *ts = local;
387,046✔
2393
    SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
387,046✔
2394
}
387,046✔
2395

2396
uint16_t TmThreadsGetWorkerThreadMax(void)
2397
{
33✔
2398
    uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
33✔
2399
    int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
33✔
2400
    /* always create at least one thread */
2401
    if (thread_max == 0)
33✔
2402
        thread_max = ncpus * threading_detect_ratio;
33✔
2403
    if (thread_max < 1)
33✔
2404
        thread_max = 1;
×
2405
    if (thread_max > 1024) {
33✔
2406
        SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
×
2407
        thread_max = 1024;
×
2408
    }
×
2409
    return (uint16_t)thread_max;
33✔
2410
}
33✔
2411

2412
/** \brief inject a flow into a threads flow queue
2413
 */
2414
void TmThreadsInjectFlowById(Flow *f, const int id)
2415
{
4,470✔
2416
    if (id > 0 && id <= (int)thread_store.threads_size) {
4,470✔
2417
        int idx = id - 1;
4,470✔
2418
        Thread *t = &thread_store.threads[idx];
4,470✔
2419
        ThreadVars *tv = t->tv;
4,470✔
2420
        if (tv != NULL && tv->flow_queue != NULL) {
4,470✔
2421
            FlowEnqueue(tv->flow_queue, f);
4,470✔
2422

2423
            /* wake up listening thread(s) if necessary */
2424
            if (tv->inq != NULL) {
4,470✔
2425
                SCMutexLock(&tv->inq->pq->mutex_q);
3,397✔
2426
                SCCondSignal(&tv->inq->pq->cond_q);
3,397✔
2427
                SCMutexUnlock(&tv->inq->pq->mutex_q);
3,397✔
2428
            } else if (tv->break_loop) {
3,402✔
2429
                TmThreadsCaptureBreakLoop(tv);
×
2430
            }
×
2431
            return;
4,470✔
2432
        }
4,470✔
2433
    }
4,470✔
2434
    BUG_ON(1);
×
2435
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc