• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 22553492142

01 Mar 2026 09:48PM UTC coverage: 70.74% (-2.9%) from 73.687%
22553492142

Pull #14920

github

web-flow
Merge e15a765bc into 90823fa90
Pull Request #14920: draft: rust based configuration file parser and loader - v4

38209 of 77306 branches covered (49.43%)

Branch coverage included in aggregate %.

533 of 779 new or added lines in 5 files covered. (68.42%)

11924 existing lines in 491 files now uncovered.

252429 of 333548 relevant lines covered (75.68%)

2403268.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.94
/src/tm-threads.c
1
/* Copyright (C) 2007-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23
 * \author Eric Leblond <eric@regit.org>
24
 *
25
 * Thread management functions.
26
 */
27

28
#include "suricata-common.h"
29
#include "suricata.h"
30
#include "stream.h"
31
#include "runmodes.h"
32
#include "thread-callbacks.h"
33
#include "threadvars.h"
34
#include "thread-storage.h"
35
#include "tm-queues.h"
36
#include "tm-queuehandlers.h"
37
#include "tm-threads.h"
38
#include "capture-hooks.h"
39
#include "tmqh-packetpool.h"
40
#include "threads.h"
41
#include "util-affinity.h"
42
#include "util-debug.h"
43
#include "util-privs.h"
44
#include "util-cpu.h"
45
#include "util-optimize.h"
46
#include "util-profiling.h"
47
#include "util-signal.h"
48
#include "queue.h"
49
#include "util-validate.h"
50
#include "source-pcap-file-helper.h"
51

52
#ifdef PROFILE_LOCKING
53
thread_local uint64_t mutex_lock_contention;
54
thread_local uint64_t mutex_lock_wait_ticks;
55
thread_local uint64_t mutex_lock_cnt;
56

57
thread_local uint64_t spin_lock_contention;
58
thread_local uint64_t spin_lock_wait_ticks;
59
thread_local uint64_t spin_lock_cnt;
60

61
thread_local uint64_t rww_lock_contention;
62
thread_local uint64_t rww_lock_wait_ticks;
63
thread_local uint64_t rww_lock_cnt;
64

65
thread_local uint64_t rwr_lock_contention;
66
thread_local uint64_t rwr_lock_wait_ticks;
67
thread_local uint64_t rwr_lock_cnt;
68
#endif
69

70
#ifdef OS_FREEBSD
71
#include <sched.h>
72
#include <sys/param.h>
73
#include <sys/resource.h>
74
#include <sys/cpuset.h>
75
#include <sys/thr.h>
76
#define cpu_set_t cpuset_t
77
#endif /* OS_FREEBSD */
78

79
/* prototypes */
80
static int SetCPUAffinity(uint16_t cpu);
81
static void TmThreadDeinitMC(ThreadVars *tv);
82

83
/* root of the threadvars list */
84
ThreadVars *tv_root[TVT_MAX] = { NULL };
85

86
/* lock to protect tv_root */
87
SCMutex tv_root_lock = SCMUTEX_INITIALIZER;
88

89
/**
90
 * \brief Check if a thread flag is set.
91
 *
92
 * \retval 1 flag is set.
93
 * \retval 0 flag is not set.
94
 */
95
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
96
{
928,064,230✔
97
    return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
928,064,230✔
98
}
928,064,230✔
99

100
/**
101
 * \brief Set a thread flag.
102
 */
103
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
104
{
333,771✔
105
    SC_ATOMIC_OR(tv->flags, flag);
333,771✔
106
}
333,771✔
107

108
/**
109
 * \brief Unset a thread flag.
110
 */
111
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
112
{
31,580✔
113
    SC_ATOMIC_AND(tv->flags, ~flag);
31,580✔
114
}
31,580✔
115

116
TmEcode TmThreadsProcessDecodePseudoPackets(
117
        ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
118
{
1,665,712✔
119
    while (decode_pq->top != NULL) {
1,667,357✔
120
        Packet *extra_p = PacketDequeueNoLock(decode_pq);
1,645✔
121
        if (unlikely(extra_p == NULL))
1,645!
122
            continue;
×
123
        DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
1,645✔
124

125
        if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
1,645!
126
            SCReturnInt(TM_ECODE_FAILED);
×
127
        }
×
128
    }
1,645✔
129
    SCReturnInt(TM_ECODE_OK);
1,665,712✔
130
}
1,665,712✔
131

132
/**
133
 * \brief Separate run function so we can call it recursively.
134
 */
135
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
136
{
3,213,866✔
137
    for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
6,566,009✔
138
        PACKET_PROFILING_TMM_START(p, s->tm_id);
3,352,143✔
139
        TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
3,352,143✔
140
        PACKET_PROFILING_TMM_END(p, s->tm_id);
3,352,143✔
141
        DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
3,352,143✔
142

143
        /* handle error */
144
        if (unlikely(r == TM_ECODE_FAILED)) {
3,352,143!
145
            /* Encountered error.  Return packets to packetpool and return */
146
            TmThreadsSlotProcessPktFail(tv, NULL);
×
147
            return TM_ECODE_FAILED;
×
148
        }
×
149
        if (s->tm_flags & TM_FLAG_DECODE_TM) {
3,352,143✔
150
            if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
1,665,710!
151
                    TM_ECODE_OK) {
1,665,710✔
152
                return TM_ECODE_FAILED;
×
153
            }
×
154
        }
1,665,710✔
155
    }
3,352,143✔
156

157
    return TM_ECODE_OK;
3,213,866✔
158
}
3,213,866✔
159

160
/** \internal
161
 *
162
 *  \brief Process flow timeout packets
163
 *
164
 *  Process flow timeout pseudo packets. During shutdown this loop
165
 *  is run until the flow engine kills the thread and the queue is
166
 *  empty.
167
 */
168
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
169
{
9,778✔
170
    TmSlot *fw_slot = tv->tm_flowworker;
9,778✔
171
    int r = TM_ECODE_OK;
9,778✔
172

173
    if (tv->stream_pq == NULL || fw_slot == NULL) {
9,778!
174
        SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
1,942!
175
        return r;
1,942✔
176
    }
1,942✔
177

178
    SCLogDebug("flow end loop starting");
7,836!
179
    while (1) {
758,984!
180
        SCMutexLock(&tv->stream_pq->mutex_q);
758,984✔
181
        uint32_t len = tv->stream_pq->len;
758,984✔
182
        SCMutexUnlock(&tv->stream_pq->mutex_q);
758,984✔
183
        if (len > 0) {
758,984✔
184
            while (len--) {
1,632✔
185
                SCMutexLock(&tv->stream_pq->mutex_q);
816✔
186
                Packet *p = PacketDequeue(tv->stream_pq);
816✔
187
                SCMutexUnlock(&tv->stream_pq->mutex_q);
816✔
188
                if (likely(p)) {
816!
189
                    DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
816✔
190
                    r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
816✔
191
                    if (r == TM_ECODE_FAILED) {
816!
192
                        break;
×
193
                    }
×
194
                }
816✔
195
            }
816✔
196
        } else {
758,168✔
197
            if (TmThreadsCheckFlag(tv, THV_KILL)) {
758,168✔
198
                break;
7,836✔
199
            }
7,836✔
200
            SleepUsec(1);
750,332✔
201
        }
750,332✔
202
    }
758,984✔
203
    SCLogDebug("flow end loop complete");
7,836!
204
    StatsSyncCounters(&tv->stats);
7,836✔
205

206
    return r;
7,836✔
207
}
9,778✔
208

209
static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
210
{
2,001✔
211
    TmSlot *s = tv->tm_slots;
2,001✔
212

213
    SCSetThreadName(tv->name);
2,001!
214

215
    if (tv->thread_setup_flags != 0)
2,001!
UNCOV
216
        TmThreadSetupOptions(tv);
×
217

218
    CaptureStatsSetup(tv);
2,001✔
219
    PacketPoolInit();
2,001✔
220

221
    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
6,085✔
222
        if (slot->SlotThreadInit != NULL) {
4,084!
223
            void *slot_data = NULL;
4,063✔
224
            TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
4,063✔
225
            if (r != TM_ECODE_OK) {
4,063!
226
                if (r == TM_ECODE_DONE) {
×
227
                    EngineDone();
×
228
                    TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
×
229
                    goto error;
×
230
                } else {
×
231
                    TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
232
                    goto error;
×
233
                }
×
234
            }
×
235
            (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
4,063✔
236
        }
4,063✔
237

238
        /* if the flowworker module is the first, get the threads input queue */
239
        if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
4,084!
240
            tv->stream_pq = tv->inq->pq;
×
241
            tv->tm_flowworker = slot;
×
242
            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
×
243
            tv->flow_queue = FlowQueueNew();
×
244
            if (tv->flow_queue == NULL) {
×
245
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
246
                goto error;
×
247
            }
×
248
        /* setup a queue */
249
        } else if (slot->tm_id == TMM_FLOWWORKER) {
4,084✔
250
            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
60✔
251
            if (tv->stream_pq_local == NULL)
60!
252
                FatalError("failed to alloc PacketQueue");
×
253
            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
60✔
254
            tv->stream_pq = tv->stream_pq_local;
60✔
255
            tv->tm_flowworker = slot;
60✔
256
            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
60!
257
            tv->flow_queue = FlowQueueNew();
60✔
258
            if (tv->flow_queue == NULL) {
60!
259
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
260
                goto error;
×
261
            }
×
262
        }
60✔
263
    }
4,084✔
264

265
    StatsSetupPrivate(&tv->stats, tv->printable_name ? tv->printable_name : tv->name);
2,001!
266

267
    TmThreadsSetFlag(tv, THV_INIT_DONE);
2,001✔
268

269
    return true;
2,001✔
270

271
error:
×
272
    return false;
×
273
}
2,001✔
274

275
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
276
{
9,778✔
277
    TmSlot *s = tv->tm_slots;
9,778✔
278
    bool rc = true;
9,778✔
279

280
    StatsSyncCounters(&tv->stats);
9,778✔
281

282
    TmThreadsSetFlag(tv, THV_FLOW_LOOP);
9,778✔
283

284
    /* process all pseudo packets the flow timeout may throw at us */
285
    TmThreadTimeoutLoop(tv, s);
9,778✔
286

287
    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
9,778✔
288
    TmThreadWaitForFlag(tv, THV_DEINIT);
9,778✔
289

290
    PacketPoolDestroy();
9,778✔
291

292
    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
21,644✔
293
        if (slot->SlotThreadExitPrintStats != NULL) {
11,866✔
294
            slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
2,001✔
295
        }
2,001✔
296

297
        if (slot->SlotThreadDeinit != NULL) {
11,866!
298
            TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
11,866✔
299
            if (r != TM_ECODE_OK) {
11,866!
300
                TmThreadsSetFlag(tv, THV_CLOSED);
×
301
                rc = false;
×
302
                break;
×
303
            }
×
304
        }
11,866✔
305
    }
11,866✔
306

307
    tv->stream_pq = NULL;
9,778✔
308
    TmThreadsSetFlag(tv, THV_CLOSED);
9,778✔
309
    return rc;
9,778✔
310
}
9,778✔
311

312
static void *TmThreadsSlotPktAcqLoop(void *td)
313
{
2,001✔
314
    ThreadVars *tv = (ThreadVars *)td;
2,001✔
315
    TmSlot *s = tv->tm_slots;
2,001✔
316
    TmEcode r = TM_ECODE_OK;
2,001✔
317

318
    /* check if we are setup properly */
319
    if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
2,001!
320
        SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
×
321
                   " PktAcqLoop=%p, tmqh_in=%p,"
×
322
                   " tmqh_out=%p",
×
323
                s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
×
324
        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
325
        pthread_exit(NULL);
×
326
        return NULL;
×
327
    }
×
328

329
    if (!TmThreadsSlotPktAcqLoopInit(td)) {
2,001!
330
        goto error;
×
331
    }
×
332

333
    bool run = TmThreadsWaitForUnpause(tv);
2,001✔
334

335
    while (run) {
4,002✔
336
        r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
2,001✔
337

338
        if (r == TM_ECODE_FAILED) {
2,001!
339
            TmThreadsSetFlag(tv, THV_FAILED);
×
340
            run = false;
×
341
        }
×
342
        if (TmThreadsCheckFlag(tv, THV_REQ_FLOW_LOOP) || suricata_ctl_flags) {
2,001!
343
            run = false;
2,001✔
344
        }
2,001✔
345
        if (r == TM_ECODE_DONE) {
2,001✔
346
            run = false;
1,946✔
347
        }
1,946✔
348
    }
2,001✔
349
    if (!SCTmThreadsSlotPacketLoopFinish(tv)) {
2,001!
350
        goto error;
×
351
    }
×
352

353
    SCLogDebug("%s ending", tv->name);
2,001!
354
    pthread_exit((void *) 0);
2,001✔
355
    return NULL;
×
356

357
error:
×
358
    pthread_exit(NULL);
×
359
    return NULL;
×
360
}
2,001✔
361

362
/**
363
 * Also returns if the kill flag is set.
364
 */
365
bool TmThreadsWaitForUnpause(ThreadVars *tv)
366
{
16,744✔
367
    if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
16,744✔
368
        TmThreadsSetFlag(tv, THV_PAUSED);
14,801✔
369

370
        while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1,235,831✔
371
            SleepUsec(100);
1,221,030✔
372

373
            if (TmThreadsCheckFlag(tv, THV_KILL))
1,221,030!
UNCOV
374
                return false;
×
375
        }
1,221,030✔
376

377
        TmThreadsUnsetFlag(tv, THV_PAUSED);
14,801✔
378
    }
14,801✔
379

380
    return true;
16,744✔
381
}
16,744✔
382

383
static void *TmThreadsLib(void *td)
384
{
×
385
    ThreadVars *tv = (ThreadVars *)td;
×
386
    TmSlot *s = tv->tm_slots;
×
387

388
    /* check if we are setup properly */
389
    if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
×
390
        SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
×
391
                   " tmqh_out=%p",
×
392
                s, tv->tmqh_in, tv->tmqh_out);
×
393
        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
394
        return NULL;
×
395
    }
×
396

397
    if (!TmThreadsSlotPktAcqLoopInit(tv)) {
×
398
        goto error;
×
399
    }
×
400

401
    if (!TmThreadsWaitForUnpause(tv)) {
×
402
        goto error;
×
403
    }
×
404

405
    return NULL;
×
406

407
error:
×
408
    tv->stream_pq = NULL;
×
409
    return (void *)-1;
×
410
}
×
411

412
static void *TmThreadsSlotVar(void *td)
413
{
7,777✔
414
    ThreadVars *tv = (ThreadVars *)td;
7,777✔
415
    TmSlot *s = (TmSlot *)tv->tm_slots;
7,777✔
416
    Packet *p = NULL;
7,777✔
417
    TmEcode r = TM_ECODE_OK;
7,777✔
418

419
    CaptureStatsSetup(tv);
7,777✔
420
    PacketPoolInit();//Empty();
7,777✔
421

422
    SCSetThreadName(tv->name);
7,777!
423

424
    if (tv->thread_setup_flags != 0)
7,777!
425
        TmThreadSetupOptions(tv);
×
426

427
    /* Drop the capabilities for this thread */
428
    SCDropCaps(tv);
7,777✔
429

430
    /* check if we are setup properly */
431
    if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
7,777!
432
        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
433
        pthread_exit(NULL);
×
434
        return NULL;
×
435
    }
×
436

437
    for (; s != NULL; s = s->slot_next) {
15,559✔
438
        if (s->SlotThreadInit != NULL) {
7,782!
439
            void *slot_data = NULL;
7,777✔
440
            r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
7,777✔
441
            if (r != TM_ECODE_OK) {
7,777!
442
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
443
                goto error;
×
444
            }
×
445
            (void)SC_ATOMIC_SET(s->slot_data, slot_data);
7,777✔
446
        }
7,777✔
447

448
        /* special case: we need to access the stream queue
449
         * from the flow timeout code */
450

451
        /* if the flowworker module is the first, get the threads input queue */
452
        if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
7,782!
453
            tv->stream_pq = tv->inq->pq;
7,776✔
454
            tv->tm_flowworker = s;
7,776✔
455
            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
7,776!
456
            tv->flow_queue = FlowQueueNew();
7,776✔
457
            if (tv->flow_queue == NULL) {
7,776!
458
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
459
                pthread_exit(NULL);
×
460
                return NULL;
×
461
            }
×
462
        /* setup a queue */
463
        } else if (s->tm_id == TMM_FLOWWORKER) {
7,776!
464
            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
×
465
            if (tv->stream_pq_local == NULL)
×
466
                FatalError("failed to alloc PacketQueue");
×
467
            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
×
468
            tv->stream_pq = tv->stream_pq_local;
×
469
            tv->tm_flowworker = s;
×
470
            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
×
471
            tv->flow_queue = FlowQueueNew();
×
472
            if (tv->flow_queue == NULL) {
×
473
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
474
                pthread_exit(NULL);
×
475
                return NULL;
×
476
            }
×
477
        }
×
478
    }
7,782✔
479

480
    StatsSetupPrivate(&tv->stats, tv->printable_name ? tv->printable_name : tv->name);
7,777!
481

482
    // Each 'worker' thread uses this func to process/decode the packet read.
483
    // Each decode method is different to receive methods in that they do not
484
    // enter infinite loops. They use this as the core loop. As a result, at this
485
    // point the worker threads can be considered both initialized and running.
486
    TmThreadsSetFlag(tv, THV_INIT_DONE | THV_RUNNING);
7,777✔
487
    bool run = TmThreadsWaitForUnpause(tv);
7,777✔
488

489
    s = (TmSlot *)tv->tm_slots;
7,777✔
490

491
    while (run) {
1,564,346✔
492
        /* input a packet */
493
        p = tv->tmqh_in(tv);
1,556,569✔
494

495
        /* if we didn't get a packet see if we need to do some housekeeping */
496
        if (unlikely(p == NULL)) {
1,556,569✔
497
            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
7,946!
498
                p = PacketGetFromQueueOrAlloc();
4✔
499
                if (p != NULL) {
4!
500
                    p->flags |= PKT_PSEUDO_STREAM_END;
4✔
501
                    PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
4✔
502
                }
4✔
503
            }
4✔
504
        }
7,946✔
505

506
        if (p != NULL) {
1,556,569✔
507
            /* run the thread module(s) */
508
            r = TmThreadsSlotVarRun(tv, p, s);
1,548,713✔
509
            if (r == TM_ECODE_FAILED) {
1,548,713!
510
                TmqhOutputPacketpool(tv, p);
×
511
                TmThreadsSetFlag(tv, THV_FAILED);
×
512
                break;
×
513
            }
×
514

515
            /* output the packet */
516
            tv->tmqh_out(tv, p);
1,548,713✔
517

518
            /* now handle the stream pq packets */
519
            TmThreadsHandleInjectedPackets(tv);
1,548,713✔
520
        }
1,548,713✔
521

522
        if (TmThreadsCheckFlag(tv, (THV_KILL | THV_REQ_FLOW_LOOP))) {
1,556,569✔
523
            run = false;
7,777✔
524
        }
7,777✔
525
    }
1,556,569✔
526
    if (!SCTmThreadsSlotPacketLoopFinish(tv)) {
7,777!
527
        goto error;
×
528
    }
×
529
    StatsSyncCounters(&tv->stats);
7,777✔
530

531
    pthread_exit(NULL);
7,777✔
532
    return NULL;
×
533

534
error:
×
535
    tv->stream_pq = NULL;
×
536
    pthread_exit(NULL);
×
537
    return NULL;
×
538
}
7,777✔
539

540
static void *TmThreadsManagement(void *td)
541
{
3,981✔
542
    ThreadVars *tv = (ThreadVars *)td;
3,981✔
543
    TmSlot *s = (TmSlot *)tv->tm_slots;
3,981✔
544
    TmEcode r = TM_ECODE_OK;
3,981✔
545

546
    BUG_ON(s == NULL);
3,981!
547

548
    SCSetThreadName(tv->name);
3,981!
549

550
    if (tv->thread_setup_flags != 0)
3,981!
UNCOV
551
        TmThreadSetupOptions(tv);
×
552

553
    /* Drop the capabilities for this thread */
554
    SCDropCaps(tv);
3,981✔
555

556
    SCLogDebug("%s starting", tv->name);
3,981!
557

558
    if (s->SlotThreadInit != NULL) {
3,981!
559
        void *slot_data = NULL;
3,981✔
560
        r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
3,981✔
561
        if (r != TM_ECODE_OK) {
3,981!
562
            TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
×
563
            pthread_exit(NULL);
×
564
            return NULL;
×
565
        }
×
566
        (void)SC_ATOMIC_SET(s->slot_data, slot_data);
3,981✔
567
    }
3,981✔
568

569
    StatsSetupPrivate(&tv->stats, tv->printable_name ? tv->printable_name : tv->name);
3,981!
570

571
    TmThreadsSetFlag(tv, THV_INIT_DONE);
3,981✔
572

573
    r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
3,981✔
574
    /* handle error */
575
    if (r == TM_ECODE_FAILED) {
3,981!
576
        TmThreadsSetFlag(tv, THV_FAILED);
×
577
    }
×
578

579
    if (TmThreadsCheckFlag(tv, THV_KILL)) {
3,981✔
580
        StatsSyncCounters(&tv->stats);
3,946✔
581
    }
3,946✔
582

583
    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
3,981✔
584
    TmThreadWaitForFlag(tv, THV_DEINIT);
3,981✔
585

586
    if (s->SlotThreadExitPrintStats != NULL) {
3,981!
587
        s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
×
588
    }
×
589

590
    if (s->SlotThreadDeinit != NULL) {
3,981✔
591
        r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
3,973✔
592
        if (r != TM_ECODE_OK) {
3,973!
593
            TmThreadsSetFlag(tv, THV_CLOSED);
×
594
            pthread_exit(NULL);
×
595
            return NULL;
×
596
        }
×
597
    }
3,973✔
598

599
    TmThreadsSetFlag(tv, THV_CLOSED);
3,981✔
600
    pthread_exit((void *) 0);
3,981✔
601
    return NULL;
×
602
}
3,981✔
603

604
/**
605
 * \brief We set the slot functions.
606
 *
607
 * \param tv   Pointer to the TV to set the slot function for.
608
 * \param name Name of the slot variant.
609
 * \param fn_p Pointer to a custom slot function.  Used only if slot variant
610
 *             "name" is "custom".
611
 *
612
 * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
613
 */
614
static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
615
{
16,781✔
616
    if (name == NULL) {
16,781!
617
        if (fn_p == NULL) {
×
618
            printf("Both slot name and function pointer can't be NULL inside "
×
619
                   "TmThreadSetSlots\n");
×
620
            goto error;
×
621
        } else {
×
622
            name = "custom";
×
623
        }
×
624
    }
×
625

626
    if (strcmp(name, "varslot") == 0) {
16,781✔
627
        tv->tm_func = TmThreadsSlotVar;
7,777✔
628
    } else if (strcmp(name, "pktacqloop") == 0) {
9,004✔
629
        tv->tm_func = TmThreadsSlotPktAcqLoop;
2,001✔
630
    } else if (strcmp(name, "management") == 0) {
7,003✔
631
        tv->tm_func = TmThreadsManagement;
3,962✔
632
    } else if (strcmp(name, "command") == 0) {
3,968✔
633
        tv->tm_func = TmThreadsManagement;
19✔
634
    } else if (strcmp(name, "lib") == 0) {
3,022!
635
        tv->tm_func = TmThreadsLib;
×
636
    } else if (strcmp(name, "custom") == 0) {
3,022!
637
        if (fn_p == NULL)
3,022!
638
            goto error;
×
639
        tv->tm_func = fn_p;
3,022✔
640
    } else {
3,022✔
641
        printf("Error: Slot \"%s\" not supported\n", name);
×
642
        goto error;
×
643
    }
×
644

645
    return TM_ECODE_OK;
16,781✔
646

647
error:
×
648
    return TM_ECODE_FAILED;
×
649
}
16,781✔
650

651
/**
652
 * \brief Appends a new entry to the slots.
653
 *
654
 * \param tv   TV the slot is attached to.
655
 * \param tm   TM to append.
656
 * \param data Data to be passed on to the slot init function.
657
 *
658
 * \retval The allocated TmSlot or NULL if there is an error
659
 */
660
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
661
{
15,847✔
662
    TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
15,847✔
663
    if (unlikely(slot == NULL))
15,847!
664
        return;
×
665
    SC_ATOMIC_INITPTR(slot->slot_data);
15,847✔
666
    slot->SlotThreadInit = tm->ThreadInit;
15,847✔
667
    slot->slot_initdata = data;
15,847✔
668
    if (tm->Func) {
15,847✔
669
        slot->SlotFunc = tm->Func;
9,865✔
670
    } else if (tm->PktAcqLoop) {
9,865✔
671
        slot->PktAcqLoop = tm->PktAcqLoop;
2,001✔
672
        if (tm->PktAcqBreakLoop) {
2,001!
UNCOV
673
            tv->break_loop = true;
×
UNCOV
674
        }
×
675
    } else if (tm->Management) {
3,989!
676
        slot->Management = tm->Management;
3,981✔
677
    }
3,981✔
678
    slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
15,847✔
679
    slot->SlotThreadDeinit = tm->ThreadDeinit;
15,847✔
680
    /* we don't have to check for the return value "-1".  We wouldn't have
681
     * received a TM as arg, if it didn't exist */
682
    slot->tm_id = TmModuleGetIDForTM(tm);
15,847✔
683
    slot->tm_flags |= tm->flags;
15,847✔
684

685
    tv->tmm_flags |= tm->flags;
15,847✔
686
    tv->cap_flags |= tm->cap_flags;
15,847✔
687

688
    if (tv->tm_slots == NULL) {
15,847✔
689
        tv->tm_slots = slot;
13,759✔
690
    } else {
13,780✔
691
        TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
2,088✔
692

693
        /* get the last slot */
694
        for ( ; a != NULL; a = a->slot_next) {
4,281✔
695
             b = a;
2,193✔
696
        }
2,193✔
697
        /* append the new slot */
698
        if (b != NULL) {
2,088!
699
            b->slot_next = slot;
2,088✔
700
        }
2,088✔
701
    }
2,088✔
702
}
15,847✔
703

704
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
705
static int SetCPUAffinitySet(cpu_set_t *cs)
UNCOV
706
{
×
707
#if defined OS_FREEBSD
708
    int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
709
                               SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
710
#elif OS_DARWIN
711
    int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
712
                              (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
713
#else
UNCOV
714
    pid_t tid = (pid_t)syscall(SYS_gettid);
×
UNCOV
715
    int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
×
UNCOV
716
#endif /* OS_FREEBSD */
×
717

UNCOV
718
    if (r != 0) {
×
719
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
×
720
               strerror(errno));
×
721
        return -1;
×
722
    }
×
723

UNCOV
724
    return 0;
×
UNCOV
725
}
×
726
#endif
727

728

729
/**
730
 * \brief Set the thread affinity on the calling thread.
731
 *
732
 * \param cpuid Id of the core/cpu to setup the affinity.
733
 *
734
 * \retval 0 If all goes well; -1 if something is wrong.
735
 */
736
static int SetCPUAffinity(uint16_t cpuid)
UNCOV
737
{
×
738
#if defined __OpenBSD__ || defined sun
739
    return 0;
740
#else
UNCOV
741
    int cpu = (int)cpuid;
×
742

743
#if defined OS_WIN32 || defined __CYGWIN__
744
    DWORD cs = 1 << cpu;
745

746
    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
747
    if (r != 0) {
748
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
749
               strerror(errno));
750
        return -1;
751
    }
752
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
753
               SCGetThreadIdLong(), cpu);
754

755
    return 0;
756

757
#else
UNCOV
758
    cpu_set_t cs;
×
UNCOV
759
    memset(&cs, 0, sizeof(cs));
×
760

UNCOV
761
    CPU_ZERO(&cs);
×
UNCOV
762
    CPU_SET(cpu, &cs);
×
UNCOV
763
    return SetCPUAffinitySet(&cs);
×
UNCOV
764
#endif /* windows */
×
UNCOV
765
#endif /* not supported */
×
UNCOV
766
}
×
767

768

769
/**
770
 * \brief Set the thread options (thread priority).
771
 *
772
 * \param tv Pointer to the ThreadVars to setup the thread priority.
773
 *
774
 * \retval TM_ECODE_OK.
775
 */
776
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
777
{
×
778
    tv->thread_setup_flags |= THREAD_SET_PRIORITY;
×
779
    tv->thread_priority = prio;
×
780

781
    return TM_ECODE_OK;
×
782
}
×
783

784
/**
785
 * \brief Adjusting nice value for threads.
786
 */
787
void TmThreadSetPrio(ThreadVars *tv)
UNCOV
788
{
×
UNCOV
789
    SCEnter();
×
UNCOV
790
#ifndef __CYGWIN__
×
791
#ifdef OS_WIN32
792
        if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
793
            SCLogError("Error setting priority for "
794
                       "thread %s: %s",
795
                    tv->name, strerror(errno));
796
    } else {
797
        SCLogDebug("Priority set to %"PRId32" for thread %s",
798
                   tv->thread_priority, tv->name);
799
    }
800
#else
UNCOV
801
    int ret = nice(tv->thread_priority);
×
UNCOV
802
    if (ret == -1) {
×
803
        SCLogError("Error setting nice value %d "
×
804
                   "for thread %s: %s",
×
805
                tv->thread_priority, tv->name, strerror(errno));
×
UNCOV
806
    } else {
×
UNCOV
807
        SCLogDebug("Nice value set to %"PRId32" for thread %s",
×
UNCOV
808
                   tv->thread_priority, tv->name);
×
UNCOV
809
    }
×
UNCOV
810
#endif /* OS_WIN32 */
×
UNCOV
811
#endif
×
UNCOV
812
    SCReturn;
×
UNCOV
813
}
×
814

815

816
/**
817
 * \brief Set the thread options (cpu affinity).
818
 *
819
 * \param tv pointer to the ThreadVars to setup the affinity.
820
 * \param cpu cpu on which affinity is set.
821
 *
822
 * \retval TM_ECODE_OK
823
 */
824
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
825
{
×
826
    tv->thread_setup_flags |= THREAD_SET_AFFINITY;
×
827
    tv->cpu_affinity = cpu;
×
828

829
    return TM_ECODE_OK;
×
830
}
×
831

832

833
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
834
{
16,781✔
835
    if (!threading_set_cpu_affinity)
16,781!
836
        return TM_ECODE_OK;
16,781✔
837

UNCOV
838
    if (type > MAX_CPU_SET) {
×
839
        SCLogError("invalid cpu type family");
×
840
        return TM_ECODE_FAILED;
×
841
    }
×
842

UNCOV
843
    tv->thread_setup_flags |= THREAD_SET_AFFTYPE;
×
UNCOV
844
    tv->cpu_affinity = type;
×
845

UNCOV
846
    return TM_ECODE_OK;
×
UNCOV
847
}
×
848

849
int TmThreadGetNbThreads(uint8_t type)
850
{
1,946✔
851
    if (type >= MAX_CPU_SET) {
1,946!
852
        SCLogError("invalid cpu type family");
×
853
        return 0;
×
854
    }
×
855

856
    return thread_affinity[type].nb_threads;
1,946✔
857
}
1,946✔
858

859
/**
860
 * \brief Set the thread options (cpu affinitythread).
861
 *        Priority should be already set by pthread_create.
862
 *
863
 * \param tv pointer to the ThreadVars of the calling thread.
864
 */
865
TmEcode TmThreadSetupOptions(ThreadVars *tv)
UNCOV
866
{
×
UNCOV
867
    if (tv->thread_setup_flags & THREAD_SET_AFFINITY) {
×
868
        SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
×
869
                  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
×
870
                  SCGetThreadIdLong());
×
871
        SetCPUAffinity(tv->cpu_affinity);
×
872
    }
×
873

UNCOV
874
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
×
UNCOV
875
    if (tv->thread_setup_flags & THREAD_SET_PRIORITY)
×
876
        TmThreadSetPrio(tv);
×
UNCOV
877
    if (tv->thread_setup_flags & THREAD_SET_AFFTYPE) {
×
UNCOV
878
        ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity];
×
UNCOV
879
        bool use_iface_affinity = RunmodeIsAutofp() && tv->cpu_affinity == RECEIVE_CPU_SET &&
×
UNCOV
880
                                  FindAffinityByInterface(taf, tv->iface_name) != NULL;
×
UNCOV
881
        use_iface_affinity |= RunmodeIsWorkers() && tv->cpu_affinity == WORKER_CPU_SET &&
×
UNCOV
882
                              FindAffinityByInterface(taf, tv->iface_name) != NULL;
×
883

UNCOV
884
        if (use_iface_affinity) {
×
885
            taf = FindAffinityByInterface(taf, tv->iface_name);
×
886
        }
×
887

UNCOV
888
        if (UtilAffinityGetAffinedCPUNum(taf) == 0) {
×
889
            if (!taf->nocpu_warned) {
×
890
                SCLogWarning("No CPU affinity set for %s", AffinityGetYamlPath(taf));
×
891
                taf->nocpu_warned = true;
×
892
            }
×
893
        }
×
894

UNCOV
895
        if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
×
UNCOV
896
            uint16_t cpu = AffinityGetNextCPU(tv, taf);
×
UNCOV
897
            SetCPUAffinity(cpu);
×
898
            /* If CPU is in a set overwrite the default thread prio */
UNCOV
899
            if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
×
900
                tv->thread_priority = PRIO_LOW;
×
UNCOV
901
            } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
×
902
                tv->thread_priority = PRIO_MEDIUM;
×
UNCOV
903
            } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
×
904
                tv->thread_priority = PRIO_HIGH;
×
UNCOV
905
            } else {
×
UNCOV
906
                tv->thread_priority = taf->prio;
×
UNCOV
907
            }
×
UNCOV
908
            SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
×
UNCOV
909
                      "%d, thread id %lu", tv->thread_priority,
×
UNCOV
910
                      tv->name, cpu, SCGetThreadIdLong());
×
UNCOV
911
        } else {
×
UNCOV
912
            SetCPUAffinitySet(&taf->cpu_set);
×
UNCOV
913
            tv->thread_priority = taf->prio;
×
UNCOV
914
            SCLogPerf("Setting prio %d for thread \"%s\", "
×
UNCOV
915
                      "thread id %lu", tv->thread_priority,
×
UNCOV
916
                      tv->name, SCGetThreadIdLong());
×
UNCOV
917
        }
×
UNCOV
918
        TmThreadSetPrio(tv);
×
UNCOV
919
    }
×
UNCOV
920
#endif
×
921

UNCOV
922
    return TM_ECODE_OK;
×
UNCOV
923
}
×
924

925
/**
926
 * \brief Creates and returns the TV instance for a new thread.
927
 *
928
 * \param name       Name of this TV instance
929
 * \param inq_name   Incoming queue name
930
 * \param inqh_name  Incoming queue handler name as set by TmqhSetup()
931
 * \param outq_name  Outgoing queue name
932
 * \param outqh_name Outgoing queue handler as set by TmqhSetup()
933
 * \param slots      String representation for the slot function to be used
934
 * \param fn_p       Pointer to function when \"slots\" is of type \"custom\"
935
 * \param mucond     Flag to indicate whether to initialize the condition
936
 *                   and the mutex variables for this newly created TV.
937
 *
938
 * \retval the newly created TV instance, or NULL on error
939
 */
940
ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
941
                           const char *outq_name, const char *outqh_name, const char *slots,
942
                           void * (*fn_p)(void *), int mucond)
943
{
16,781✔
944
    ThreadVars *tv = NULL;
16,781✔
945
    Tmq *tmq = NULL;
16,781✔
946
    Tmqh *tmqh = NULL;
16,781✔
947

948
    SCLogDebug("creating thread \"%s\"...", name);
16,781!
949

950
    /* XXX create separate function for this: allocate a thread container */
951
    tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
16,781✔
952
    if (unlikely(tv == NULL))
16,781!
953
        goto error;
×
954

955
    SC_ATOMIC_INIT(tv->flags);
16,781✔
956
    StatsThreadInit(&tv->stats);
16,781✔
957

958
    strlcpy(tv->name, name, sizeof(tv->name));
16,781✔
959

960
    /* default state for every newly created thread */
961
    TmThreadsSetFlag(tv, THV_PAUSE);
16,781✔
962

963
    /* set the incoming queue */
964
    if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
16,781✔
965
        SCLogDebug("inq_name \"%s\"", inq_name);
7,777!
966

967
        tmq = TmqGetQueueByName(inq_name);
7,777✔
968
        if (tmq == NULL) {
7,777!
969
            tmq = TmqCreateQueue(inq_name);
×
970
            if (tmq == NULL)
×
971
                goto error;
×
972
        }
×
973
        SCLogDebug("tmq %p", tmq);
7,777!
974

975
        tv->inq = tmq;
7,777✔
976
        tv->inq->reader_cnt++;
7,777✔
977
        SCLogDebug("tv->inq %p", tv->inq);
7,777!
978
    }
7,777✔
979
    if (inqh_name != NULL) {
16,781✔
980
        SCLogDebug("inqh_name \"%s\"", inqh_name);
9,778!
981

982
        int id = TmqhNameToID(inqh_name);
9,778✔
983
        if (id <= 0) {
9,778!
984
            goto error;
×
985
        }
×
986
        tmqh = TmqhGetQueueHandlerByName(inqh_name);
9,778✔
987
        if (tmqh == NULL)
9,778!
988
            goto error;
×
989

990
        tv->tmqh_in = tmqh->InHandler;
9,778✔
991
        tv->inq_id = (uint8_t)id;
9,778✔
992
        SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
9,778!
993
    }
9,778✔
994

995
    /* set the outgoing queue */
996
    if (outqh_name != NULL) {
16,781✔
997
        SCLogDebug("outqh_name \"%s\"", outqh_name);
9,778!
998

999
        int id = TmqhNameToID(outqh_name);
9,778✔
1000
        if (id <= 0) {
9,778!
1001
            goto error;
×
1002
        }
×
1003

1004
        tmqh = TmqhGetQueueHandlerByName(outqh_name);
9,778✔
1005
        if (tmqh == NULL)
9,778!
1006
            goto error;
×
1007

1008
        tv->tmqh_out = tmqh->OutHandler;
9,778✔
1009
        tv->outq_id = (uint8_t)id;
9,778✔
1010

1011
        if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
9,778!
1012
            SCLogDebug("outq_name \"%s\"", outq_name);
1,945!
1013

1014
            if (tmqh->OutHandlerCtxSetup != NULL) {
1,945!
1015
                tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1,941✔
1016
                if (tv->outctx == NULL)
1,941!
1017
                    goto error;
×
1018
                tv->outq = NULL;
1,941✔
1019
            } else {
1,941✔
1020
                tmq = TmqGetQueueByName(outq_name);
4✔
1021
                if (tmq == NULL) {
4!
1022
                    tmq = TmqCreateQueue(outq_name);
1✔
1023
                    if (tmq == NULL)
1!
1024
                        goto error;
×
1025
                }
1✔
1026
                SCLogDebug("tmq %p", tmq);
4!
1027

1028
                tv->outq = tmq;
4✔
1029
                tv->outctx = NULL;
4✔
1030
                tv->outq->writer_cnt++;
4✔
1031
            }
4✔
1032
        }
1,945✔
1033
    }
9,778✔
1034

1035
    if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
16,781!
1036
        goto error;
×
1037
    }
×
1038

1039
    if (mucond != 0)
16,781✔
1040
        TmThreadInitMC(tv);
3,034✔
1041

1042
    SCThreadRunInitCallbacks(tv);
16,781✔
1043

1044
    return tv;
16,781✔
1045

1046
error:
×
1047
    SCLogError("failed to setup a thread");
×
1048

1049
    if (tv != NULL)
×
1050
        SCFree(tv);
×
1051
    return NULL;
×
1052
}
16,781✔
1053

1054
/**
1055
 * \brief Creates and returns a TV instance for a Packet Processing Thread.
1056
 *        This function doesn't support custom slots, and hence shouldn't be
1057
 *        supplied \"custom\" as its slot type.  All PPT threads are created
1058
 *        with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1059
 *        conditional variables are not used to kill the thread.
1060
 *
1061
 * \param name       Name of this TV instance
1062
 * \param inq_name   Incoming queue name
1063
 * \param inqh_name  Incoming queue handler name as set by TmqhSetup()
1064
 * \param outq_name  Outgoing queue name
1065
 * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1066
 * \param slots      String representation for the slot function to be used
1067
 *
1068
 * \retval the newly created TV instance, or NULL on error
1069
 */
1070
ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1071
                                        const char *inqh_name, const char *outq_name,
1072
                                        const char *outqh_name, const char *slots)
1073
{
9,778✔
1074
    ThreadVars *tv = NULL;
9,778✔
1075

1076
    tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
9,778✔
1077
                        slots, NULL, 0);
9,778✔
1078

1079
    if (tv != NULL) {
9,778!
1080
        tv->type = TVT_PPT;
9,778✔
1081
        tv->id = TmThreadsRegisterThread(tv, tv->type);
9,778✔
1082
    }
9,778✔
1083

1084
    return tv;
9,778✔
1085
}
9,778✔
1086

1087
/**
1088
 * \brief Creates and returns the TV instance for a Management thread(MGMT).
1089
 *        This function supports only custom slot functions and hence a
1090
 *        function pointer should be sent as an argument.
1091
 *
1092
 * \param name       Name of this TV instance
1093
 * \param fn_p       Pointer to function when \"slots\" is of type \"custom\"
1094
 * \param mucond     Flag to indicate whether to initialize the condition
1095
 *                   and the mutex variables for this newly created TV.
1096
 *
1097
 * \retval the newly created TV instance, or NULL on error
1098
 */
1099
ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1100
                                     int mucond)
1101
{
3,022✔
1102
    ThreadVars *tv = NULL;
3,022✔
1103

1104
    tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
3,022✔
1105

1106
    if (tv != NULL) {
3,022!
1107
        tv->type = TVT_MGMT;
3,022✔
1108
        tv->id = TmThreadsRegisterThread(tv, tv->type);
3,022✔
1109
        TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
3,022✔
1110
    }
3,022✔
1111

1112
    return tv;
3,022✔
1113
}
3,022✔
1114

1115
/**
1116
 * \brief Creates and returns the TV instance for a Management thread(MGMT).
1117
 *        This function supports only custom slot functions and hence a
1118
 *        function pointer should be sent as an argument.
1119
 *
1120
 * \param name       Name of this TV instance
1121
 * \param module     Name of TmModule with MANAGEMENT flag set.
1122
 * \param mucond     Flag to indicate whether to initialize the condition
1123
 *                   and the mutex variables for this newly created TV.
1124
 *
1125
 * \retval the newly created TV instance, or NULL on error
1126
 */
1127
ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1128
                                     int mucond)
1129
{
3,962✔
1130
    ThreadVars *tv = NULL;
3,962✔
1131

1132
    tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
3,962✔
1133

1134
    if (tv != NULL) {
3,962!
1135
        tv->type = TVT_MGMT;
3,962✔
1136
        tv->id = TmThreadsRegisterThread(tv, tv->type);
3,962✔
1137
        TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
3,962✔
1138

1139
        TmModule *m = TmModuleGetByName(module);
3,962✔
1140
        if (m) {
3,962!
1141
            TmSlotSetFuncAppend(tv, m, NULL);
3,962✔
1142
        }
3,962✔
1143
    }
3,962✔
1144

1145
    return tv;
3,962✔
1146
}
3,962✔
1147

1148
/**
1149
 * \brief Creates and returns the TV instance for a Command thread (CMD).
1150
 *        This function supports only custom slot functions and hence a
1151
 *        function pointer should be sent as an argument.
1152
 *
1153
 * \param name       Name of this TV instance
1154
 * \param module     Name of TmModule with COMMAND flag set.
1155
 * \param mucond     Flag to indicate whether to initialize the condition
1156
 *                   and the mutex variables for this newly created TV.
1157
 *
1158
 * \retval the newly created TV instance, or NULL on error
1159
 */
1160
ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1161
                                     int mucond)
1162
{
19✔
1163
    ThreadVars *tv = NULL;
19✔
1164

1165
    tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
19✔
1166

1167
    if (tv != NULL) {
19!
1168
        tv->type = TVT_CMD;
19✔
1169
        tv->id = TmThreadsRegisterThread(tv, tv->type);
19✔
1170
        TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
19✔
1171

1172
        TmModule *m = TmModuleGetByName(module);
19✔
1173
        if (m) {
19!
1174
            TmSlotSetFuncAppend(tv, m, NULL);
19✔
1175
        }
19✔
1176
    }
19✔
1177

1178
    return tv;
19✔
1179
}
19✔
1180

1181
/**
1182
 * \brief Appends this TV to tv_root based on its type
1183
 *
1184
 * \param type holds the type this TV belongs to.
1185
 */
1186
void TmThreadAppend(ThreadVars *tv, int type)
1187
{
16,781✔
1188
    SCMutexLock(&tv_root_lock);
16,781✔
1189

1190
    if (tv_root[type] == NULL) {
16,781✔
1191
        tv_root[type] = tv;
3,972✔
1192
        tv->next = NULL;
3,972✔
1193

1194
        SCMutexUnlock(&tv_root_lock);
3,972✔
1195

1196
        return;
3,972✔
1197
    }
3,972✔
1198

1199
    ThreadVars *t = tv_root[type];
12,809✔
1200

1201
    while (t) {
29,167!
1202
        if (t->next == NULL) {
29,167✔
1203
            t->next = tv;
12,809✔
1204
            tv->next = NULL;
12,809✔
1205
            break;
12,809✔
1206
        }
12,809✔
1207

1208
        t = t->next;
16,358✔
1209
    }
16,358✔
1210

1211
    SCMutexUnlock(&tv_root_lock);
12,809✔
1212
}
12,809✔
1213

1214
static bool ThreadStillHasPackets(ThreadVars *tv)
1215
{
68,386✔
1216
    if (tv->inq != NULL && !tv->inq->is_packet_pool) {
68,386!
1217
        /* we wait till we dry out all the inq packets, before we
1218
         * kill this thread.  Do note that you should have disabled
1219
         * packet acquire by now using TmThreadDisableReceiveThreads()*/
1220
        PacketQueue *q = tv->inq->pq;
44,989✔
1221
        SCMutexLock(&q->mutex_q);
44,989✔
1222
        uint32_t len = q->len;
44,989✔
1223
        SCMutexUnlock(&q->mutex_q);
44,989✔
1224
        if (len != 0) {
44,989✔
1225
            return true;
14,042✔
1226
        }
14,042✔
1227
    }
44,989✔
1228

1229
    if (tv->stream_pq != NULL) {
54,344✔
1230
        SCMutexLock(&tv->stream_pq->mutex_q);
31,484✔
1231
        uint32_t len = tv->stream_pq->len;
31,484✔
1232
        SCMutexUnlock(&tv->stream_pq->mutex_q);
31,484✔
1233

1234
        if (len != 0) {
31,484!
UNCOV
1235
            return true;
×
UNCOV
1236
        }
×
1237
    }
31,484✔
1238
    return false;
54,344✔
1239
}
54,344✔
1240

1241
/**
1242
 * \brief Kill a thread.
1243
 *
1244
 * \param tv A ThreadVars instance corresponding to the thread that has to be
1245
 *           killed.
1246
 *
1247
 * \retval r 1 killed successfully
1248
 *           0 not yet ready, needs another look
1249
 */
1250
static int TmThreadKillThread(ThreadVars *tv)
1251
{
188,985✔
1252
    BUG_ON(tv == NULL);
188,985!
1253

1254
    /* kill only once :) */
1255
    if (TmThreadsCheckFlag(tv, THV_DEAD)) {
188,985✔
1256
        return 1;
115,165✔
1257
    }
115,165✔
1258

1259
    /* set the thread flag informing the thread that it needs to be
1260
     * terminated */
1261
    TmThreadsSetFlag(tv, THV_KILL);
73,820✔
1262
    TmThreadsSetFlag(tv, THV_DEINIT);
73,820✔
1263

1264
    /* to be sure, signal more */
1265
    if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
73,820✔
1266
        if (tv->inq_id != TMQH_NOT_SET) {
57,047✔
1267
            Tmqh *qh = TmqhGetQueueHandlerByID(tv->inq_id);
38,770✔
1268
            if (qh != NULL && qh->InShutdownHandler != NULL) {
38,770!
1269
                qh->InShutdownHandler(tv);
4✔
1270
            }
4✔
1271
        }
38,770✔
1272
        if (tv->inq != NULL) {
57,047✔
1273
            for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
99,900✔
1274
                SCMutexLock(&tv->inq->pq->mutex_q);
66,622✔
1275
                SCCondSignal(&tv->inq->pq->cond_q);
66,622✔
1276
                SCMutexUnlock(&tv->inq->pq->mutex_q);
66,622✔
1277
            }
66,622✔
1278
            SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
33,278!
1279
        }
33,278✔
1280

1281
        if (tv->ctrl_cond != NULL ) {
57,047✔
1282
            SCCtrlMutexLock(tv->ctrl_mutex);
10,601✔
1283
            pthread_cond_broadcast(tv->ctrl_cond);
10,601✔
1284
            SCCtrlMutexUnlock(tv->ctrl_mutex);
10,601✔
1285
        }
10,601✔
1286
        return 0;
57,047✔
1287
    }
57,047✔
1288

1289
    if (tv->outctx != NULL) {
16,773✔
1290
        if (tv->outq_id != TMQH_NOT_SET) {
1,941!
1291
            Tmqh *qh = TmqhGetQueueHandlerByID(tv->outq_id);
1,941✔
1292
            if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1,941!
1293
                qh->OutHandlerCtxFree(tv->outctx);
1,941✔
1294
                tv->outctx = NULL;
1,941✔
1295
            }
1,941✔
1296
        }
1,941✔
1297
    }
1,941✔
1298

1299
    /* Join the thread and flag as dead, unless the thread ID is 0 as
1300
     * its not a thread created by Suricata. */
1301
    if (tv->t) {
16,773!
1302
        pthread_join(tv->t, NULL);
16,773✔
1303
        SCLogDebug("thread %s stopped", tv->name);
16,773!
1304
    }
16,773✔
1305
    TmThreadsSetFlag(tv, THV_DEAD);
16,773✔
1306
    return 1;
16,773✔
1307
}
73,820✔
1308

1309
static bool ThreadBusy(ThreadVars *tv)
1310
{
54,344✔
1311
    for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
132,458✔
1312
        TmModule *tm = TmModuleGetById(s->tm_id);
78,930✔
1313
        if (tm && tm->ThreadBusy != NULL) {
78,930!
1314
            if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
31,484✔
1315
                return true;
816✔
1316
        }
31,484✔
1317
    }
78,930✔
1318
    return false;
53,528✔
1319
}
54,344✔
1320

1321
/** \internal
1322
 *
1323
 *  \brief make sure that all packet threads are done processing their
1324
 *         in-flight packets, including 'injected' flow packets.
1325
 */
1326
static void TmThreadDrainPacketThreads(void)
1327
{
5,943✔
1328
    ThreadVars *tv = NULL;
5,943✔
1329
    struct timeval start_ts;
5,943✔
1330
    struct timeval cur_ts;
5,943✔
1331
    gettimeofday(&start_ts, NULL);
5,943✔
1332

1333
again:
20,800✔
1334
    gettimeofday(&cur_ts, NULL);
20,800✔
1335
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
20,800!
1336
        SCLogWarning("unable to get all packet threads "
×
1337
                     "to process their packets in time");
×
1338
        return;
×
1339
    }
×
1340

1341
    SCMutexLock(&tv_root_lock);
20,800✔
1342

1343
    /* all receive threads are part of packet processing threads */
1344
    tv = tv_root[TVT_PPT];
20,800✔
1345
    while (tv) {
71,792✔
1346
        if (ThreadStillHasPackets(tv)) {
65,849✔
1347
            /* we wait till we dry out all the inq packets, before we
1348
             * kill this thread.  Do note that you should have disabled
1349
             * packet acquire by now using TmThreadDisableReceiveThreads()*/
1350
            SCMutexUnlock(&tv_root_lock);
14,042✔
1351

1352
            /* sleep outside lock */
1353
            SleepMsec(1);
14,042✔
1354
            goto again;
14,042✔
1355
        }
14,042✔
1356
        if (ThreadBusy(tv)) {
51,807✔
1357
            SCMutexUnlock(&tv_root_lock);
815✔
1358

1359
            Packet *p = PacketGetFromAlloc();
815✔
1360
            if (p != NULL) {
815!
1361
                p->flags |= PKT_PSEUDO_STREAM_END;
815✔
1362
                PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
815✔
1363
                CaptureHooksOnPseudoPacketCreated(p);
815✔
1364
                PacketQueue *q = tv->stream_pq;
815✔
1365
                SCMutexLock(&q->mutex_q);
815✔
1366
                PacketEnqueue(q, p);
815✔
1367
                SCCondSignal(&q->cond_q);
815✔
1368
                SCMutexUnlock(&q->mutex_q);
815✔
1369
            }
815✔
1370

1371
            /* don't sleep while holding a lock */
1372
            SleepMsec(1);
815✔
1373
            goto again;
815✔
1374
        }
815✔
1375
        tv = tv->next;
50,992✔
1376
    }
50,992✔
1377

1378
    SCMutexUnlock(&tv_root_lock);
5,943✔
1379
}
5,943✔
1380

1381
/**
1382
 *  \brief Disable all threads having the specified TMs.
1383
 *
1384
 *  Breaks out of the packet acquisition loop, and bumps
1385
 *  into the 'flow loop', where it will process packets
1386
 *  from the flow engine's shutdown handling.
1387
 */
1388
void TmThreadDisableReceiveThreads(void)
1389
{
1,981✔
1390
    ThreadVars *tv = NULL;
1,981✔
1391
    struct timeval start_ts;
1,981✔
1392
    struct timeval cur_ts;
1,981✔
1393
    gettimeofday(&start_ts, NULL);
1,981✔
1394

1395
again:
2,517✔
1396
    gettimeofday(&cur_ts, NULL);
2,517✔
1397
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
2,517!
1398
        FatalError("Engine unable to disable detect "
×
1399
                   "thread - \"%s\". Killing engine",
×
1400
                tv->name);
×
1401
    }
×
1402

1403
    SCMutexLock(&tv_root_lock);
2,517✔
1404

1405
    /* all receive threads are part of packet processing threads */
1406
    tv = tv_root[TVT_PPT];
2,517✔
1407

1408
    /* we do have to keep in mind that TVs are arranged in the order
1409
     * right from receive to log.  The moment we fail to find a
1410
     * receive TM amongst the slots in a tv, it indicates we are done
1411
     * with all receive threads */
1412
    while (tv) {
12,295✔
1413
        int disable = 0;
10,314✔
1414
        TmModule *tm = NULL;
10,314✔
1415
        /* obtain the slots for this TV */
1416
        TmSlot *slots = tv->tm_slots;
10,314✔
1417
        while (slots != NULL) {
18,096✔
1418
            tm = TmModuleGetById(slots->tm_id);
10,319✔
1419

1420
            if (tm->flags & TM_FLAG_RECEIVE_TM) {
10,319✔
1421
                disable = 1;
2,537✔
1422
                break;
2,537✔
1423
            }
2,537✔
1424

1425
            slots = slots->slot_next;
7,782✔
1426
            continue;
7,782✔
1427
        }
10,319✔
1428

1429
        if (disable) {
10,314✔
1430
            if (ThreadStillHasPackets(tv)) {
2,537!
1431
                /* we wait till we dry out all the inq packets, before we
1432
                 * kill this thread.  Do note that you should have disabled
1433
                 * packet acquire by now using TmThreadDisableReceiveThreads()*/
1434
                SCMutexUnlock(&tv_root_lock);
×
1435
                /* don't sleep while holding a lock */
1436
                SleepMsec(1);
×
1437
                goto again;
×
1438
            }
×
1439

1440
            if (ThreadBusy(tv)) {
2,537✔
1441
                SCMutexUnlock(&tv_root_lock);
1✔
1442

1443
                Packet *p = PacketGetFromAlloc();
1✔
1444
                if (p != NULL) {
1!
1445
                    p->flags |= PKT_PSEUDO_STREAM_END;
1✔
1446
                    PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
1✔
1447
                    CaptureHooksOnPseudoPacketCreated(p);
1✔
1448
                    PacketQueue *q = tv->stream_pq;
1✔
1449
                    SCMutexLock(&q->mutex_q);
1✔
1450
                    PacketEnqueue(q, p);
1✔
1451
                    SCCondSignal(&q->cond_q);
1✔
1452
                    SCMutexUnlock(&q->mutex_q);
1✔
1453
                }
1✔
1454

1455
                /* don't sleep while holding a lock */
1456
                SleepMsec(1);
1✔
1457
                goto again;
1✔
1458
            }
1✔
1459

1460
            /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1461
            if (tm && tm->PktAcqBreakLoop != NULL) {
2,536!
UNCOV
1462
                tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
×
UNCOV
1463
            }
×
1464
            TmThreadsSetFlag(tv, THV_REQ_FLOW_LOOP);
2,536✔
1465

1466
            if (tv->inq != NULL) {
2,536!
1467
                for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
×
1468
                    SCMutexLock(&tv->inq->pq->mutex_q);
×
1469
                    SCCondSignal(&tv->inq->pq->cond_q);
×
1470
                    SCMutexUnlock(&tv->inq->pq->mutex_q);
×
1471
                }
×
1472
                SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
×
1473
            }
×
1474

1475
            /* wait for it to enter the 'flow loop' stage */
1476
            while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
2,536!
1477
                SCMutexUnlock(&tv_root_lock);
535✔
1478

1479
                SleepMsec(1);
535✔
1480
                goto again;
535✔
1481
            }
535✔
1482
        }
2,536✔
1483

1484
        tv = tv->next;
9,778✔
1485
    }
9,778✔
1486

1487
    SCMutexUnlock(&tv_root_lock);
1,981✔
1488

1489
    /* finally wait for all packet threads to have
1490
     * processed all of their 'live' packets so we
1491
     * don't process the last live packets together
1492
     * with FFR packets */
1493
    TmThreadDrainPacketThreads();
1,981✔
1494
}
1,981✔
1495

1496
#ifdef DEBUG_VALIDATION
1497
static void TmThreadDumpThreads(void);
1498
#endif
1499

1500
static void TmThreadDebugValidateNoMorePackets(void)
1501
{
3,962✔
1502
#ifdef DEBUG_VALIDATION
1503
    SCMutexLock(&tv_root_lock);
1504
    for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1505
        if (ThreadStillHasPackets(tv)) {
1506
            SCMutexUnlock(&tv_root_lock);
1507
            TmThreadDumpThreads();
1508
            DEBUG_VALIDATE_BUG_ON(1);
1509
        }
1510
    }
1511
    SCMutexUnlock(&tv_root_lock);
1512
#endif
1513
}
3,962✔
1514

1515
/** \internal
1516
 *  \brief check if a thread has any of the modules indicated by TM_FLAG_*
1517
 *  \param tv thread
1518
 *  \param flags TM_FLAG_*'s
1519
 *  \retval bool true if at least on of the flags is present */
1520
static inline bool CheckModuleFlags(const ThreadVars *tv, const uint8_t flags)
1521
{
65,185✔
1522
    return (tv->tmm_flags & flags) != 0;
65,185✔
1523
}
65,185✔
1524

1525
/**
1526
 * \brief Disable all packet threads
1527
 * \param set flag to set
1528
 * \param check flag to check
1529
 * \param module_flags bitflags of TmModule's to apply the `set` flag to.
1530
 *
1531
 * Support 2 stages in shutting down the packet threads:
1532
 * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
1533
 * 2. set THV_KILL and wait for THV_RUNNING_DONE
1534
 *
1535
 * During step 1 the main loop is exited, and the flow loop logic is entered.
1536
 * During step 2, the flow loop logic is done and the thread closes.
1537
 *
1538
 * `module_flags` limits which threads are disabled
1539
 */
1540
void TmThreadDisablePacketThreads(
1541
        const uint16_t set, const uint16_t check, const uint8_t module_flags)
1542
{
3,962✔
1543
    struct timeval start_ts;
3,962✔
1544
    struct timeval cur_ts;
3,962✔
1545

1546
    /* first drain all packet threads of their packets */
1547
    TmThreadDrainPacketThreads();
3,962✔
1548

1549
    /* since all the threads possibly able to produce more packets
1550
     * are now gone or inactive, we should see no packets anywhere
1551
     * anymore. */
1552
    TmThreadDebugValidateNoMorePackets();
3,962✔
1553

1554
    gettimeofday(&start_ts, NULL);
3,962✔
1555
again:
16,907✔
1556
    gettimeofday(&cur_ts, NULL);
16,907✔
1557
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
16,907!
1558
        FatalError("Engine unable to disable packet  "
×
1559
                   "threads. Killing engine");
×
1560
    }
×
1561

1562
    /* loop through the packet threads and kill them */
1563
    SCMutexLock(&tv_root_lock);
16,907✔
1564
    for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
69,147✔
1565
        /* only set flow worker threads to THV_REQ_FLOW_LOOP */
1566
        if (!CheckModuleFlags(tv, module_flags)) {
65,185!
1567
            SCLogDebug("%s does not have any of the modules %02x, skip", tv->name, module_flags);
1!
1568
            continue;
1✔
1569
        }
1✔
1570
        TmThreadsSetFlag(tv, set);
65,184✔
1571

1572
        /* separate worker threads (autofp) will still wait at their
1573
         * input queues. So nudge them here so they will observe the
1574
         * THV_KILL flag. */
1575
        if (tv->inq != NULL) {
65,184✔
1576
            for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
144,495✔
1577
                SCMutexLock(&tv->inq->pq->mutex_q);
96,360✔
1578
                SCCondSignal(&tv->inq->pq->cond_q);
96,360✔
1579
                SCMutexUnlock(&tv->inq->pq->mutex_q);
96,360✔
1580
            }
96,360✔
1581
            SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
48,135!
1582
        }
48,135✔
1583

1584
        /* wait for it to reach the expected state */
1585
        if (!TmThreadsCheckFlag(tv, check)) {
65,184✔
1586
            SCMutexUnlock(&tv_root_lock);
12,945✔
1587
            SCLogDebug("%s did not reach state %u, again", tv->name, check);
12,945!
1588

1589
            SleepMsec(1);
12,945✔
1590
            goto again;
12,945✔
1591
        }
12,945✔
1592
    }
65,184✔
1593
    SCMutexUnlock(&tv_root_lock);
3,962✔
1594
}
3,962✔
1595

1596
#define MIN_WAIT_TIME 100
141,846✔
1597
#define MAX_WAIT_TIME 999999
1598
void TmThreadKillThreadsFamily(int family)
1599
{
9,908✔
1600
    ThreadVars *tv = NULL;
9,908✔
1601
    unsigned int sleep_usec = MIN_WAIT_TIME;
9,908✔
1602

1603
    BUG_ON((family < 0) || (family >= TVT_MAX));
9,908!
1604

1605
again:
66,955✔
1606
    SCMutexLock(&tv_root_lock);
66,955✔
1607
    tv = tv_root[family];
66,955✔
1608

1609
    while (tv) {
198,893✔
1610
        int r = TmThreadKillThread(tv);
188,985✔
1611
        if (r == 0) {
188,985✔
1612
            SCMutexUnlock(&tv_root_lock);
57,047✔
1613
            SleepUsec(sleep_usec);
57,047✔
1614
            sleep_usec *= 2; /* slowly back off */
57,047✔
1615
            sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
57,047!
1616
            goto again;
57,047✔
1617
        }
57,047✔
1618
        sleep_usec = MIN_WAIT_TIME; /* reset */
131,938✔
1619

1620
        tv = tv->next;
131,938✔
1621
    }
131,938✔
1622
    SCMutexUnlock(&tv_root_lock);
9,908✔
1623
}
9,908✔
1624
#undef MIN_WAIT_TIME
1625
#undef MAX_WAIT_TIME
1626

1627
void TmThreadKillThreads(void)
1628
{
1,982✔
1629
    int i = 0;
1,982✔
1630

1631
    for (i = 0; i < TVT_MAX; i++) {
7,928✔
1632
        TmThreadKillThreadsFamily(i);
5,946✔
1633
    }
5,946✔
1634
}
1,982✔
1635

1636
static void TmThreadFree(ThreadVars *tv)
1637
{
16,762✔
1638
    TmSlot *s;
16,762✔
1639
    TmSlot *ps;
16,762✔
1640
    if (tv == NULL)
16,762!
1641
        return;
×
1642

1643
    SCLogDebug("Freeing thread '%s'.", tv->name);
16,762!
1644

1645
    ThreadFreeStorage(tv);
16,762✔
1646

1647
    if (tv->flow_queue) {
16,762✔
1648
        BUG_ON(tv->flow_queue->qlen != 0);
7,836!
1649
        SCFree(tv->flow_queue);
7,836✔
1650
    }
7,836✔
1651

1652
    StatsThreadCleanup(&tv->stats);
16,762✔
1653

1654
    TmThreadDeinitMC(tv);
16,762✔
1655

1656
    if (tv->printable_name) {
16,762!
1657
        SCFree(tv->printable_name);
24✔
1658
    }
24✔
1659

1660
    if (tv->iface_name) {
16,762!
1661
        SCFree(tv->iface_name);
24✔
1662
    }
24✔
1663

1664
    if (tv->stream_pq_local) {
16,762✔
1665
        BUG_ON(tv->stream_pq_local->len);
60!
1666
        SCMutexDestroy(&tv->stream_pq_local->mutex_q);
60✔
1667
        SCFree(tv->stream_pq_local);
60✔
1668
    }
60✔
1669

1670
    s = (TmSlot *)tv->tm_slots;
16,762✔
1671
    while (s) {
32,590✔
1672
        ps = s;
15,828✔
1673
        s = s->slot_next;
15,828✔
1674
        SCFree(ps);
15,828✔
1675
    }
15,828✔
1676

1677
    TmThreadsUnregisterThread(tv->id);
16,762✔
1678
    SCFree(tv);
16,762✔
1679
}
16,762✔
1680

1681
void TmThreadClearThreadsFamily(int family)
1682
{
3,962✔
1683
    ThreadVars *tv = NULL;
3,962✔
1684
    ThreadVars *ptv = NULL;
3,962✔
1685

1686
    if ((family < 0) || (family >= TVT_MAX))
3,962!
1687
        return;
×
1688

1689
    SCMutexLock(&tv_root_lock);
3,962✔
1690
    tv = tv_root[family];
3,962✔
1691

1692
    while (tv) {
20,724✔
1693
        ptv = tv;
16,762✔
1694
        tv = tv->next;
16,762✔
1695
        TmThreadFree(ptv);
16,762✔
1696
    }
16,762✔
1697
    tv_root[family] = NULL;
3,962✔
1698
    SCMutexUnlock(&tv_root_lock);
3,962✔
1699
}
3,962✔
1700

1701
/**
1702
 * \brief Spawns a thread associated with the ThreadVars instance tv
1703
 *
1704
 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1705
 */
1706
TmEcode TmThreadSpawn(ThreadVars *tv)
1707
{
16,781✔
1708
    pthread_attr_t attr;
16,781✔
1709
    if (tv->tm_func == NULL) {
16,781!
1710
        FatalError("No thread function set");
×
1711
    }
×
1712

1713
    /* Initialize and set thread detached attribute */
1714
    pthread_attr_init(&attr);
16,781✔
1715

1716
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
16,781✔
1717

1718
    /* Adjust thread stack size if configured */
1719
    if (threading_set_stack_size) {
16,781!
1720
        SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
×
1721
        if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
×
1722
            FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
×
1723
                    threading_set_stack_size);
×
1724
        }
×
1725
    }
×
1726

1727
    int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
16,781✔
1728
    if (rc) {
16,781!
1729
        FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
×
1730
                strerror(errno));
×
1731
    }
×
1732

1733
#if DEBUG && HAVE_PTHREAD_GETATTR_NP
1734
    if (threading_set_stack_size) {
1735
        if (pthread_getattr_np(tv->t, &attr) == 0) {
1736
            size_t stack_size;
1737
            void *stack_addr;
1738
            pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1739
            SCLogDebug("stack: %p;  size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1740
        } else {
1741
            SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1742
                       "pthread_getattr_np() is %" PRId32,
1743
                    rc);
1744
        }
1745
    }
1746
#endif
1747

1748
    TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
16,781✔
1749

1750
    TmThreadAppend(tv, tv->type);
16,781✔
1751
    return TM_ECODE_OK;
16,781✔
1752
}
16,781✔
1753

1754
/**
1755
 * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1756
 *
1757
 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1758
 */
1759
TmEcode TmThreadLibSpawn(ThreadVars *tv)
1760
{
×
1761
    if (tv->tm_func == NULL) {
×
1762
        printf("ERROR: no thread function set\n");
×
1763
        return TM_ECODE_FAILED;
×
1764
    }
×
1765

1766
    if (tv->tm_func((void *)tv) == (void *)-1) {
×
1767
        return TM_ECODE_FAILED;
×
1768
    }
×
1769

1770
    TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
×
1771

1772
    return TM_ECODE_OK;
×
1773
}
×
1774

1775
/**
1776
 * \brief Initializes the mutex and condition variables for this TV
1777
 *
1778
 * It can be used by a thread to control a wait loop that can also be
1779
 * influenced by other threads.
1780
 *
1781
 * \param tv Pointer to a TV instance
1782
 */
1783
void TmThreadInitMC(ThreadVars *tv)
1784
{
3,034✔
1785
    if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
3,034!
1786
        FatalError("Fatal error encountered in TmThreadInitMC.  "
×
1787
                   "Exiting...");
×
1788
    }
×
1789

1790
    if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
3,034!
1791
        printf("Error initializing the tv->m mutex\n");
×
1792
        exit(EXIT_FAILURE);
×
1793
    }
×
1794

1795
    if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
3,034!
1796
        FatalError("Fatal error encountered in TmThreadInitMC.  "
×
1797
                   "Exiting...");
×
1798
    }
×
1799

1800
    if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
3,034!
1801
        FatalError("Error initializing the tv->cond condition "
×
1802
                   "variable");
×
1803
    }
×
1804
}
3,034✔
1805

1806
static void TmThreadDeinitMC(ThreadVars *tv)
1807
{
16,762✔
1808
    if (tv->ctrl_mutex) {
16,762✔
1809
        SCCtrlMutexDestroy(tv->ctrl_mutex);
3,022✔
1810
        SCFree(tv->ctrl_mutex);
3,022✔
1811
    }
3,022✔
1812
    if (tv->ctrl_cond) {
16,762✔
1813
        SCCtrlCondDestroy(tv->ctrl_cond);
3,022✔
1814
        SCFree(tv->ctrl_cond);
3,022✔
1815
    }
3,022✔
1816
}
16,762✔
1817

1818
/**
1819
 * \brief Waits till the specified flag(s) is(are) set.  We don't bother if
1820
 *        the kill flag has been set or not on the thread.
1821
 *
1822
 * \param tv Pointer to the TV instance.
1823
 */
1824
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
1825
{
33,558✔
1826
    while (!TmThreadsCheckFlag(tv, flags)) {
1,713,430✔
1827
        SleepUsec(100);
1,679,872✔
1828
    }
1,679,872✔
1829
}
33,558✔
1830

1831
/**
1832
 * \brief Unpauses a thread
1833
 *
1834
 * \param tv Pointer to a TV instance that has to be unpaused
1835
 */
1836
void TmThreadContinue(ThreadVars *tv)
1837
{
16,785✔
1838
    TmThreadsUnsetFlag(tv, THV_PAUSE);
16,785✔
1839
}
16,785✔
1840

1841
static TmEcode WaitOnThreadsRunningByType(const int t)
1842
{
5,946✔
1843
    struct timeval start_ts;
5,946✔
1844
    struct timeval cur_ts;
5,946✔
1845
    uint32_t thread_cnt = 0;
5,946✔
1846

1847
    /* on retries, this will init to the last thread that started up already */
1848
    ThreadVars *tv_start = tv_root[t];
5,946✔
1849
    SCMutexLock(&tv_root_lock);
5,946✔
1850
    for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
22,719✔
1851
        thread_cnt++;
16,773✔
1852
    }
16,773✔
1853
    SCMutexUnlock(&tv_root_lock);
5,946✔
1854

1855
    /* give threads a second each to start up, plus a margin of a minute. */
1856
    uint32_t time_budget = 60 + thread_cnt;
5,946✔
1857

1858
    gettimeofday(&start_ts, NULL);
5,946✔
1859
again:
12,105✔
1860
    SCMutexLock(&tv_root_lock);
12,105✔
1861
    ThreadVars *tv = tv_start;
12,105✔
1862
    while (tv != NULL) {
29,851✔
1863
        if (TmThreadsCheckFlag(tv, (THV_FAILED | THV_CLOSED | THV_DEAD))) {
23,905!
1864
            SCMutexUnlock(&tv_root_lock);
×
1865

1866
            SCLogError("thread \"%s\" failed to "
×
1867
                       "start: flags %04x",
×
1868
                    tv->name, SC_ATOMIC_GET(tv->flags));
×
1869
            return TM_ECODE_FAILED;
×
1870
        }
×
1871

1872
        if (!(TmThreadsCheckFlag(tv, THV_RUNNING | THV_RUNNING_DONE))) {
23,905✔
1873
            SCMutexUnlock(&tv_root_lock);
6,159✔
1874

1875
            /* 60 seconds provided for the thread to transition from
1876
             * THV_INIT_DONE to THV_RUNNING */
1877
            gettimeofday(&cur_ts, NULL);
6,159✔
1878
            if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
6,159!
1879
                SCLogError("thread \"%s\" failed to "
×
1880
                           "start in time: flags %04x. Total threads: %u. Time budget %us",
×
1881
                        tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
×
1882
                return TM_ECODE_FAILED;
×
1883
            }
×
1884

1885
            /* sleep a little to give the thread some
1886
             * time to start running */
1887
            SleepUsec(100);
6,159✔
1888
            goto again;
6,159✔
1889
        }
6,159✔
1890
        tv_start = tv;
17,746✔
1891

1892
        tv = tv->next;
17,746✔
1893
    }
17,746✔
1894
    SCMutexUnlock(&tv_root_lock);
5,946✔
1895
    return TM_ECODE_OK;
5,946✔
1896
}
12,105✔
1897

1898
/**
1899
 * \brief Waits for all threads to be in a running state
1900
 *
1901
 * \retval TM_ECODE_OK if all are running or error if a thread failed
1902
 */
1903
TmEcode TmThreadWaitOnThreadRunning(void)
1904
{
1,982✔
1905
    uint16_t RX_num = 0;
1,982✔
1906
    uint16_t W_num = 0;
1,982✔
1907
    uint16_t FM_num = 0;
1,982✔
1908
    uint16_t FR_num = 0;
1,982✔
1909
    uint16_t TX_num = 0;
1,982✔
1910

1911
    for (int i = 0; i < TVT_MAX; i++) {
7,928✔
1912
        if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
5,946!
1913
            return TM_ECODE_FAILED;
×
1914
    }
5,946✔
1915

1916
    SCMutexLock(&tv_root_lock);
1,982✔
1917
    for (int i = 0; i < TVT_MAX; i++) {
7,928✔
1918
        for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
22,719✔
1919
            if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
16,773✔
1920
                RX_num++;
1,941✔
1921
            else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
14,832✔
1922
                W_num++;
7,836✔
1923
            else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
6,996!
1924
                TX_num++;
1✔
1925
            else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
6,995✔
1926
                FM_num++;
1,981✔
1927
            else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
5,014✔
1928
                FR_num++;
1,981✔
1929
        }
16,773✔
1930
    }
5,946✔
1931
    SCMutexUnlock(&tv_root_lock);
1,982✔
1932

1933
    /* Construct a welcome string displaying
1934
     * initialized thread types and counts */
1935
    uint16_t app_len = 32;
1,982✔
1936
    uint16_t buf_len = 256;
1,982✔
1937

1938
    char append_str[app_len];
1,982✔
1939
    char thread_counts[buf_len];
1,982✔
1940

1941
    strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1,982✔
1942
    if (RX_num > 0) {
1,982✔
1943
        snprintf(append_str, app_len, "RX: %u ", RX_num);
1,938✔
1944
        strlcat(thread_counts, append_str, buf_len);
1,938✔
1945
    }
1,938✔
1946
    if (W_num > 0) {
1,982✔
1947
        snprintf(append_str, app_len, "W: %u ", W_num);
1,981✔
1948
        strlcat(thread_counts, append_str, buf_len);
1,981✔
1949
    }
1,981✔
1950
    if (TX_num > 0) {
1,982!
1951
        snprintf(append_str, app_len, "TX: %u ", TX_num);
1✔
1952
        strlcat(thread_counts, append_str, buf_len);
1✔
1953
    }
1✔
1954
    if (FM_num > 0) {
1,982✔
1955
        snprintf(append_str, app_len, "FM: %u ", FM_num);
1,981✔
1956
        strlcat(thread_counts, append_str, buf_len);
1,981✔
1957
    }
1,981✔
1958
    if (FR_num > 0) {
1,982✔
1959
        snprintf(append_str, app_len, "FR: %u ", FR_num);
1,981✔
1960
        strlcat(thread_counts, append_str, buf_len);
1,981✔
1961
    }
1,981✔
1962
    snprintf(append_str, app_len, "  Engine started.");
1,982✔
1963
    strlcat(thread_counts, append_str, buf_len);
1,982✔
1964
    SCLogNotice("%s", thread_counts);
1,982✔
1965

1966
    return TM_ECODE_OK;
1,982✔
1967
}
1,982✔
1968

1969
/**
1970
 * \brief Unpauses all threads present in tv_root
1971
 */
1972
void TmThreadContinueThreads(void)
1973
{
1,982✔
1974
    SCMutexLock(&tv_root_lock);
1,982✔
1975
    for (int i = 0; i < TVT_MAX; i++) {
7,928✔
1976
        ThreadVars *tv = tv_root[i];
5,946✔
1977
        while (tv != NULL) {
22,719✔
1978
            TmThreadContinue(tv);
16,773✔
1979
            tv = tv->next;
16,773✔
1980
        }
16,773✔
1981
    }
5,946✔
1982
    SCMutexUnlock(&tv_root_lock);
1,982✔
1983
}
1,982✔
1984

1985
/**
1986
 * \brief Used to check the thread for certain conditions of failure.
1987
 */
1988
void TmThreadCheckThreadState(void)
1989
{
36,545✔
1990
    SCMutexLock(&tv_root_lock);
36,545✔
1991
    for (int i = 0; i < TVT_MAX; i++) {
146,180✔
1992
        ThreadVars *tv = tv_root[i];
109,635✔
1993
        while (tv) {
513,430✔
1994
            if (TmThreadsCheckFlag(tv, THV_FAILED)) {
403,795!
1995
                FatalError("thread %s failed", tv->name);
×
1996
            }
×
1997
            tv = tv->next;
403,795✔
1998
        }
403,795✔
1999
    }
109,635✔
2000
    SCMutexUnlock(&tv_root_lock);
36,545✔
2001
}
36,545✔
2002

2003
/**
2004
 *  \brief Used to check if all threads have finished their initialization.  On
2005
 *         finding an un-initialized thread, it waits till that thread completes
2006
 *         its initialization, before proceeding to the next thread.
2007
 *
2008
 *  \retval TM_ECODE_OK all initialized properly
2009
 *  \retval TM_ECODE_FAILED failure
2010
 */
2011
TmEcode TmThreadWaitOnThreadInit(void)
2012
{
1,982✔
2013
    struct timeval start_ts;
1,982✔
2014
    struct timeval cur_ts;
1,982✔
2015
    gettimeofday(&start_ts, NULL);
1,982✔
2016

2017
again:
1,982✔
2018
    SCMutexLock(&tv_root_lock);
1,982✔
2019
    for (int i = 0; i < TVT_MAX; i++) {
7,928✔
2020
        ThreadVars *tv = tv_root[i];
5,946✔
2021
        while (tv != NULL) {
22,719✔
2022
            if (TmThreadsCheckFlag(tv, (THV_CLOSED|THV_DEAD))) {
16,773!
2023
                SCMutexUnlock(&tv_root_lock);
×
2024

2025
                SCLogError("thread \"%s\" failed to "
×
2026
                           "initialize: flags %04x",
×
2027
                        tv->name, SC_ATOMIC_GET(tv->flags));
×
2028
                return TM_ECODE_FAILED;
×
2029
            }
×
2030

2031
            if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
16,773!
2032
                SCMutexUnlock(&tv_root_lock);
×
2033

2034
                gettimeofday(&cur_ts, NULL);
×
2035
                if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
×
2036
                    SCLogError("thread \"%s\" failed to "
×
2037
                               "initialize in time: flags %04x",
×
2038
                            tv->name, SC_ATOMIC_GET(tv->flags));
×
2039
                    return TM_ECODE_FAILED;
×
2040
                }
×
2041

2042
                /* sleep a little to give the thread some
2043
                 * time to finish initialization */
2044
                SleepUsec(100);
×
2045
                goto again;
×
2046
            }
×
2047

2048
            if (TmThreadsCheckFlag(tv, THV_FAILED)) {
16,773!
2049
                SCMutexUnlock(&tv_root_lock);
×
2050
                SCLogError("thread \"%s\" failed to "
×
2051
                           "initialize.",
×
2052
                        tv->name);
×
2053
                return TM_ECODE_FAILED;
×
2054
            }
×
2055
            if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
16,773!
2056
                SCMutexUnlock(&tv_root_lock);
×
2057
                SCLogError("thread \"%s\" closed on "
×
2058
                           "initialization.",
×
2059
                        tv->name);
×
2060
                return TM_ECODE_FAILED;
×
2061
            }
×
2062

2063
            tv = tv->next;
16,773✔
2064
        }
16,773✔
2065
    }
5,946✔
2066
    SCMutexUnlock(&tv_root_lock);
1,982✔
2067

2068
    return TM_ECODE_OK;
1,982✔
2069
}
1,982✔
2070

2071
/**
2072
 * \brief returns a count of all the threads that match the flag
2073
 */
2074
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
2075
{
1,982✔
2076
    uint32_t cnt = 0;
1,982✔
2077
    SCMutexLock(&tv_root_lock);
1,982✔
2078
    for (int i = 0; i < TVT_MAX; i++) {
7,928✔
2079
        ThreadVars *tv = tv_root[i];
5,946✔
2080
        while (tv != NULL) {
22,719✔
2081
            if ((tv->tmm_flags & flags) == flags)
16,773✔
2082
                cnt++;
7,836✔
2083

2084
            tv = tv->next;
16,773✔
2085
        }
16,773✔
2086
    }
5,946✔
2087
    SCMutexUnlock(&tv_root_lock);
1,982✔
2088
    return cnt;
1,982✔
2089
}
1,982✔
2090

2091
#ifdef DEBUG_VALIDATION
2092
static void TmThreadDoDumpSlots(const ThreadVars *tv)
2093
{
2094
    for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2095
        TmModule *m = TmModuleGetById(s->tm_id);
2096
        SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2097
            tv, s, s->tm_id, m->name);
2098
    }
2099
}
2100

2101
static void TmThreadDumpThreads(void)
2102
{
2103
    SCMutexLock(&tv_root_lock);
2104
    for (int i = 0; i < TVT_MAX; i++) {
2105
        ThreadVars *tv = tv_root[i];
2106
        while (tv != NULL) {
2107
            const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2108
            SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2109
                    tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2110
            if (tv->inq && tv->stream_pq == tv->inq->pq) {
2111
                SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2112
            } else if (tv->stream_pq_local != NULL) {
2113
                for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2114
                    SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2115
                            tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2116
                }
2117
            }
2118
            for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2119
                SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2120
                        tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2121
            }
2122
            TmThreadDoDumpSlots(tv);
2123
            tv = tv->next;
2124
        }
2125
    }
2126
    SCMutexUnlock(&tv_root_lock);
2127
    TmThreadsListThreads();
2128
}
2129
#endif
2130

2131
/* Aligned to CLS to avoid false sharing between atomic ops. */
2132
typedef struct Thread_ {
2133
    ThreadVars *tv;     /**< threadvars structure */
2134
    const char *name;
2135
    int type;
2136
    int in_use;         /**< bool to indicate this is in use */
2137

2138
    SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2139
                                         *   (offline mode) */
2140
    SCTime_t sys_sec_stamp; /**< timestamp in real system
2141
                             *   time when the pktts was last updated. */
2142
    SCSpinlock spin;
2143
} __attribute__((aligned(CLS))) Thread;
2144

2145
typedef struct Threads_ {
2146
    Thread *threads;
2147
    size_t threads_size;
2148
    int threads_cnt;
2149
} Threads;
2150

2151
static bool thread_store_sealed = false;
2152
static Threads thread_store = { NULL, 0, 0 };
2153
static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2154

2155
void TmThreadsSealThreads(void)
2156
{
1,981✔
2157
    SCMutexLock(&thread_store_lock);
1,981✔
2158
    DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
1,981✔
2159
    thread_store_sealed = true;
1,981✔
2160
    SCMutexUnlock(&thread_store_lock);
1,981✔
2161
}
1,981✔
2162

2163
void TmThreadsUnsealThreads(void)
2164
{
1,981✔
2165
    SCMutexLock(&thread_store_lock);
1,981✔
2166
    DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
1,981✔
2167
    thread_store_sealed = false;
1,981✔
2168
    SCMutexUnlock(&thread_store_lock);
1,981✔
2169
}
1,981✔
2170

2171
void TmThreadsListThreads(void)
2172
{
×
2173
    SCMutexLock(&thread_store_lock);
×
2174
    for (size_t s = 0; s < thread_store.threads_size; s++) {
×
2175
        Thread *t = &thread_store.threads[s];
×
2176
        if (t == NULL || t->in_use == 0)
×
2177
            continue;
×
2178

2179
        SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
×
2180
                (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
×
2181
        if (t->tv) {
×
2182
            ThreadVars *tv = t->tv;
×
2183
            const uint32_t flags = SC_ATOMIC_GET(tv->flags);
×
2184
            SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
×
2185
                    tv, tv->type, tv->name, tv->tmm_flags, flags);
×
2186
        }
×
2187
    }
×
2188
    SCMutexUnlock(&thread_store_lock);
×
2189
}
×
2190

2191
#define STEP 32
1,984✔
2192
/**
2193
 *  \retval id thread id, or 0 if not found
2194
 */
2195
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
2196
{
16,781✔
2197
    SCMutexLock(&thread_store_lock);
16,781✔
2198
    DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
16,781✔
2199
    if (thread_store.threads == NULL) {
16,781✔
2200
        thread_store.threads = SCCalloc(STEP, sizeof(Thread));
1,984✔
2201
        BUG_ON(thread_store.threads == NULL);
1,984!
2202
        thread_store.threads_size = STEP;
1,984✔
2203
    }
1,984✔
2204

2205
    size_t s;
16,781✔
2206
    for (s = 0; s < thread_store.threads_size; s++) {
80,591!
2207
        if (thread_store.threads[s].in_use == 0) {
80,591✔
2208
            Thread *t = &thread_store.threads[s];
16,781✔
2209
            SCSpinInit(&t->spin, 0);
16,781✔
2210
            SCSpinLock(&t->spin);
16,781✔
2211
            t->name = tv->name;
16,781✔
2212
            t->type = type;
16,781✔
2213
            t->tv = tv;
16,781✔
2214
            t->in_use = 1;
16,781✔
2215
            SCSpinUnlock(&t->spin);
16,781✔
2216

2217
            SCMutexUnlock(&thread_store_lock);
16,781✔
2218
            return (int)(s+1);
16,781✔
2219
        }
16,781✔
2220
    }
80,591✔
2221

2222
    /* if we get here the array is completely filled */
2223
    void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
×
2224
    BUG_ON(newmem == NULL);
×
2225
    thread_store.threads = newmem;
×
2226
    memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
×
2227

2228
    Thread *t = &thread_store.threads[thread_store.threads_size];
×
2229
    SCSpinInit(&t->spin, 0);
×
2230
    SCSpinLock(&t->spin);
×
2231
    t->name = tv->name;
×
2232
    t->type = type;
×
2233
    t->tv = tv;
×
2234
    t->in_use = 1;
×
2235
    SCSpinUnlock(&t->spin);
×
2236

2237
    s = thread_store.threads_size;
×
2238
    thread_store.threads_size += STEP;
×
2239

2240
    SCMutexUnlock(&thread_store_lock);
×
2241
    return (int)(s+1);
×
2242
}
×
2243
#undef STEP
2244

2245
void TmThreadsUnregisterThread(const int id)
2246
{
16,762✔
2247
    SCMutexLock(&thread_store_lock);
16,762✔
2248
    DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
16,762✔
2249
    if (id <= 0 || id > (int)thread_store.threads_size) {
16,762!
2250
        SCMutexUnlock(&thread_store_lock);
×
2251
        return;
×
2252
    }
×
2253

2254
    /* id is one higher than index */
2255
    int idx = id - 1;
16,762✔
2256

2257
    /* reset thread_id, which serves as clearing the record */
2258
    thread_store.threads[idx].in_use = 0;
16,762✔
2259

2260
    /* check if we have at least one registered thread left */
2261
    size_t s;
16,762✔
2262
    for (s = 0; s < thread_store.threads_size; s++) {
99,412✔
2263
        Thread *t = &thread_store.threads[s];
97,438✔
2264
        if (t->in_use == 1) {
97,438✔
2265
            goto end;
14,788✔
2266
        }
14,788✔
2267
    }
97,438✔
2268

2269
    /* if we get here no threads are registered */
2270
    SCFree(thread_store.threads);
1,974✔
2271
    thread_store.threads = NULL;
1,974✔
2272
    thread_store.threads_size = 0;
1,974✔
2273
    thread_store.threads_cnt = 0;
1,974✔
2274

2275
end:
16,762✔
2276
    SCMutexUnlock(&thread_store_lock);
16,762✔
2277
}
16,762✔
2278

2279
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2280
{
1,647,893✔
2281
    SCTime_t now = SCTimeGetTime();
1,647,893✔
2282
    int idx = id - 1;
1,647,893✔
2283
    Thread *t = &thread_store.threads[idx];
1,647,893✔
2284
    SCSpinLock(&t->spin);
1,647,893✔
2285
    SC_ATOMIC_SET(t->pktts, ts);
1,647,893✔
2286

2287
#ifdef DEBUG
2288
    if (t->sys_sec_stamp.secs != 0) {
2289
        SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2290
        if (SCTIME_CMP_LT(tmpts, now)) {
2291
            SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2292
        }
2293
    }
2294
#endif
2295

2296
    t->sys_sec_stamp = now;
1,647,893✔
2297
    SCSpinUnlock(&t->spin);
1,647,893✔
2298
}
1,647,893✔
2299

2300
bool TmThreadsTimeSubsysIsReady(void)
2301
{
25,504✔
2302
    static SCTime_t nullts = SCTIME_INITIALIZER;
25,504✔
2303
    bool ready = true;
25,504✔
2304
    for (size_t s = 0; s < thread_store.threads_size; s++) {
42,020!
2305
        Thread *t = &thread_store.threads[s];
42,020✔
2306
        if (!t->in_use) {
42,020✔
2307
            break;
1,945✔
2308
        }
1,945✔
2309
        SCSpinLock(&t->spin);
40,075✔
2310
        if (t->type != TVT_PPT) {
40,075✔
2311
            SCSpinUnlock(&t->spin);
6,922✔
2312
            continue;
6,922✔
2313
        }
6,922✔
2314
        if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
33,153✔
2315
            ready = false;
23,559✔
2316
            SCSpinUnlock(&t->spin);
23,559✔
2317
            break;
23,559✔
2318
        }
23,559✔
2319
        SCSpinUnlock(&t->spin);
9,594✔
2320
    }
9,594✔
2321
    return ready;
25,504✔
2322
}
25,504✔
2323

2324
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
2325
{
1,945✔
2326
    SCTime_t now = SCTimeGetTime();
1,945✔
2327
    for (size_t s = 0; s < thread_store.threads_size; s++) {
18,440!
2328
        Thread *t = &thread_store.threads[s];
18,440✔
2329
        if (!t->in_use) {
18,440✔
2330
            break;
1,945✔
2331
        }
1,945✔
2332
        SCSpinLock(&t->spin);
16,495✔
2333
        if (t->type != TVT_PPT) {
16,495✔
2334
            SCSpinUnlock(&t->spin);
6,902✔
2335
            continue;
6,902✔
2336
        }
6,902✔
2337
        SC_ATOMIC_SET(t->pktts, ts);
9,593✔
2338
        t->sys_sec_stamp = now;
9,593✔
2339
        SCSpinUnlock(&t->spin);
9,593✔
2340
    }
9,593✔
2341
}
1,945✔
2342

2343
SCTime_t TmThreadsGetThreadTime(const int idx)
2344
{
2,571✔
2345
    DEBUG_VALIDATE_BUG_ON(idx == 0);
2,571✔
2346
    const int i = idx - 1;
2,571✔
2347
    Thread *t = &thread_store.threads[i];
2,571✔
2348
    return SC_ATOMIC_GET(t->pktts);
2,571✔
2349
}
2,571✔
2350

2351
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2352
{
95,139✔
2353
    struct timeval local = { 0 };
95,139✔
2354
    static SCTime_t nullts = SCTIME_INITIALIZER;
95,139✔
2355
    bool set = false;
95,139✔
2356
    SCTime_t now = SCTimeGetTime();
95,139✔
2357

2358
    for (size_t s = 0; s < thread_store.threads_size; s++) {
895,474!
2359
        Thread *t = &thread_store.threads[s];
895,474✔
2360
        if (t->in_use == 0) {
895,474✔
2361
            break;
95,139✔
2362
        }
95,139✔
2363
        SCSpinLock(&t->spin);
800,335✔
2364
        /* only packet threads set timestamps based on packets */
2365
        if (t->type != TVT_PPT) {
800,335✔
2366
            SCSpinUnlock(&t->spin);
352,568✔
2367
            continue;
352,568✔
2368
        }
352,568✔
2369
        SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
447,767✔
2370
        if (SCTIME_CMP_NEQ(pktts, nullts)) {
447,767✔
2371
            SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
447,003✔
2372
            /* ignore sleeping threads */
2373
            if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
447,003✔
2374
                SCSpinUnlock(&t->spin);
2,515✔
2375
                continue;
2,515✔
2376
            }
2,515✔
2377
            if (!set) {
444,488✔
2378
                SCTIME_TO_TIMEVAL(&local, pktts);
95,075✔
2379
                set = true;
95,075✔
2380
            } else {
349,413✔
2381
                if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
349,413✔
2382
                    SCTIME_TO_TIMEVAL(&local, pktts);
2,547✔
2383
                }
2,547✔
2384
            }
349,413✔
2385
        }
444,488✔
2386
        SCSpinUnlock(&t->spin);
445,252✔
2387
    }
445,252✔
2388
    *ts = local;
95,139✔
2389
    SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
95,139!
2390
}
95,139✔
2391

2392
uint16_t TmThreadsGetWorkerThreadMax(void)
2393
{
10✔
2394
    uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
10✔
2395
    int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
10✔
2396
    /* always create at least one thread */
2397
    if (thread_max == 0)
10!
2398
        thread_max = ncpus * threading_detect_ratio;
10✔
2399
    if (thread_max < 1)
10!
2400
        thread_max = 1;
×
2401
    if (thread_max > 1024) {
10!
2402
        SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
×
2403
        thread_max = 1024;
×
2404
    }
×
2405
    return (uint16_t)thread_max;
10✔
2406
}
10✔
2407

2408
/** \brief inject a flow into a threads flow queue
2409
 */
2410
void TmThreadsInjectFlowById(Flow *f, const int id)
2411
{
3,083✔
2412
    if (id > 0 && id <= (int)thread_store.threads_size) {
3,083!
2413
        int idx = id - 1;
3,083✔
2414
        Thread *t = &thread_store.threads[idx];
3,083✔
2415
        ThreadVars *tv = t->tv;
3,083✔
2416
        if (tv != NULL && tv->flow_queue != NULL) {
3,083!
2417
            FlowEnqueue(tv->flow_queue, f);
3,083✔
2418

2419
            /* wake up listening thread(s) if necessary */
2420
            if (tv->inq != NULL) {
3,083✔
2421
                SCMutexLock(&tv->inq->pq->mutex_q);
2,010✔
2422
                SCCondSignal(&tv->inq->pq->cond_q);
2,010✔
2423
                SCMutexUnlock(&tv->inq->pq->mutex_q);
2,010✔
2424
            } else if (tv->break_loop) {
2,015!
2425
                TmThreadsCaptureBreakLoop(tv);
×
2426
            }
×
2427
            return;
3,083✔
2428
        }
3,083✔
2429
    }
3,083✔
2430
    BUG_ON(1);
×
2431
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc