• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jasonish / suricata / 23114797756

15 Mar 2026 04:48PM UTC coverage: 79.303% (-0.007%) from 79.31%
23114797756

push

github

jasonish
fixup

265901 of 335299 relevant lines covered (79.3%)

5138115.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.26
/src/flow-manager.c
1
/* Copyright (C) 2007-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22
 * \author Victor Julien <victor@inliniac.net>
23
 */
24

25
#include "suricata-common.h"
26
#include "conf.h"
27
#include "threadvars.h"
28
#include "tm-threads.h"
29
#include "runmodes.h"
30

31
#include "util-time.h"
32

33
#include "flow.h"
34
#include "flow-queue.h"
35
#include "flow-hash.h"
36
#include "flow-util.h"
37
#include "flow-private.h"
38
#include "flow-timeout.h"
39
#include "flow-manager.h"
40
#include "flow-storage.h"
41
#include "flow-spare-pool.h"
42
#include "flow-callbacks.h"
43

44
#include "stream-tcp.h"
45
#include "stream-tcp-cache.h"
46

47
#include "util-device-private.h"
48

49
#include "util-debug.h"
50

51
#include "threads.h"
52
#include "detect-engine-threshold.h"
53

54
#include "host-timeout.h"
55
#include "defrag-hash.h"
56
#include "defrag-timeout.h"
57
#include "ippair-timeout.h"
58
#include "rust.h"
59
#include "app-layer-htp-range.h"
60

61
#include "output-flow.h"
62

63
#include "runmode-unix-socket.h"
64

65
/** queue to pass flows to cleanup/log thread(s) */
66
FlowQueue flow_recycle_q;
67

68
/* multi flow manager support */
69
static uint32_t flowmgr_number = 1;
70
/* atomic counter for flow managers, to assign instance id */
71
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
72

73
/* multi flow recycler support */
74
static uint32_t flowrec_number = 1;
75
/* atomic counter for flow recyclers, to assign instance id */
76
SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
77
SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
78
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
79

80
static SCCtrlCondT flow_manager_ctrl_cond = PTHREAD_COND_INITIALIZER;
81
static SCCtrlMutex flow_manager_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
82
static SCCtrlCondT flow_recycler_ctrl_cond = PTHREAD_COND_INITIALIZER;
83
static SCCtrlMutex flow_recycler_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
84

85
/** \brief Wake up a sleeping flow manager thread.
 *
 *  Takes the flow manager control mutex and signals the condition
 *  variable the flow manager waits on between work units.
 */
void FlowWakeupFlowManagerThread(void)
{
    SCCtrlMutexLock(&flow_manager_ctrl_mutex);
    SCCtrlCondSignal(&flow_manager_ctrl_cond);
    SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
}
91

92
/** \brief Wake up a sleeping flow recycler thread.
 *
 *  Takes the flow recycler control mutex and signals the condition
 *  variable the recycler waits on, e.g. after flows were queued on
 *  the recycle queue.
 */
void FlowWakeupFlowRecyclerThread(void)
{
    SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
    SCCtrlCondSignal(&flow_recycler_ctrl_cond);
    SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
}
98

99
/** \brief Select the normal (non-emergency) flow timeout table. */
void FlowTimeoutsInit(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
}
103

104
/** \brief Switch to the emergency flow timeout table. */
void FlowTimeoutsEmergency(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
}
108

109
/** \brief Per-pass counters accumulated while timing out flows,
 *         later folded into the thread's stats counters. */
typedef struct FlowTimeoutCounters_ {
    uint32_t rows_checked; /**< hash rows in the scanned slice */
    uint32_t rows_skipped; /**< rows skipped: bucket next_ts still in the future */
    uint32_t rows_empty;   /**< rows that held no flows */
    uint32_t rows_maxlen;  /**< longest flow chain seen in a single row */

    uint32_t flows_checked;   /**< flows evaluated for timeout */
    uint32_t flows_notimeout; /**< flows that had not timed out yet */
    uint32_t flows_timeout;   /**< flows that timed out */
    uint32_t flows_removed;   /**< flows removed from the hash this pass */
    uint32_t flows_aside;     /**< flows parked on the aside queue */
    uint32_t flows_aside_needs_work; /**< aside flows sent back to a worker thread */

    uint32_t bypassed_count; /**< bypassed flows found dead (no new traffic) */
    uint64_t bypassed_pkts;  /**< packet delta reported for updated bypassed flows */
    uint64_t bypassed_bytes; /**< byte delta reported for updated bypassed flows */
} FlowTimeoutCounters;
126

127
/**
128
 * \brief Used to disable flow manager thread(s).
129
 *
130
 * \todo Kinda hackish since it uses the tv name to identify flow manager
131
 *       thread.  We need an all weather identification scheme.
132
 */
133
void FlowDisableFlowManagerThread(void)
134
{
3,427✔
135
    SCMutexLock(&tv_root_lock);
3,427✔
136
    /* flow manager thread(s) is/are a part of mgmt threads */
137
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
16,165✔
138
        if (strncasecmp(tv->name, thread_name_flow_mgr,
12,738✔
139
            strlen(thread_name_flow_mgr)) == 0)
12,738✔
140
        {
3,427✔
141
            TmThreadsSetFlag(tv, THV_KILL);
3,427✔
142
        }
3,427✔
143
    }
12,738✔
144
    SCMutexUnlock(&tv_root_lock);
3,427✔
145

146
    struct timeval start_ts;
3,427✔
147
    struct timeval cur_ts;
3,427✔
148
    gettimeofday(&start_ts, NULL);
3,427✔
149

150
again:
23,471✔
151
    gettimeofday(&cur_ts, NULL);
23,471✔
152
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
23,471✔
153
        FatalError("unable to get all flow manager "
×
154
                   "threads to shutdown in time");
×
155
    }
×
156

157
    SCMutexLock(&tv_root_lock);
23,471✔
158
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
36,209✔
159
        if (strncasecmp(tv->name, thread_name_flow_mgr,
32,782✔
160
            strlen(thread_name_flow_mgr)) == 0)
32,782✔
161
        {
23,471✔
162
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
23,471✔
163
                SCMutexUnlock(&tv_root_lock);
20,044✔
164
                /* sleep outside lock */
165
                SleepMsec(1);
20,044✔
166
                goto again;
20,044✔
167
            }
20,044✔
168
        }
23,471✔
169
    }
32,782✔
170
    SCMutexUnlock(&tv_root_lock);
3,427✔
171

172
    /* reset count, so we can kill and respawn (unix socket) */
173
    SC_ATOMIC_SET(flowmgr_cnt, 0);
3,427✔
174
}
3,427✔
175

176
/** \internal
177
 *  \brief check if a flow is timed out
178
 *
179
 *  Takes lastts, adds the timeout policy to it, compared to current time `ts`.
180
 *  In case of emergency mode, timeout_policy is ignored and the emerg table
181
 *  is used.
182
 *
183
 *  \param f flow
184
 *  \param ts timestamp - realtime or a minimum of active threads in offline mode
185
 *  \param next_ts tracking the next timeout ts, so FM can skip the row until that time
186
 *  \param emerg bool to indicate if emergency timeout settings should be used
187
 *
188
 *  \retval false not timed out
189
 *  \retval true timed out
190
 */
191
static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
{
    /* compute the absolute time at which this flow expires */
    SCTime_t expires_at;
    if (emerg) {
        extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX];
        expires_at = SCTIME_ADD_SECS(f->lastts,
                FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap));
    } else {
        expires_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
    }

    /* track the earliest expiry so the FM can skip this row until then */
    const uint32_t expiry_secs = (uint32_t)SCTIME_SECS(expires_at);
    if (*next_ts == 0 || expiry_secs < *next_ts)
        *next_ts = expiry_secs;

    /* pick the reference clock: live time, or the owning thread's
     * time in offline mode (thread_id[0] == 0 means no owner known) */
    SCTime_t reference_ts;
    if (TimeModeIsLive() || f->thread_id[0] == 0) {
        reference_ts = ts;
    } else {
        reference_ts = TmThreadsGetThreadTime(f->thread_id[0]);
    }

    /* timed out iff the reference time reached the expiry time */
    return !SCTIME_CMP_LT(reference_ts, expires_at);
}
223

224
#ifdef CAPTURE_OFFLOAD
225
/** \internal
226
 *  \brief check timeout of captured bypassed flow by querying capture method
227
 *
228
 *  \param f Flow
229
 *  \param ts timestamp
230
 *  \param counters Flow timeout counters
231
 *
232
 *  \retval false not timeout
233
 *  \retval true timeout (or not capture bypassed)
234
 */
235
static inline bool FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
{
    /* non capture-bypassed flows don't need the capture method query */
    if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
        return true;
    }

    FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
    if (fc && fc->BypassUpdate) {
        /* flow will be possibly updated */
        /* snapshot the per-direction counters so deltas can be computed
         * after the capture method refreshes them */
        uint64_t pkts_tosrc = fc->tosrcpktcnt;
        uint64_t bytes_tosrc = fc->tosrcbytecnt;
        uint64_t pkts_todst = fc->todstpktcnt;
        uint64_t bytes_todst = fc->todstbytecnt;
        bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
        if (update) {
            SCLogDebug("Updated flow: %" PRIu64 "", FlowGetId(f));
            /* turn the snapshots into deltas since the last check */
            pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
            bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
            pkts_todst = fc->todstpktcnt - pkts_todst;
            bytes_todst = fc->todstbytecnt - bytes_todst;
            if (f->livedev) {
                SC_ATOMIC_ADD(f->livedev->bypassed,
                        pkts_tosrc + pkts_todst);
            }
            counters->bypassed_pkts += pkts_tosrc + pkts_todst;
            counters->bypassed_bytes += bytes_tosrc + bytes_todst;
            /* capture saw new traffic: the flow is still alive */
            return false;
        }
        SCLogDebug("No new packet, dead flow %" PRIu64 "", FlowGetId(f));
        if (f->livedev) {
            if (FLOW_IS_IPV4(f)) {
                LiveDevSubBypassStats(f->livedev, 1, AF_INET);
            } else if (FLOW_IS_IPV6(f)) {
                LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
            }
        }
        counters->bypassed_count++;
    }
    return true;
}
275
#endif /* CAPTURE_OFFLOAD */
276

277
/** \brief Per-thread state for the flow manager's timeout runs. */
typedef struct FlowManagerTimeoutThread {
    /* used to temporarily store flows that have timed out and are
     * removed from the hash to reduce locking contention */
    FlowQueuePrivate aside_queue;
} FlowManagerTimeoutThread;
282

283
/**
284
 * \internal
285
 *
286
 * \brief Process the temporary Aside Queue
287
 *        This means that as long as a flow f is not waiting on detection
288
 *        engine to finish dealing with it, f will be put in the recycle
289
 *        queue for further processing later on.
290
 *
291
 * \param td FM Timeout Thread instance
292
 * \param counters Flow Timeout counters to be updated
293
 *
294
 * \retval Number of flows that were recycled
295
 */
296
static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
    FlowQueuePrivate recycle = { NULL, NULL, 0 };
    counters->flows_aside += td->aside_queue.len;

    uint32_t recycled = 0;
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
        /* flow is still locked */

        /* a TCP flow that hasn't finished reassembly, isn't bypassed and
         * isn't dropped still needs a detection/logging pass by a worker */
        const bool needs_work = (f->proto == IPPROTO_TCP) &&
                                !(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
                                !FlowIsBypassed(f) && FlowNeedsReassembly(f);
        if (needs_work) {
            /* Send the flow to its thread; ownership passes to the worker */
            FlowSendToLocalThread(f);
            FLOWLOCK_UNLOCK(f);

            counters->flows_aside_needs_work++;
            continue;
        }
        FLOWLOCK_UNLOCK(f);

        FlowQueuePrivateAppendFlow(&recycle, f);
        /* flush to the shared recycle queue in batches of 100 */
        if (recycle.len == 100) {
            FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
            FlowWakeupFlowRecyclerThread();
        }
        recycled++;
    }
    /* hand off any remaining flows */
    if (recycle.len) {
        FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
        FlowWakeupFlowRecyclerThread();
    }
    return recycled;
}
332

333
/**
334
 *  \internal
335
 *
336
 *  \brief check all flows in a hash row for timing out
337
 *
338
 *  \param f last flow in the hash row
339
 *  \param ts timestamp
340
 *  \param emergency bool indicating emergency mode
341
 *  \param counters ptr to FlowTimeoutCounters structure
342
 */
343
static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
        int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
{
    uint32_t checked = 0;
    /* predecessor of f in the row list; RemoveFromHash needs it to unlink */
    Flow *prev_f = NULL;

    do {
        checked++;

        FLOWLOCK_WRLOCK(f);

        /* check flow timeout based on lastts and state. Both can be
         * accessed w/o Flow lock as we do have the hash row lock (so flow
         * can't disappear) and flow_state is atomic. lastts can only
         * be modified when we have both the flow and hash row lock */

        /* timeout logic goes here */
        if (!FlowManagerFlowTimeout(f, ts, next_ts, emergency)) {
            /* not timed out: keep it in the row and advance */
            FLOWLOCK_UNLOCK(f);
            counters->flows_notimeout++;

            prev_f = f;
            f = f->next;
            continue;
        }

        /* save successor before f is unlinked below */
        Flow *next_flow = f->next;

#ifdef CAPTURE_OFFLOAD
        /* never prune a flow that is used by a packet we
         * are currently processing in one of the threads */
        if (!FlowBypassedTimeout(f, ts, counters)) {
            FLOWLOCK_UNLOCK(f);
            prev_f = f;
            f = f->next;
            continue;
        }
#endif
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;

        counters->flows_timeout++;

        RemoveFromHash(f, prev_f);

        /* park the timed out flow on the aside queue; it is processed
         * after the row (or slice) is done to limit lock contention */
        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);

    counters->flows_checked += checked;
    if (checked > counters->rows_maxlen)
        counters->rows_maxlen = checked;
}
397

398
/**
399
 * \internal
400
 *
401
 * \brief Clear evicted list from Flow Manager.
402
 *        All the evicted flows are removed from the Flow bucket and added
403
 *        to the temporary Aside Queue.
404
 *
405
 * \param td FM timeout thread instance
406
 * \param f head of the evicted list
407
 */
408
static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td, Flow *f)
{
    /* walk the evicted chain, detaching each flow from the bucket and
     * parking it on the aside queue for later processing */
    do {
        FLOWLOCK_WRLOCK(f);
        Flow *successor = f->next;
        f->next = NULL;
        f->fb = NULL;

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = successor;
    } while (f != NULL);
}
422

423
/**
424
 *  \brief time out flows from the hash
425
 *
426
 *  \param ts timestamp
427
 *  \param hash_min min hash index to consider
428
 *  \param hash_max max hash index to consider
429
 *  \param counters ptr to FlowTimeoutCounters structure
430
 *
431
 *  \retval cnt number of timed out flow
432
 */
433
static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
        const uint32_t hash_max, FlowTimeoutCounters *counters)
{
    uint32_t cnt = 0;
    const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
    const uint32_t rows_checked = hash_max - hash_min;
    uint32_t rows_skipped = 0;
    uint32_t rows_empty = 0;

/* scan the hash in word-sized batches: one bit per bucket */
#if __WORDSIZE==64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif

    const uint32_t ts_secs = (uint32_t)SCTIME_SECS(ts);
    for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
        TYPE check_bits = 0;
        const uint32_t check = MIN(BITS, (hash_max - idx));
        /* first pass: cheap relaxed loads of each bucket's next_ts build a
         * bitmap of buckets that may hold expired flows */
        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
                                         fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
                          << (TYPE)i;
        }
        /* nothing due in this batch, skip it entirely */
        if (check_bits == 0)
            continue;

        /* second pass: lock and process only the flagged buckets,
         * re-checking next_ts under no lock first (it may have moved) */
        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
                FBLOCK_LOCK(fb);
                Flow *evicted = NULL;
                if (fb->evicted != NULL || fb->head != NULL) {
                    if (fb->evicted != NULL) {
                        /* transfer out of bucket so we can do additional work outside
                         * of the bucket lock */
                        evicted = fb->evicted;
                        fb->evicted = NULL;
                    }
                    if (fb->head != NULL) {
                        uint32_t next_ts = 0;
                        FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);

                        /* publish the row's new earliest expiry */
                        if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
                            SC_ATOMIC_SET(fb->next_ts, next_ts);
                    }
                    if (fb->evicted == NULL && fb->head == NULL) {
                        /* row is empty */
                        SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
                    }
                } else {
                    /* bucket turned out empty: park its next_ts far in the future */
                    SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
                    rows_empty++;
                }
                FBLOCK_UNLOCK(fb);
                /* processed evicted list */
                if (evicted) {
                    FlowManagerHashRowClearEvictedList(td, evicted);
                }
            } else {
                rows_skipped++;
            }
        }
        /* drain the aside queue after each batch */
        if (td->aside_queue.len) {
            cnt += ProcessAsideQueue(td, counters);
        }
    }

    counters->rows_checked += rows_checked;
    counters->rows_skipped += rows_skipped;
    counters->rows_empty += rows_empty;

    if (td->aside_queue.len) {
        cnt += ProcessAsideQueue(td, counters);
    }
    counters->flows_removed += cnt;
    /* coverity[missing_unlock : FALSE] */
    return cnt;
}
515

516
/** \internal
517
 *
518
 *  \brief handle timeout for a slice of hash rows
519
 *         If we wrap around we call FlowTimeoutHash twice
520
 *  \param td FM timeout thread
521
 *  \param ts timeout timestamp
522
 *  \param hash_min lower bound of the row slice
523
 *  \param hash_max upper bound of the row slice
524
 *  \param counters Flow timeout counters to be passed
525
 *  \param rows number of rows for this worker unit
526
 *  \param pos absolute position of the beginning of row slice in the hash table
527
 *  \param instance instance id of this FM
528
 *
529
 *  \retval number of successfully timed out flows
530
 */
531
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
        const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
        const uint32_t rows, uint32_t *pos, const uint32_t instance)
{
    uint32_t cnt = 0;
    uint32_t rows_left = rows;

    /* walk the slice starting at *pos; if we hit hash_max with rows
     * remaining, wrap around to hash_min and continue */
    do {
        uint32_t start = *pos;
        if (start >= hash_max) {
            start = hash_min;
        }
        uint32_t end = start + rows_left;
        if (end > hash_max) {
            end = hash_max;
        }
        /* remember where the next work unit should resume */
        *pos = (end == hash_max) ? hash_min : end;
        rows_left -= (end - start);

        SCLogDebug("instance %u: %u:%u (hash_min %u, hash_max %u *pos %u)", instance, start, end,
                hash_min, hash_max, *pos);

        cnt += FlowTimeoutHash(td, ts, start, end, counters);
    } while (rows_left);

    return cnt;
}
561

562
/**
563
 *  \internal
564
 *
565
 *  \brief move all flows out of a hash row
566
 *
567
 *  \param f last flow in the hash row
568
 *  \param recycle_q Flow recycle queue
569
 *  \param mode emergency or not
570
 *
571
 *  \retval cnt number of flows removed from the hash and added to the recycle queue
572
 */
573
static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
{
    uint32_t removed = 0;

    while (f != NULL) {
        FLOWLOCK_WRLOCK(f);

        Flow *successor = f->next;

        /* detach from the hash: mode 0 is the active list (full unlink),
         * otherwise pop from the bucket's evicted list */
        if (mode == 0) {
            RemoveFromHash(f, NULL);
        } else {
            FlowBucket *fb = f->fb;
            fb->evicted = f->next;
            f->next = NULL;
            f->fb = NULL;
        }
        f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN;

        /* no one is referring to this flow, removed from hash
         * so we can unlock it and move it to the recycle queue. */
        FLOWLOCK_UNLOCK(f);
        FlowQueuePrivateAppendFlow(recycle_q, f);

        removed++;
        f = successor;
    }

    return removed;
}
605

606
#define RECYCLE_MAX_QUEUE_ITEMS 25
224,591,872✔
607
/**
608
 *  \brief remove all flows from the hash
609
 *
610
 *  \retval cnt number of removes out flows
611
 */
612
static uint32_t FlowCleanupHash(void)
613
{
3,427✔
614
    FlowQueuePrivate local_queue = { NULL, NULL, 0 };
3,427✔
615
    uint32_t cnt = 0;
3,427✔
616

617
    for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
224,595,299✔
618
        FlowBucket *fb = &flow_hash[idx];
224,591,872✔
619

620
        FBLOCK_LOCK(fb);
224,591,872✔
621

622
        if (fb->head != NULL) {
224,591,872✔
623
            /* we have a flow, or more than one */
624
            cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
9,137✔
625
        }
9,137✔
626
        if (fb->evicted != NULL) {
224,591,872✔
627
            /* we have a flow, or more than one */
628
            cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
×
629
        }
×
630

631
        FBLOCK_UNLOCK(fb);
224,591,872✔
632
        if (local_queue.len >= RECYCLE_MAX_QUEUE_ITEMS) {
224,591,872✔
633
            FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
206✔
634
            FlowWakeupFlowRecyclerThread();
206✔
635
        }
206✔
636
    }
224,591,872✔
637
    DEBUG_VALIDATE_BUG_ON(local_queue.len >= RECYCLE_MAX_QUEUE_ITEMS);
3,427✔
638
    FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
3,427✔
639
    FlowWakeupFlowRecyclerThread();
3,427✔
640

641
    return cnt;
3,427✔
642
}
3,427✔
643

644
/** \brief Stats counter ids registered by the flow manager thread.
 *         The registered counter names are listed per field; see
 *         FlowCountersInit() for the registrations. */
typedef struct FlowCounters_ {
    StatsCounterId flow_mgr_full_pass; /**< "flow.mgr.full_hash_pass" */
    StatsCounterId flow_mgr_rows_sec;  /**< "flow.mgr.rows_per_sec" */

    StatsCounterId flow_mgr_spare;        /**< "flow.spare" */
    StatsCounterId flow_emerg_mode_enter; /**< "flow.emerg_mode_entered" */
    StatsCounterId flow_emerg_mode_over;  /**< "flow.emerg_mode_over" */

    StatsCounterId flow_mgr_flows_checked;   /**< "flow.mgr.flows_checked" */
    StatsCounterId flow_mgr_flows_notimeout; /**< "flow.mgr.flows_notimeout" */
    StatsCounterId flow_mgr_flows_timeout;   /**< "flow.mgr.flows_timeout" */
    StatsCounterId flow_mgr_flows_aside;     /**< "flow.mgr.flows_evicted" */
    StatsCounterId flow_mgr_flows_aside_needs_work; /**< "flow.mgr.flows_evicted_needs_work" */

    StatsCounterMaxId flow_mgr_rows_maxlen; /**< "flow.mgr.rows_maxlen" (max counter) */

    StatsCounterId flow_bypassed_cnt_clo; /**< "flow_bypassed.closed" */
    StatsCounterId flow_bypassed_pkts;    /**< "flow_bypassed.pkts" */
    StatsCounterId flow_bypassed_bytes;   /**< "flow_bypassed.bytes" */

    StatsCounterId memcap_pressure;        /**< "memcap.pressure" */
    StatsCounterMaxId memcap_pressure_max; /**< "memcap.pressure_max" (max counter) */
} FlowCounters;
667

668
/** \brief Per flow manager thread data. */
typedef struct FlowManagerThreadData_ {
    uint32_t instance; /**< instance id, assigned from the flowmgr_cnt atomic */
    uint32_t min;      /**< first hash row of this instance's slice */
    uint32_t max;      /**< one past the last hash row of the slice */

    FlowCounters cnt;  /**< stats counter ids for this thread */

    FlowManagerTimeoutThread timeout; /**< aside queue used during timeout runs */
    StatsCounterId counter_defrag_timeout; /**< "defrag.mgr.tracker_timeout" */
    StatsCounterId counter_defrag_memuse;  /**< "defrag.memuse" */
} FlowManagerThreadData;
679

680
/** \brief Register all flow manager stats counters on thread \p t,
 *         storing the resulting ids in \p fc. */
static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
{
    fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", &t->stats);
    fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", &t->stats);

    fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", &t->stats);
    fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", &t->stats);
    fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", &t->stats);

    fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", &t->stats);
    fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", &t->stats);
    fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", &t->stats);
    fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", &t->stats);
    fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", &t->stats);
    fc->flow_mgr_flows_aside_needs_work =
            StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", &t->stats);

    fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", &t->stats);
    fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", &t->stats);
    fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", &t->stats);

    fc->memcap_pressure = StatsRegisterCounter("memcap.pressure", &t->stats);
    fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap.pressure_max", &t->stats);
}
704

705
/** \brief Fold one timeout pass' local FlowTimeoutCounters into the
 *         thread's registered stats counters. */
static void FlowCountersUpdate(
        ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
{
    StatsCounterAddI64(
            &th_v->stats, ftd->cnt.flow_mgr_flows_checked, (int64_t)counters->flows_checked);
    StatsCounterAddI64(
            &th_v->stats, ftd->cnt.flow_mgr_flows_notimeout, (int64_t)counters->flows_notimeout);

    StatsCounterAddI64(
            &th_v->stats, ftd->cnt.flow_mgr_flows_timeout, (int64_t)counters->flows_timeout);
    StatsCounterAddI64(&th_v->stats, ftd->cnt.flow_mgr_flows_aside, (int64_t)counters->flows_aside);
    StatsCounterAddI64(&th_v->stats, ftd->cnt.flow_mgr_flows_aside_needs_work,
            (int64_t)counters->flows_aside_needs_work);

    StatsCounterAddI64(
            &th_v->stats, ftd->cnt.flow_bypassed_cnt_clo, (int64_t)counters->bypassed_count);
    StatsCounterAddI64(&th_v->stats, ftd->cnt.flow_bypassed_pkts, (int64_t)counters->bypassed_pkts);
    StatsCounterAddI64(
            &th_v->stats, ftd->cnt.flow_bypassed_bytes, (int64_t)counters->bypassed_bytes);

    /* rows_maxlen is a max counter: only the peak value is kept */
    StatsCounterMaxUpdateI64(
            &th_v->stats, ftd->cnt.flow_mgr_rows_maxlen, (int64_t)counters->rows_maxlen);
}
728

729
static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;

    ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
    SCLogDebug("flow manager instance %u", ftd->instance);

    /* set the min and max value used for hash row walking
     * each thread has it's own section of the flow hash */
    const uint32_t range = flow_config.hash_size / flowmgr_number;
    ftd->min = ftd->instance * range;
    /* last flow-manager takes on hash_size % flowmgr_number extra rows */
    ftd->max = ((ftd->instance + 1) == flowmgr_number) ? flow_config.hash_size
                                                       : (ftd->instance + 1) * range;
    BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);

    SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);

    /* pass thread data back to caller */
    *data = ftd;

    FlowCountersInit(t, &ftd->cnt);
    ftd->counter_defrag_timeout = StatsRegisterCounter("defrag.mgr.tracker_timeout", &t->stats);
    ftd->counter_defrag_memuse = StatsRegisterCounter("defrag.memuse", &t->stats);

    PacketPoolInit();
    return TM_ECODE_OK;
}
763

764
/** \brief Per-thread teardown for the flow manager: flush the stream TCP
 *         thread cache, destroy this thread's packet pool and free the
 *         FlowManagerThreadData allocated in FlowManagerThreadInit(). */
static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
    StreamTcpThreadCacheCleanup();
    PacketPoolDestroy();
    SCFree(data);
    return TM_ECODE_OK;
}
771

772
/** \internal
 *  \brief calculate number of rows to scan and how much time to sleep
 *         based on the busy score `mp` (0 idle, 100 max busy).
 *
 *  We try to make sure we scan the hash once a second. The size of the
 *  slice of the hash scanned is determined by our busy score 'mp'.
 *  We sleep for the remainder of the second after processing the slice,
 *  or at least an approximation of it.
 *  A minimum busy score of 10 is assumed to avoid a longer than 10 second
 *  full hash pass. This is to avoid burstiness in scanning when there is
 *  a rapid increase of the busy score, which could lead to the flow manager
 *  suddenly scanning a much larger slice of the hash leading to a burst
 *  in scan/eviction work.
 *
 *  \param rows number of rows for the work unit
 *  \param mp current memcap pressure value
 *  \param emergency emergency mode is set or not
 *  \param wu_sleep holds value of sleep time per work unit (ms)
 *  \param wu_rows holds value of calculated rows to be processed per second
 *  \param rows_sec same as wu_rows, only used for counter updates
 */
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
        uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
    if (emergency) {
        /* emergency mode: scan the full slice, short fixed sleep.
         * rows_sec is deliberately left untouched here. */
        *wu_rows = rows;
        *wu_sleep = 250;
        return;
    }

    /* minimum busy score is 10 */
    const uint32_t busy = MAX(mp, 10);
    const uint32_t slice_rows = (uint32_t)((float)rows * ((float)busy / (float)100));
    /* estimate the time the work will take, in ms, assuming an average
     * of 1usec per row. Maxing out at 1sec. */
    const uint32_t est_work_ms = MIN(slice_rows / 1000, 1000);
    /* sleep for the rest of the second to reach the per second cadence,
     * but at least 250ms. */
    const uint32_t sleep_ms = MAX(250, 1000 - est_work_ms);
    SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, busy, rows, slice_rows,
            sleep_ms);

    *wu_sleep = sleep_ms;
    *wu_rows = slice_rows;
    *rows_sec = slice_rows;
}
817

818
/** \brief Thread that manages the flow table and times out flows.
 *
 *  \param th_v ThreadVars cast to void ptr
 *  \param thread_data FlowManagerThreadData set up in FlowManagerThreadInit
 *
 *  Keeps an eye on the spare list, alloc flows if needed...
 */
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
    FlowManagerThreadData *ftd = thread_data;
    /* number of hash rows this manager instance owns (ftd->min..ftd->max) */
    const uint32_t rows = ftd->max - ftd->min;
    const bool time_is_live = TimeModeIsLive();

    uint32_t emerg_over_cnt = 0; /* consecutive passes with spare pool recovered */
    uint64_t next_run_ms = 0;    /* ts (ms) at which the next timeout pass is due */
    uint32_t pos = ftd->min;     /* current scan position within our hash slice */
    uint32_t rows_sec = 0;
    uint32_t rows_per_wu = 0;    /* rows scanned per work unit */
    uint64_t sleep_per_wu = 0;   /* ms slept between work units */
    bool prev_emerg = false;     /* emergency state seen on the previous iteration */
    uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */

    uint32_t mp = MemcapsGetPressure() * 100;
    /* only instance 0 publishes the global pressure counters */
    if (ftd->instance == 0) {
        StatsCounterSetI64(&th_v->stats, ftd->cnt.memcap_pressure, mp);
        StatsCounterMaxUpdateI64(&th_v->stats, ftd->cnt.memcap_pressure_max, (int64_t)mp);
    }
    GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
    StatsCounterSetI64(&th_v->stats, ftd->cnt.flow_mgr_rows_sec, rows_sec);

    TmThreadsSetFlag(th_v, THV_RUNNING);
    /* don't start our activities until time is setup */
    while (!TimeModeIsReady()) {
        if (suricata_ctl_flags != 0)
            return TM_ECODE_OK;
        SleepUsec(10);
    }
    bool run = TmThreadsWaitForUnpause(th_v);

    while (run) {
        bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

        /* Get the time: real time in live mode, or a min() of the
         * "active" threads in offline mode. See TmThreadsGetMinimalTimestamp */
        SCTime_t ts = TimeGet();

        SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
        uint64_t ts_ms = SCTIME_MSECS(ts);
        const bool emerge_p = (emerg && !prev_emerg);
        if (emerge_p) {
            /* just entered emergency mode: force an immediate timeout pass */
            next_run_ms = 0;
            prev_emerg = true;
            SCLogNotice("Flow emergency mode entered...");
            StatsCounterIncr(&th_v->stats, ftd->cnt.flow_emerg_mode_enter);
        }
        if (ts_ms >= next_run_ms) {
            if (ftd->instance == 0) {
                const uint32_t sq_len = FlowSpareGetPoolSize();
                const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1);
                /* see if we still have enough spare flows */
                if (spare_perc < 90 || spare_perc > 110) {
                    FlowSparePoolUpdate(sq_len);
                }
            }

            /* try to time out flows */
            // clang-format off
            FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
            // clang-format on

            if (emerg) {
                /* in emergency mode, do a full pass of the hash table */
                FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
                StatsCounterIncr(&th_v->stats, ftd->cnt.flow_mgr_full_pass);
            } else {
                SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
                        rows_per_wu);

                const uint32_t ppos = pos;
                FlowTimeoutHashInChunks(&ftd->timeout, ts, ftd->min, ftd->max, &counters,
                        rows_per_wu, &pos, ftd->instance);
                /* pos wrapped back to the start of our slice: a full pass
                 * over the hash slice has completed */
                if (ppos > pos) {
                    StatsCounterIncr(&th_v->stats, ftd->cnt.flow_mgr_full_pass);
                }
            }

            const uint32_t spare_pool_len = FlowSpareGetPoolSize();
            StatsCounterSetI64(&th_v->stats, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);

            FlowCountersUpdate(th_v, ftd, &counters);

            if (emerg) {
                SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
                           "flow_spare_q status: %" PRIu32 "%% flows at the queue",
                        spare_pool_len, flow_config.prealloc,
                        spare_pool_len * 100 / MAX(flow_config.prealloc, 1));

                /* only if we have pruned this "emergency_recovery" percentage
                 * of flows, we will unset the emergency bit */
                if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >
                        flow_config.emergency_recovery) {
                    emerg_over_cnt++;
                } else {
                    emerg_over_cnt = 0;
                }

                /* spare pool stayed recovered for 30 consecutive passes:
                 * leave emergency mode */
                if (emerg_over_cnt >= 30) {
                    SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
                    FlowTimeoutsReset();

                    emerg = false;
                    prev_emerg = false;
                    emerg_over_cnt = 0;
                    SCLogNotice("Flow emergency mode over, back to normal... unsetting"
                                " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
                                "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
                                "%% flows at the queue",
                            (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
                            spare_pool_len * 100 / MAX(flow_config.prealloc, 1));

                    StatsCounterIncr(&th_v->stats, ftd->cnt.flow_emerg_mode_over);
                }
            }

            /* update work units */
            const uint32_t pmp = mp;
            mp = MemcapsGetPressure() * 100;
            if (ftd->instance == 0) {
                StatsCounterSetI64(&th_v->stats, ftd->cnt.memcap_pressure, mp);
                StatsCounterMaxUpdateI64(&th_v->stats, ftd->cnt.memcap_pressure_max, (int64_t)mp);
            }
            GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
            /* only republish the rows/sec counter when the pressure changed */
            if (pmp != mp) {
                StatsCounterSetI64(&th_v->stats, ftd->cnt.flow_mgr_rows_sec, rows_sec);
            }

            next_run_ms = ts_ms + sleep_per_wu;
        }
        /* at most once per second: run the non-flow timeout handlers
         * (defrag, host, ippair, ranges, thresholds) from instance 0 */
        if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
            if (ftd->instance == 0) {
                StatsCounterSetI64(
                        &th_v->stats, ftd->counter_defrag_memuse, DefragTrackerGetMemcap());
                uint32_t defrag_cnt = DefragTimeoutHash(ts);
                if (defrag_cnt) {
                    StatsCounterAddI64(
                            &th_v->stats, ftd->counter_defrag_timeout, (int64_t)defrag_cnt);
                }
                HostTimeoutHash(ts);
                IPPairTimeoutHash(ts);
                HttpRangeContainersTimeoutHash(ts);
                ThresholdsExpire(ts);
                other_last_sec = (uint32_t)SCTIME_SECS(ts);
            }
        }

        if (TmThreadsCheckFlag(th_v, THV_KILL)) {
            StatsSyncCounters(&th_v->stats);
            break;
        }

        if (emerg || !time_is_live) {
            SleepUsec(250);
        } else {
            /* live mode: wait on the control cond for sleep_per_wu ms so
             * the wait can be cut short (e.g. when emergency mode is set) */
            struct timeval cond_tv;
            gettimeofday(&cond_tv, NULL);
            struct timeval add_tv;
            add_tv.tv_sec = sleep_per_wu / 1000;
            add_tv.tv_usec = (sleep_per_wu % 1000) * 1000;
            timeradd(&cond_tv, &add_tv, &cond_tv);

            struct timespec cond_time = FROM_TIMEVAL(cond_tv);
            SCCtrlMutexLock(&flow_manager_ctrl_mutex);
            while (1) {
                if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
                    break;
                }
                int rc = SCCtrlCondTimedwait(
                        &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
                if (rc == ETIMEDOUT || rc < 0) {
                    break;
                }
            }
            SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
        }

        SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");

        StatsSyncCountersIfSignalled(&th_v->stats);
    }
    return TM_ECODE_OK;
}
1008

1009
/** \brief spawn the flow manager thread */
1010
void FlowManagerThreadSpawn(void)
1011
{
3,427✔
1012
    intmax_t setting = 1;
3,427✔
1013
    (void)SCConfGetInt("flow.managers", &setting);
3,427✔
1014

1015
    if (setting < 1 || setting > 1024) {
3,427✔
1016
        FatalError("invalid flow.managers setting %" PRIdMAX, setting);
×
1017
    }
×
1018
    flowmgr_number = (uint32_t)setting;
3,427✔
1019

1020
    SCLogConfig("using %u flow manager threads", flowmgr_number);
3,427✔
1021
    StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse);
3,427✔
1022

1023
    for (uint32_t u = 0; u < flowmgr_number; u++) {
6,854✔
1024
        char name[TM_THREAD_NAME_MAX];
3,427✔
1025
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
3,427✔
1026

1027
        ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
3,427✔
1028
                "FlowManager", 0);
3,427✔
1029
        BUG_ON(tv_flowmgr == NULL);
3,427✔
1030

1031
        if (tv_flowmgr == NULL) {
3,427✔
1032
            FatalError("flow manager thread creation failed");
×
1033
        }
×
1034
        if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
3,427✔
1035
            FatalError("flow manager thread spawn failed");
×
1036
        }
×
1037
    }
3,427✔
1038
}
3,427✔
1039

1040
/** \internal per-thread state for the flow recycler thread */
typedef struct FlowRecyclerThreadData_ {
    void *output_thread_data; /**< flow log output API thread context */

    StatsCounterId counter_flows;        /**< "flow.recycler.recycled" */
    StatsCounterAvgId counter_queue_avg; /**< "flow.recycler.queue_avg" */
    StatsCounterMaxId counter_queue_max; /**< "flow.recycler.queue_max" */

    StatsCounterId counter_flow_active;         /**< "flow.active", decremented per recycled flow */
    StatsCounterId counter_tcp_active_sessions; /**< "tcp.active_sessions" */
    FlowEndCounters fec; /**< counters updated at flow end via FlowEndCountersUpdate() */
} FlowRecyclerThreadData;
1051

1052
/** \internal
 *  \brief set up a flow recycler thread: allocate per-thread data,
 *         initialize the flow log output API and register counters.
 */
static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowRecyclerThreadData *rtd = SCCalloc(1, sizeof(*rtd));
    if (rtd == NULL)
        return TM_ECODE_FAILED;

    if (OutputFlowLogThreadInit(t, &rtd->output_thread_data) != TM_ECODE_OK) {
        SCLogError("initializing flow log API for thread failed");
        SCFree(rtd);
        return TM_ECODE_FAILED;
    }
    SCLogDebug("output_thread_data %p", rtd->output_thread_data);

    /* recycler activity counters */
    rtd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", &t->stats);
    rtd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", &t->stats);
    rtd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", &t->stats);

    /* active object counters, decremented as flows are recycled */
    rtd->counter_flow_active = StatsRegisterCounter("flow.active", &t->stats);
    rtd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", &t->stats);

    FlowEndCountersRegister(t, &rtd->fec);

    *data = rtd;
    return TM_ECODE_OK;
}
1076

1077
/** \internal
 *  \brief tear down a flow recycler thread: release the tcp segment
 *         thread cache, the output API context and the per-thread data.
 */
static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
    FlowRecyclerThreadData *rtd = (FlowRecyclerThreadData *)data;

    StreamTcpThreadCacheCleanup();

    if (rtd->output_thread_data != NULL) {
        OutputFlowLogThreadDeinit(t, rtd->output_thread_data);
    }

    SCFree(rtd);
    return TM_ECODE_OK;
}
1088

1089
/** \internal
 *  \brief process a single timed out flow: log it, update end-of-flow
 *         counters, run the finish callbacks and clear the flow memory
 *         so the flow can be reused.
 *
 *  Holds the flow write lock for the duration of the work.
 */
static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
{
    FLOWLOCK_WRLOCK(f);

    (void)OutputFlowLog(tv, ftd->output_thread_data, f);

    FlowEndCountersUpdate(tv, &ftd->fec, f);
    /* a TCP flow with a protocol context counts as an active tcp session */
    if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
        StatsCounterDecr(&tv->stats, ftd->counter_tcp_active_sessions);
    }
    StatsCounterDecr(&tv->stats, ftd->counter_flow_active);
    SCFlowRunFinishCallbacks(tv, f);
    FlowClearMemory(f, f->protomap);
    FLOWLOCK_UNLOCK(f);
}
1104

1105
/** \brief Thread that manages timed out flows.
 *
 *  \param th_v ThreadVars cast to void ptr
 *  \param thread_data FlowRecyclerThreadData set up in FlowRecyclerThreadInit
 */
static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{
    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
    BUG_ON(ftd == NULL);
    const bool time_is_live = TimeModeIsLive();
    uint64_t recycled_cnt = 0; /* lifetime total, for the exit perf log */
    FlowQueuePrivate ret_queue = { NULL, NULL, 0 };

    TmThreadsSetFlag(th_v, THV_RUNNING);
    bool run = TmThreadsWaitForUnpause(th_v);

    while (run) {
        /* mark ourselves busy so FlowRecyclerReadyToShutdown() waits for
         * this batch to finish */
        SC_ATOMIC_ADD(flowrec_busy,1);
        FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);

        StatsCounterAvgAddI64(&th_v->stats, ftd->counter_queue_avg, (int64_t)list.len);
        StatsCounterMaxUpdateI64(&th_v->stats, ftd->counter_queue_max, (int64_t)list.len);

        /* check the kill flag before processing, but only bail after the
         * current batch is fully handled */
        const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));

        /* Get the time */
        SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));

        int64_t cnt = 0;
        Flow *f;
        while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
            Recycler(th_v, ftd, f);
            cnt++;

            /* for every full sized block, add it to the spare pool */
            FlowQueuePrivateAppendFlow(&ret_queue, f);
            if (ret_queue.len == FLOW_SPARE_POOL_BLOCK_SIZE) {
                FlowSparePoolReturnFlows(&ret_queue);
            }
        }
        /* return any partial block as well */
        if (ret_queue.len > 0) {
            FlowSparePoolReturnFlows(&ret_queue);
        }
        if (cnt > 0) {
            recycled_cnt += cnt;
            StatsCounterAddI64(&th_v->stats, ftd->counter_flows, cnt);
        }
        SC_ATOMIC_SUB(flowrec_busy,1);

        if (bail) {
            break;
        }

        const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
        if (emerg || !time_is_live) {
            SleepUsec(250);
        } else {
            /* live mode: wait up to 1s on the control cond; woken early
             * when work is queued or emergency mode is set */
            struct timeval cond_tv;
            gettimeofday(&cond_tv, NULL);
            cond_tv.tv_sec += 1;
            struct timespec cond_time = FROM_TIMEVAL(cond_tv);
            SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
            while (1) {
                if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
                    break;
                }
                if (SC_ATOMIC_GET(flow_recycle_q.non_empty)) {
                    break;
                }
                int rc = SCCtrlCondTimedwait(
                        &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
                if (rc == ETIMEDOUT || rc < 0) {
                    break;
                }
            }
            SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
        }

        SCLogDebug("woke up...");

        StatsSyncCountersIfSignalled(&th_v->stats);
    }
    StatsSyncCounters(&th_v->stats);
    SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
    return TM_ECODE_OK;
}
1190

1191
static bool FlowRecyclerReadyToShutdown(void)
1192
{
8,735✔
1193
    if (SC_ATOMIC_GET(flowrec_busy) != 0) {
8,735✔
1194
        return false;
2,181✔
1195
    }
2,181✔
1196
    uint32_t len = 0;
6,554✔
1197
    FQLOCK_LOCK(&flow_recycle_q);
6,554✔
1198
    len = flow_recycle_q.qlen;
6,554✔
1199
    FQLOCK_UNLOCK(&flow_recycle_q);
6,554✔
1200

1201
    return ((len == 0));
6,554✔
1202
}
8,735✔
1203

1204
/** \brief spawn the flow recycler thread */
1205
void FlowRecyclerThreadSpawn(void)
1206
{
3,427✔
1207
    intmax_t setting = 1;
3,427✔
1208
    (void)SCConfGetInt("flow.recyclers", &setting);
3,427✔
1209

1210
    if (setting < 1 || setting > 1024) {
3,427✔
1211
        FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
×
1212
    }
×
1213
    flowrec_number = (uint32_t)setting;
3,427✔
1214

1215
    SCLogConfig("using %u flow recycler threads", flowrec_number);
3,427✔
1216

1217
    for (uint32_t u = 0; u < flowrec_number; u++) {
6,854✔
1218
        char name[TM_THREAD_NAME_MAX];
3,427✔
1219
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
3,427✔
1220

1221
        ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
3,427✔
1222
                "FlowRecycler", 0);
3,427✔
1223

1224
        if (tv_flowrec == NULL) {
3,427✔
1225
            FatalError("flow recycler thread creation failed");
×
1226
        }
×
1227
        if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
3,427✔
1228
            FatalError("flow recycler thread spawn failed");
×
1229
        }
×
1230
    }
3,427✔
1231
}
3,427✔
1232

1233
/**
 * \brief Used to disable flow recycler thread(s).
 *
 * Moves all flows still in the hash to the recycle queue, waits for the
 * recycler(s) to drain it, sets THV_KILL on the recycler threads and
 * then waits up to 60 seconds for them to reach THV_RUNNING_DONE.
 *
 * \note this should only be called when the flow manager is already gone
 *
 * \todo Kinda hackish since it uses the tv name to identify flow recycler
 *       thread.  We need an all weather identification scheme.
 */
void FlowDisableFlowRecyclerThread(void)
{
    /* move all flows still in the hash to the recycler queue */
#ifndef DEBUG
    (void)FlowCleanupHash();
#else
    uint32_t flows = FlowCleanupHash();
    SCLogDebug("flows to progress: %u", flows);
#endif

    /* make sure all flows are processed */
    do {
        FlowWakeupFlowRecyclerThread();
        SleepUsec(10);
    } while (!FlowRecyclerReadyToShutdown());

    SCMutexLock(&tv_root_lock);
    /* flow recycler thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    /* bounded wait: give the recyclers at most 60s to finish */
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError("unable to get all flow recycler "
                   "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                /* a recycler is still running: drop the lock, poke it
                 * awake and retry the whole scan */
                SCMutexUnlock(&tv_root_lock);
                FlowWakeupFlowRecyclerThread();
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowrec_cnt, 0);
}
1298

1299
void TmModuleFlowManagerRegister (void)
1300
{
2,221✔
1301
    tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
2,221✔
1302
    tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
2,221✔
1303
    tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
2,221✔
1304
    tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
2,221✔
1305
    tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
2,221✔
1306
    tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
2,221✔
1307
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
2,221✔
1308

1309
    SC_ATOMIC_INIT(flowmgr_cnt);
2,221✔
1310
    SC_ATOMIC_INITPTR(flow_timeouts);
2,221✔
1311
}
2,221✔
1312

1313
void TmModuleFlowRecyclerRegister (void)
1314
{
2,221✔
1315
    tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
2,221✔
1316
    tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
2,221✔
1317
    tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
2,221✔
1318
    tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
2,221✔
1319
    tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
2,221✔
1320
    tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
2,221✔
1321
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
2,221✔
1322

1323
    SC_ATOMIC_INIT(flowrec_cnt);
2,221✔
1324
    SC_ATOMIC_INIT(flowrec_busy);
2,221✔
1325
}
2,221✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc