• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jasonish / suricata / 22785952221

06 Mar 2026 11:16PM UTC coverage: 67.722% (-11.5%) from 79.244%
22785952221

push

github

jasonish
github-ci: add schema ordering check for yaml schema

158908 of 234646 relevant lines covered (67.72%)

6673126.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.7
/src/tmqh-flow.c
1
/* Copyright (C) 2007-2020 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23
 *
24
 * Simple output queue handler that makes sure all packets of the same flow
25
 * are sent to the same queue. We support different kinds of q handlers.  Have
26
 * a look at "autofp-scheduler" conf to further understand the various q
27
 * handlers we provide.
28
 */
29

30
#include "suricata.h"
31
#include "packet-queue.h"
32
#include "decode.h"
33
#include "threads.h"
34
#include "threadvars.h"
35
#include "tmqh-flow.h"
36
#include "flow-hash.h"
37

38
#include "tm-queuehandlers.h"
39

40
#include "conf.h"
41
#include "util-unittest.h"
42

43
Packet *TmqhInputFlow(ThreadVars *t);
44
void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
45
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
46
static void TmqhOutputFlowFTPHash(ThreadVars *t, Packet *p);
47
void *TmqhOutputFlowSetupCtx(const char *queue_str);
48
void TmqhOutputFlowFreeCtx(void *ctx);
49
void TmqhFlowRegisterTests(void);
50

51
void TmqhFlowRegister(void)
52
{
2,219✔
53
    tmqh_table[TMQH_FLOW].name = "flow";
2,219✔
54
    tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
2,219✔
55
    tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
2,219✔
56
    tmqh_table[TMQH_FLOW].OutHandlerCtxFree = TmqhOutputFlowFreeCtx;
2,219✔
57
    tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;
2,219✔
58

59
    const char *scheduler = NULL;
2,219✔
60
    if (SCConfGet("autofp-scheduler", &scheduler) == 1) {
2,219✔
61
        if (strcasecmp(scheduler, "round-robin") == 0) {
×
62
            SCLogNotice("using flow hash instead of round robin");
×
63
            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
×
64
        } else if (strcasecmp(scheduler, "active-packets") == 0) {
×
65
            SCLogNotice("using flow hash instead of active packets");
×
66
            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
×
67
        } else if (strcasecmp(scheduler, "hash") == 0) {
×
68
            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
×
69
        } else if (strcasecmp(scheduler, "ippair") == 0) {
×
70
            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair;
×
71
        } else if (strcasecmp(scheduler, "ftp-hash") == 0) {
×
72
            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowFTPHash;
×
73
        } else {
×
74
            SCLogError("Invalid entry \"%s\" "
×
75
                       "for autofp-scheduler in conf.  Killing engine.",
×
76
                    scheduler);
×
77
            exit(EXIT_FAILURE);
×
78
        }
×
79
    } else {
2,219✔
80
        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
2,219✔
81
    }
2,219✔
82
}
2,219✔
83

84
void TmqhFlowPrintAutofpHandler(void)
85
{
3,368✔
86
#define PRINT_IF_FUNC(f, msg)                       \
3,368✔
87
    if (tmqh_table[TMQH_FLOW].OutHandler == (f))    \
10,104✔
88
        SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
10,104✔
89

90
    PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash");
3,368✔
91
    PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair");
3,368✔
92
    PRINT_IF_FUNC(TmqhOutputFlowFTPHash, "FTPHash");
3,368✔
93

94
#undef PRINT_IF_FUNC
3,368✔
95
}
3,368✔
96

97
/* same as 'simple' */
98
Packet *TmqhInputFlow(ThreadVars *tv)
99
{
2,724,417✔
100
    PacketQueue *q = tv->inq->pq;
2,724,417✔
101

102
    StatsSyncCountersIfSignalled(&tv->stats);
2,724,417✔
103

104
    SCMutexLock(&q->mutex_q);
2,724,417✔
105
    if (q->len == 0) {
2,724,417✔
106
        /* if we have no packets in queue, wait... */
107
        SCCondWait(&q->cond_q, &q->mutex_q);
49,876✔
108
    }
49,876✔
109

110
    if (q->len > 0) {
2,724,417✔
111
        Packet *p = PacketDequeue(q);
2,710,829✔
112
        SCMutexUnlock(&q->mutex_q);
2,710,829✔
113
        return p;
2,710,829✔
114
    } else {
2,710,858✔
115
        /* return NULL if we have no pkt. Should only happen on signals. */
116
        SCMutexUnlock(&q->mutex_q);
13,588✔
117
        return NULL;
13,588✔
118
    }
13,588✔
119
}
2,724,417✔
120

121
/**
 * \brief append the named queue to the ctx's queue array
 *
 * Looks the queue up by name, creating it if it does not exist yet,
 * bumps its writer count and grows ctx->queues by one zeroed slot that
 * points at the queue's packet queue.
 *
 * On a failed grow the existing array is freed and ctx->queues is set
 * to NULL. The writer count is not rolled back on failure.
 *
 * \param ctx  flow queue handler ctx to extend
 * \param name queue name
 *
 * \retval 0 on success
 * \retval -1 if the queue could not be created or memory ran out
 */
static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
{
    Tmq *queue = TmqGetQueueByName(name);
    if (queue == NULL) {
        queue = TmqCreateQueue(name);
        if (queue == NULL) {
            return -1;
        }
    }
    queue->writer_cnt++;

    if (ctx->queues != NULL) {
        /* grow the existing array by one slot */
        ctx->size++;
        void *grown = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
        if (grown == NULL) {
            SCFree(ctx->queues);
            ctx->queues = NULL;
            return -1;
        }
        ctx->queues = grown;

        /* the slot added by the realloc is uninitialized: zero it */
        memset(&ctx->queues[ctx->size - 1], 0, sizeof(TmqhFlowMode));
    } else {
        /* first queue: start the array with a single zeroed slot */
        ctx->size = 1;
        ctx->queues = SCCalloc(1, ctx->size * sizeof(TmqhFlowMode));
        if (ctx->queues == NULL) {
            return -1;
        }
    }

    ctx->queues[ctx->size - 1].q = queue->pq;
    return 0;
}
13,524✔
154

155
/**
156
 * \brief setup the queue handlers ctx
157
 *
158
 * Parses a comma separated string "queuename1,queuename2,etc"
159
 * and sets the ctx up to devide flows over these queue's.
160
 *
161
 * \param queue_str comma separated string with output queue names
162
 *
163
 * \retval ctx queues handlers ctx or NULL in error
164
 */
165
void *TmqhOutputFlowSetupCtx(const char *queue_str)
166
{
3,375✔
167
    if (queue_str == NULL || strlen(queue_str) == 0)
3,375✔
168
        return NULL;
×
169

170
    SCLogDebug("queue_str %s", queue_str);
3,375✔
171

172
    TmqhFlowCtx *ctx = SCCalloc(1, sizeof(TmqhFlowCtx));
3,375✔
173
    if (unlikely(ctx == NULL))
3,375✔
174
        return NULL;
×
175

176
    char *str = SCStrdup(queue_str);
3,375✔
177
    if (unlikely(str == NULL)) {
3,375✔
178
        goto error;
×
179
    }
×
180
    char *tstr = str;
3,375✔
181

182
    /* parse the comma separated string */
183
    do {
13,524✔
184
        char *comma = strchr(tstr,',');
13,524✔
185
        if (comma != NULL) {
13,524✔
186
            *comma = '\0';
10,149✔
187
            char *qname = tstr;
10,149✔
188
            int r = StoreQueueId(ctx,qname);
10,149✔
189
            if (r < 0)
10,149✔
190
                goto error;
×
191
        } else {
10,149✔
192
            char *qname = tstr;
3,375✔
193
            int r = StoreQueueId(ctx,qname);
3,375✔
194
            if (r < 0)
3,375✔
195
                goto error;
×
196
        }
3,375✔
197
        tstr = comma ? (comma + 1) : comma;
13,524✔
198
    } while (tstr != NULL);
13,524✔
199

200
    SCFree(str);
3,375✔
201
    return (void *)ctx;
3,375✔
202

203
error:
×
204
    SCFree(ctx);
×
205
    if (str != NULL)
×
206
        SCFree(str);
×
207
    return NULL;
×
208
}
3,375✔
209

210
void TmqhOutputFlowFreeCtx(void *ctx)
211
{
3,375✔
212
    TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
3,375✔
213

214
    SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
3,375✔
215
              fctx->size);
3,375✔
216
    SCFree(fctx->queues);
3,375✔
217
    SCFree(fctx);
3,375✔
218
}
3,375✔
219

220
/**
 * \brief select the queue to output to based on the packet's flow hash
 *
 * Packets flagged PKT_WANTS_FLOW go to queue (flow_hash % queue count),
 * so equal hashes always map to the same queue. Other packets are
 * spread over the queues in turn using the ctx's 'last' counter.
 *
 * \param tv thread vars, its outctx holds the queue set
 * \param p packet to enqueue
 */
void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
{
    TmqhFlowCtx *fctx = (TmqhFlowCtx *)tv->outctx;
    uint32_t idx;

    if (!(p->flags & PKT_WANTS_FLOW)) {
        /* no flow: hand out queues in turn, wrapping at the end */
        idx = fctx->last++;
        if (fctx->last == fctx->size)
            fctx->last = 0;
    } else {
        const uint32_t flow_hash = p->flow_hash;
        idx = flow_hash % fctx->size;
    }

    PacketQueue *outq = fctx->queues[idx].q;
    SCMutexLock(&outq->mutex_q);
    PacketEnqueue(outq, p);
    SCCondSignal(&outq->cond_q);
    SCMutexUnlock(&outq->mutex_q);
}
2,710,900✔
241

242
/**
243
 * \brief select the queue to output based on IP address pair.
244
 *
245
 * \param tv thread vars.
246
 * \param p packet.
247
 */
248
void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
249
{
×
250
    uint32_t addr_hash = 0;
×
251

252
    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
×
253

254
    if (p->src.family == AF_INET6) {
×
255
        for (int i = 0; i < 4; i++) {
×
256
            addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
×
257
        }
×
258
    } else {
×
259
        addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
×
260
    }
×
261

262
    uint32_t qid = addr_hash % ctx->size;
×
263
    PacketQueue *q = ctx->queues[qid].q;
×
264
    SCMutexLock(&q->mutex_q);
×
265
    PacketEnqueue(q, p);
×
266
    SCCondSignal(&q->cond_q);
×
267
    SCMutexUnlock(&q->mutex_q);
×
268
}
×
269

270
/**
 * \brief select the queue to output to, with special handling for FTP
 *
 * Works like the flow hash handler, except that for TCP packets where
 * both ports are >= 1024, or either port is 20 or 21, the hash from
 * FlowGetIpPairProtoHash() replaces the flow hash. Presumably this keeps
 * FTP control and data connections between the same hosts on the same
 * queue -- NOTE(review): confirm against FlowGetIpPairProtoHash.
 *
 * Packets without PKT_WANTS_FLOW are spread over the queues in turn.
 *
 * \param tv thread vars, its outctx holds the queue set
 * \param p packet to enqueue
 */
static void TmqhOutputFlowFTPHash(ThreadVars *tv, Packet *p)
{
    TmqhFlowCtx *fctx = (TmqhFlowCtx *)tv->outctx;
    uint32_t idx;

    if (!(p->flags & PKT_WANTS_FLOW)) {
        /* no flow: hand out queues in turn, wrapping at the end */
        idx = fctx->last++;
        if (fctx->last == fctx->size)
            fctx->last = 0;
    } else {
        uint32_t hash = p->flow_hash;

        if (PacketIsTCP(p)) {
            const int both_high = (p->sp >= 1024 && p->dp >= 1024);
            const int ftp_port = (p->dp == 21 || p->sp == 21 || p->dp == 20 || p->sp == 20);

            if (both_high || ftp_port) {
                hash = FlowGetIpPairProtoHash(p);
            }
        }
        idx = hash % fctx->size;
    }

    PacketQueue *outq = fctx->queues[idx].q;
    SCMutexLock(&outq->mutex_q);
    PacketEnqueue(outq, p);
    SCCondSignal(&outq->cond_q);
    SCMutexUnlock(&outq->mutex_q);
}
×
295

296
#ifdef UNITTESTS
297

298
/**
 * \test Four queues are created up front, then TmqhOutputFlowSetupCtx is
 *       given their names as one comma separated string. The ctx must
 *       hold 4 entries that point, in the order of the names, at the
 *       packet queues of the queues created here.
 */
static int TmqhOutputFlowSetupCtxTest01(void)
299
{
300
    TmqResetQueues();
301

302
    Tmq *tmq1 = TmqCreateQueue("queue1");
303
    FAIL_IF_NULL(tmq1);
304
    Tmq *tmq2 = TmqCreateQueue("queue2");
305
    FAIL_IF_NULL(tmq2);
306
    Tmq *tmq3 = TmqCreateQueue("another");
307
    FAIL_IF_NULL(tmq3);
308
    Tmq *tmq4 = TmqCreateQueue("yetanother");
309
    FAIL_IF_NULL(tmq4);
310

311
    const char *str = "queue1,queue2,another,yetanother";
312
    void *ctx = TmqhOutputFlowSetupCtx(str);
313
    FAIL_IF_NULL(ctx);
314

315
    TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
316

317
    FAIL_IF_NOT(fctx->size == 4);
318

319
    FAIL_IF_NULL(fctx->queues);
320

321
    FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
322
    FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
323
    FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
324
    FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
325

326
    TmqhOutputFlowFreeCtx(fctx);
327
    TmqResetQueues();
328
    PASS;
329
}
330

331
/**
 * \test Four queues are created up front, but TmqhOutputFlowSetupCtx is
 *       given a single name without any comma. The ctx must hold exactly
 *       1 entry, pointing at the packet queue of "queue1".
 */
static int TmqhOutputFlowSetupCtxTest02(void)
332
{
333
    TmqResetQueues();
334

335
    Tmq *tmq1 = TmqCreateQueue("queue1");
336
    FAIL_IF_NULL(tmq1);
337
    Tmq *tmq2 = TmqCreateQueue("queue2");
338
    FAIL_IF_NULL(tmq2);
339
    Tmq *tmq3 = TmqCreateQueue("another");
340
    FAIL_IF_NULL(tmq3);
341
    Tmq *tmq4 = TmqCreateQueue("yetanother");
342
    FAIL_IF_NULL(tmq4);
343

344
    const char *str = "queue1";
345
    void *ctx = TmqhOutputFlowSetupCtx(str);
346
    FAIL_IF_NULL(ctx);
347

348
    TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
349

350
    FAIL_IF_NOT(fctx->size == 1);
351

352
    FAIL_IF_NULL(fctx->queues);
353

354
    FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
355
    TmqhOutputFlowFreeCtx(fctx);
356
    TmqResetQueues();
357

358
    PASS;
359
}
360

361
/**
 * \test No queue is created by the test before TmqhOutputFlowSetupCtx is
 *       called with four names. Afterwards every name must be found by
 *       TmqGetQueueByName, and the ctx must hold 4 entries pointing at
 *       those queues' packet queues in the order of the names.
 */
static int TmqhOutputFlowSetupCtxTest03(void)
362
{
363
    TmqResetQueues();
364

365
    const char *str = "queue1,queue2,another,yetanother";
366
    void *ctx = TmqhOutputFlowSetupCtx(str);
367
    FAIL_IF_NULL(ctx);
368

369
    TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
370

371
    FAIL_IF_NOT(fctx->size == 4);
372

373
    FAIL_IF_NULL(fctx->queues);
374

375
    Tmq *tmq1 = TmqGetQueueByName("queue1");
376
    FAIL_IF_NULL(tmq1);
377
    Tmq *tmq2 = TmqGetQueueByName("queue2");
378
    FAIL_IF_NULL(tmq2);
379
    Tmq *tmq3 = TmqGetQueueByName("another");
380
    FAIL_IF_NULL(tmq3);
381
    Tmq *tmq4 = TmqGetQueueByName("yetanother");
382
    FAIL_IF_NULL(tmq4);
383

384
    FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
385
    FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
386
    FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
387
    FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
388

389
    TmqhOutputFlowFreeCtx(fctx);
390
    TmqResetQueues();
391
    PASS;
392
}
393

394
#endif /* UNITTESTS */
395

396
/**
 * \brief register the unit tests of the flow queue handler
 *
 * The body is empty unless UNITTESTS is defined.
 */
void TmqhFlowRegisterTests(void)
{
#ifdef UNITTESTS
    /* ctx setup from a comma separated queue name string */
    UtRegisterTest("TmqhOutputFlowSetupCtxTest01", TmqhOutputFlowSetupCtxTest01);
    UtRegisterTest("TmqhOutputFlowSetupCtxTest02", TmqhOutputFlowSetupCtxTest02);
    UtRegisterTest("TmqhOutputFlowSetupCtxTest03", TmqhOutputFlowSetupCtxTest03);
#endif
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc