• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 23374838686

21 Mar 2026 07:29AM UTC coverage: 59.341% (-20.0%) from 79.315%
23374838686

Pull #15075

github

web-flow
Merge 90b4e834f into 6587e363a
Pull Request #15075: Stack 8001 v16.4

38 of 70 new or added lines in 10 files covered. (54.29%)

34165 existing lines in 563 files now uncovered.

119621 of 201584 relevant lines covered (59.34%)

650666.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.26
/src/source-dpdk.c
1
/* Copyright (C) 2021-2025 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 *  \defgroup dpdk DPDK running mode
20
 *
21
 *  @{
22
 */
23

24
/**
25
 * \file
26
 *
27
 * \author Lukas Sismis <lukas.sismis@gmail.com>
28
 *
29
 * DPDK capture interface
30
 *
31
 */
32

33
#include "suricata-common.h"
34
#include "runmodes.h"
35
#include "decode.h"
36
#include "packet.h"
37
#include "source-dpdk.h"
38
#include "suricata.h"
39
#include "threads.h"
40
#include "threadvars.h"
41
#include "tm-threads.h"
42
#include "tmqh-packetpool.h"
43
#include "util-privs.h"
44
#include "util-device-private.h"
45
#include "action-globals.h"
46

47
#ifndef HAVE_DPDK
48

49
/* Thread init callback installed by both register functions below when DPDK is compiled out. */
TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data);
50

51
void TmModuleReceiveDPDKRegister(void)
52
{
2✔
53
    tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
2✔
54
    tmm_modules[TMM_RECEIVEDPDK].ThreadInit = NoDPDKSupportExit;
2✔
55
    tmm_modules[TMM_RECEIVEDPDK].Func = NULL;
2✔
56
    tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = NULL;
2✔
57
    tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = NULL;
2✔
58
    tmm_modules[TMM_RECEIVEDPDK].cap_flags = 0;
2✔
59
    tmm_modules[TMM_RECEIVEDPDK].flags = TM_FLAG_RECEIVE_TM;
2✔
60
}
2✔
61

62
/**
63
 * \brief Registration Function for DecodeDPDK.
64
 */
65
void TmModuleDecodeDPDKRegister(void)
66
{
2✔
67
    tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
2✔
68
    tmm_modules[TMM_DECODEDPDK].ThreadInit = NoDPDKSupportExit;
2✔
69
    tmm_modules[TMM_DECODEDPDK].Func = NULL;
2✔
70
    tmm_modules[TMM_DECODEDPDK].ThreadExitPrintStats = NULL;
2✔
71
    tmm_modules[TMM_DECODEDPDK].ThreadDeinit = NULL;
2✔
72
    tmm_modules[TMM_DECODEDPDK].cap_flags = 0;
2✔
73
    tmm_modules[TMM_DECODEDPDK].flags = TM_FLAG_DECODE_TM;
2✔
74
}
2✔
75

76
/**
77
 * \brief this function prints an error message and exits.
78
 */
79
TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
UNCOV
80
{
×
81
    FatalError("Error creating thread %s: you do not have "
UNCOV
82
               "support for DPDK enabled, on Linux host please recompile "
×
UNCOV
83
               "with --enable-dpdk",
×
UNCOV
84
            tv->name);
×
UNCOV
85
}
×
86

87
#else /* We have DPDK support */
88

89
#include "util-affinity.h"
90
#include "util-dpdk.h"
91
#include "util-dpdk-i40e.h"
92
#include "util-dpdk-ice.h"
93
#include "util-dpdk-ixgbe.h"
94
#include "util-dpdk-mlx5.h"
95
#include "util-dpdk-bonding.h"
96
#include <numa.h>
97

98
#define BURST_SIZE 32
99
// interrupt mode constants
100
#define MIN_ZERO_POLL_COUNT          10U
101
#define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
102
#define MINIMUM_SLEEP_TIME_US        1U
103
#define STANDARD_SLEEP_TIME_US       100U
104
#define MAX_EPOLL_TIMEOUT_MS         500U
105
static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
106

107
/**
108
 * \brief Structure to hold thread specific variables.
109
 */
110
typedef struct DPDKThreadVars_ {
111
    /* counters */
112
    uint64_t pkts;
113
    ThreadVars *tv;
114
    TmSlot *slot;
115
    LiveDevice *livedev;
116
    ChecksumValidationMode checksum_mode;
117
    bool intr_enabled;
118
    /* references to packet and drop counters */
119
    StatsCounterId capture_dpdk_packets;
120
    StatsCounterId capture_dpdk_rx_errs;
121
    StatsCounterId capture_dpdk_imissed;
122
    StatsCounterId capture_dpdk_rx_no_mbufs;
123
    StatsCounterId capture_dpdk_ierrors;
124
    StatsCounterId capture_dpdk_tx_errs;
125
    unsigned int flags;
126
    uint16_t threads;
127
    /* for IPS */
128
    DpdkCopyModeEnum copy_mode;
129
    uint16_t out_port_id;
130
    /* Entry in the peers_list */
131

132
    uint64_t bytes;
133
    uint64_t accepted;
134
    uint64_t dropped;
135
    uint16_t port_id;
136
    uint16_t queue_id;
137
    int32_t port_socket_id;
138
    struct rte_mbuf *received_mbufs[BURST_SIZE];
139
    DPDKWorkerSync *workers_sync;
140
} DPDKThreadVars;
141

142
static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
143
static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
144
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
145

146
static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
147
static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
148
static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
149

150
static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
151
static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
152
{
153
    uint32_t event_data = (uint32_t)port_id << UINT16_WIDTH | queue_id;
154
    int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
155
            RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
156

157
    if (ret != 0) {
158
        SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
159
                queue_id, rte_strerror(-ret));
160
        return false;
161
    }
162
    return true;
163
}
164

165
static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
166
{
167
    if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
168
        return MINIMUM_SLEEP_TIME_US;
169

170
    return STANDARD_SLEEP_TIME_US;
171
}
172

173
static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
174
{
175
    rte_spinlock_lock(&(intr_lock[port_id]));
176

177
    if (on)
178
        rte_eth_dev_rx_intr_enable(port_id, queue_id);
179
    else
180
        rte_eth_dev_rx_intr_disable(port_id, queue_id);
181

182
    rte_spinlock_unlock(&(intr_lock[port_id]));
183
}
184

185
/**
 * \brief Frees a run of mbufs of an mbuf array.
 *
 * Releases mbuf_array[offset] .. mbuf_array[offset + mbuf_cnt - 1]. The caller
 * in ReceiveDPDKLoop passes the number of mbufs that are still unprocessed as
 * \a mbuf_cnt and the index of the first of them as \a offset, so \a mbuf_cnt
 * has to be treated as a count relative to \a offset and not as an end index
 * (otherwise the tail of the burst is never freed).
 *
 * \param mbuf_array array of mbuf pointers
 * \param mbuf_cnt number of mbufs to free
 * \param offset index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    /* 32-bit index so that offset + idx cannot wrap around the uint16_t range */
    for (uint32_t idx = 0; idx < mbuf_cnt; idx++) {
        rte_pktmbuf_free(mbuf_array[(uint32_t)offset + idx]);
    }
}
192

193
/**
 * \brief Runs the driver specific RSS setup once the port has been started.
 *
 * Drivers other than net_i40e, net_ixgbe, net_ice and mlx5_pci need no action.
 *
 * \param ptv thread vars providing port id, thread count and device name
 * \param driver_name name of the driver of the port
 */
static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
{
    /* for a bonded port dispatch on the driver reported by BondingDeviceDriverGet */
    if (strcmp(driver_name, "net_bonding") == 0) {
        driver_name = BondingDeviceDriverGet(ptv->port_id);
    }

    if (strcmp(driver_name, "net_i40e") == 0) {
        i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
        return;
    }
    if (strcmp(driver_name, "net_ixgbe") == 0) {
        ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
        return;
    }
    if (strcmp(driver_name, "net_ice") == 0) {
        iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
        return;
    }
    if (strcmp(driver_name, "mlx5_pci") == 0) {
        mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
    }
}
206

207
/**
 * \brief Runs the driver specific cleanup before the port is closed.
 *
 * For net_ixgbe, net_ice, mlx5_pci (and net_i40e when RTE_VERSION is above
 * 20.0.0.0) the rte_flow rules of the port are flushed; a failing flush is
 * only logged.
 *
 * \param ptv thread vars providing port id and device name
 * \param driver_name name of the driver of the port
 */
static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
{
    /* for a bonded port decide on the driver reported by BondingDeviceDriverGet */
    if (strcmp(driver_name, "net_bonding") == 0)
        driver_name = BondingDeviceDriverGet(ptv->port_id);

    bool flush_rules = false;
#if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
    flush_rules = strcmp(driver_name, "net_i40e") == 0;
#endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
    flush_rules = flush_rules || strcmp(driver_name, "net_ixgbe") == 0 ||
                  strcmp(driver_name, "net_ice") == 0 || strcmp(driver_name, "mlx5_pci") == 0;
    if (!flush_rules)
        return;

    // Flush the RSS rules that have been inserted in the post start section
    struct rte_flow_error flow_err = { 0 };
    int32_t rc = rte_flow_flush(ptv->port_id, &flow_err);
    if (rc != 0) {
        SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
                ptv->livedev->dev, rte_strerror(-rc), flow_err.message);
    }
}
228

229
/**
 * Attempts to retrieve NUMA node id on which the caller runs.
 * On Linux the node of the CPU reported by sched_getcpu() is looked up with
 * numa_node_of_cpu(); on other systems a warning is logged.
 * @return NUMA id on success, -1 otherwise
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    const int current_cpu = sched_getcpu();
    return numa_node_of_cpu(current_cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
247

248
/**
249
 * \brief Registration Function for ReceiveDPDK.
250
 * \todo Unit tests are needed for this module.
251
 */
252
void TmModuleReceiveDPDKRegister(void)
253
{
254
    tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
255
    tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
256
    tmm_modules[TMM_RECEIVEDPDK].Func = NULL;
257
    tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
258
    tmm_modules[TMM_RECEIVEDPDK].PktAcqBreakLoop = NULL;
259
    tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = NULL;
260
    tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
261
    tmm_modules[TMM_RECEIVEDPDK].cap_flags = SC_CAP_NET_RAW;
262
    tmm_modules[TMM_RECEIVEDPDK].flags = TM_FLAG_RECEIVE_TM;
263
}
264

265
/**
266
 * \brief Registration Function for DecodeDPDK.
267
 * \todo Unit tests are needed for this module.
268
 */
269
void TmModuleDecodeDPDKRegister(void)
270
{
271
    tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
272
    tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
273
    tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
274
    tmm_modules[TMM_DECODEDPDK].ThreadExitPrintStats = NULL;
275
    tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
276
    tmm_modules[TMM_DECODEDPDK].cap_flags = 0;
277
    tmm_modules[TMM_DECODEDPDK].flags = TM_FLAG_DECODE_TM;
278
}
279

280
/**
 * \brief Publishes the capture counters of this thread.
 *
 * Some NICs (e.g. Intel) do not support queue statistics and the drops can be
 * fetched only on the port level. Therefore the port level statistics are
 * published by the worker of queue 0 only, to have at least a continuous
 * update on the dropped packets; all other workers publish just their own
 * packet count.
 *
 * \param ptv thread vars of the calling receive thread
 */
static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
{
    if (ptv->queue_id != 0) {
        StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_packets, ptv->pkts);
        return;
    }

    struct rte_eth_stats port_stats;
    int rc = rte_eth_stats_get(ptv->port_id, &port_stats);
    if (unlikely(rc != 0)) {
        SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-rc));
        return;
    }

    /* everything the port saw but did not deliver to the application */
    const uint64_t rx_lost = port_stats.imissed + port_stats.ierrors + port_stats.rx_nombuf;

    StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_packets, ptv->pkts + rx_lost);
    SC_ATOMIC_SET(ptv->livedev->pkts, port_stats.ipackets + rx_lost);
    StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_errs, rx_lost);
    StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_imissed, port_stats.imissed);
    StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_no_mbufs, port_stats.rx_nombuf);
    StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_ierrors, port_stats.ierrors);
    StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_tx_errs, port_stats.oerrors);
    SC_ATOMIC_SET(ptv->livedev->drop, rx_lost);
}
309

310
/**
 * \brief Release callback of packets backed by a DPDK mbuf.
 *
 * In TAP mode, or in IPS mode when the packet is not to be dropped, the mbuf
 * is transmitted on the out port/queue stored in the packet (with one retry);
 * in every other case, and when both transmit attempts fail, the mbuf is
 * freed. Finally the packet itself is handed to PacketFreeOrRelease.
 *
 * \param p packet to release
 */
static void DPDKReleasePacket(Packet *p)
{
    /* Need to be in copy mode and need to detect early release
       where Ethernet header could not be set (and pseudo packet) */
    bool forward = p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
                   (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS &&
                           !PacketCheckAction(p, ACTION_DROP));
#if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
    /* When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
       These get into the infinite cycle between the NIC and the switch in some cases */
    if (forward && PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143) {
        forward = false;
    }
#endif

    if (!forward) {
        rte_pktmbuf_free(p->dpdk_v.mbuf);
        p->dpdk_v.mbuf = NULL;
        PacketFreeOrRelease(p);
        return;
    }

    BUG_ON(PKT_IS_PSEUDOPKT(p));
    // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
    // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
    // successfully sent packets.
    int sent = rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
    if (unlikely(sent < 1)) {
        // sometimes a repeated transmit can help to send out the packet
        rte_delay_us(DPDK_BURST_TX_WAIT_US);
        sent = rte_eth_tx_burst(
                p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
    }
    if (unlikely(sent < 1)) {
        SCLogDebug("Unable to transmit the packet on port %u queue %u", p->dpdk_v.out_port_id,
                p->dpdk_v.out_queue_id);
        rte_pktmbuf_free(p->dpdk_v.mbuf);
        p->dpdk_v.mbuf = NULL;
    }

    PacketFreeOrRelease(p);
}
348

349
/**
 * \brief Prepares a receive thread right before it enters the polling loop.
 *
 * Marks the thread as running, waits for the packet pool, resets the (x)stats
 * of the port and, in interrupt mode, registers the RX queue interrupt.
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED when the RX interrupt could not be enabled
 */
static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
{
    SCEnter();
    // Indicate that the thread is actually running its application level
    // code (i.e., it can poll packets)
    TmThreadsSetFlag(tv, THV_RUNNING);
    PacketPoolWait();

    rte_eth_stats_reset(ptv->port_id);
    rte_eth_xstats_reset(ptv->port_id);

    if (ptv->intr_enabled) {
        const bool intr_ready = InterruptsRXEnable(ptv->port_id, ptv->queue_id);
        if (!intr_ready) {
            SCReturnInt(TM_ECODE_FAILED);
        }
    }

    SCReturnInt(TM_ECODE_OK);
}
365

366
/**
 * \brief Runs the capture timeout handling while no packets arrive.
 *
 * TmThreadsCaptureHandleTimeout is invoked only when more than 100 ms have
 * passed since the previous invocation made by the same thread.
 */
static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
{
    static thread_local uint64_t prev_run_msec = 0;

    const uint64_t now_msec = SCTIME_MSECS(TimeGet());
    if (now_msec <= prev_run_msec + 100) {
        return;
    }

    TmThreadsCaptureHandleTimeout(tv, NULL);
    prev_run_msec = now_msec;
}
376

377
/**
378
 * \brief Decides if it should retry the packet poll or continue with the packet processing
379
 * \return true if the poll should be retried, false otherwise
380
 */
381
static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
382
{
383
    static thread_local uint32_t zero_pkt_polls_cnt = 0;
384

385
    if (nb_rx > 0) {
386
        zero_pkt_polls_cnt = 0;
387
        return false;
388
    }
389

390
    LoopHandleTimeoutOnIdle(tv);
391
    if (!ptv->intr_enabled)
392
        return true;
393

394
    zero_pkt_polls_cnt++;
395
    if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
396
        return true;
397

398
    uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
399
    if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
400
        rte_delay_us(pwd_idle_hint);
401
    } else {
402
        InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
403
        struct rte_epoll_event event;
404
        rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
405
        InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
406
        return true;
407
    }
408

409
    return false;
410
}
411

412
/**
413
 * \brief Initializes a packet from an mbuf
414
 * \return true if the packet was initialized successfully, false otherwise
415
 */
416
static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
417
{
418
    Packet *p = PacketGetFromQueueOrAlloc();
419
    if (unlikely(p == NULL)) {
420
        return NULL;
421
    }
422
    PKT_SET_SRC(p, PKT_SRC_WIRE);
423
    p->datalink = LINKTYPE_ETHERNET;
424
    if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
425
        p->flags |= PKT_IGNORE_CHECKSUM;
426
    }
427

428
    p->ts = TimeGet();
429
    p->dpdk_v.mbuf = mbuf;
430
    p->ReleasePacket = DPDKReleasePacket;
431
    p->dpdk_v.copy_mode = ptv->copy_mode;
432
    p->dpdk_v.out_port_id = ptv->out_port_id;
433
    p->dpdk_v.out_queue_id = ptv->queue_id;
434
    p->livedev = ptv->livedev;
435

436
    if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
437
        p->flags |= PKT_IGNORE_CHECKSUM;
438
    } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
439
        uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
440
        if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
441
                (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
442
            SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
443
            p->flags |= PKT_IGNORE_CHECKSUM;
444
        } else {
445
            if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
446
                SCLogDebug("HW detected BAD IP checksum");
447
                // chsum recalc will not be triggered but rule keyword check will be
448
                p->l3.csum_set = true;
449
                p->l3.csum = 0;
450
            }
451
            if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
452
                SCLogDebug("HW detected BAD L4 chsum");
453
                p->l4.csum_set = true;
454
                p->l4.csum = 0;
455
            }
456
        }
457
    }
458

459
    return p;
460
}
461

462
static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
463
{
464
    static thread_local bool segmented_mbufs_warned = false;
465
    if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
466
        char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
467
                        "Check your configuration or report the issue";
468
        enum rte_proc_type_t eal_t = rte_eal_process_type();
469
        if (eal_t == RTE_PROC_SECONDARY) {
470
            SCLogWarning("%s. To avoid segmented mbufs, "
471
                         "try to increase mbuf size in your primary application",
472
                    warn_s);
473
        } else if (eal_t == RTE_PROC_PRIMARY) {
474
            SCLogWarning("%s. To avoid segmented mbufs, "
475
                         "try to increase MTU in your suricata.yaml",
476
                    warn_s);
477
        }
478

479
        segmented_mbufs_warned = true;
480
    }
481
}
482

483
static void PrintDPDKPortXstats(uint16_t port_id, const char *port_name)
484
{
485
    int ret = rte_eth_xstats_get(port_id, NULL, 0);
486
    if (ret <= 0) {
487
        SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name,
488
                ret == 0 ? "not supported" : rte_strerror(-ret));
489
        return;
490
    }
491
    unsigned int len = (unsigned int)ret;
492
    struct rte_eth_xstat_name *xstats_names = NULL;
493
    struct rte_eth_xstat *xstats = SCCalloc(len, sizeof(*xstats));
494
    if (xstats == NULL) {
495
        SCLogWarning("Failed to allocate memory for the rte_eth_xstat structure");
496
        return;
497
    }
498

499
    ret = rte_eth_xstats_get(port_id, xstats, len);
500
    if (ret < 0 || (unsigned int)ret > len) {
501
        SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name,
502
                ret < 0 ? rte_strerror(-ret) : "table size too small");
503
        goto cleanup;
504
    }
505
    xstats_names = SCCalloc(len, sizeof(*xstats_names));
506
    if (xstats_names == NULL) {
507
        SCLogWarning("Failed to allocate memory for the rte_eth_xstat_name array");
508
        goto cleanup;
509
    }
510
    ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
511
    if (ret < 0 || (unsigned int)ret > len) {
512
        SCLogPerf("%s: unable to obtain names of rte_eth_xstats (%s)", port_name,
513
                ret < 0 ? rte_strerror(-ret) : "table size too small");
514
        goto cleanup;
515
    }
516
    for (unsigned int i = 0; i < len; i++) {
517
        if (xstats[i].value > 0)
518
            SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
519
                    xstats[i].value);
520
    }
521

522
cleanup:
523
    if (xstats != NULL)
524
        SCFree(xstats);
525
    if (xstats_names != NULL)
526
        SCFree(xstats_names);
527
}
528

529
/**
 * \brief Shutdown sequence of a receive thread.
 *
 * The thread checks in and waits until worker_cnt workers have checked in,
 * then dumps its counters. The worker of queue 0 additionally prints the
 * xstats of the port, resets the check-in counter and stops a port.
 *
 * \param ptv DPDK thread vars of the receive thread
 */
static void HandleShutdown(DPDKThreadVars *ptv)
{
    SCLogDebug("Stopping Suricata!");
    SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
    for (;;) {
        if (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) >=
                ptv->workers_sync->worker_cnt) {
            break;
        }
        rte_delay_us(10);
    }

    // Dump counters while device is still running - some drivers (e.g. BNXT) fail
    // to report stats after the device is stopped
    DPDKDumpCounters(ptv);
    if (ptv->queue_id != 0) {
        return;
    }

    PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
    rte_delay_us(20); // wait for all threads to get out of the sync loop
    SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);

    // If Suricata runs in peered mode, the peer threads might still want to send
    // packets to our port. Instead, we know, that we are done with the peered port, so
    // we stop it. The peered threads will stop our port.
    // In IDS we stop our port - no peer threads are running.
    const bool peered =
            ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS;
    rte_eth_dev_stop(peered ? ptv->out_port_id : ptv->port_id);
}
554

555
/**
 * \brief Dumps the counters whenever the seconds part of the current time
 *        differs from the one of the previous dump of this thread.
 *
 * \param ptv DPDK thread vars of the receive thread
 */
static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
{
    static thread_local SCTime_t prev_dump = { 0 };

    const SCTime_t now = TimeGet();
    /* Trigger one dump of stats every second */
    if (now.secs == prev_dump.secs) {
        return;
    }

    DPDKDumpCounters(ptv);
    prev_dump = now;
}
565

566
/**
567
 *  \brief Main DPDK reading Loop function
568
 */
569
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
570
{
571
    SCEnter();
572
    DPDKThreadVars *ptv = (DPDKThreadVars *)data;
573
    ptv->slot = ((TmSlot *)slot)->slot_next;
574
    uint32_t mp_sz = ptv->livedev->dpdk_vars->pkt_mp[ptv->queue_id]->size;
575
    uint16_t burst_size = (uint16_t)MIN(BURST_SIZE, mp_sz);
576

577
    TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
578
    if (ret != TM_ECODE_OK) {
579
        SCReturnInt(ret);
580
    }
581
    while (true) {
582
        if (unlikely(suricata_ctl_flags != 0)) {
583
            HandleShutdown(ptv);
584
            break;
585
        }
586

587
        uint16_t nb_rx =
588
                rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, burst_size);
589
        if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
590
            continue;
591
        }
592

593
        ptv->pkts += (uint64_t)nb_rx;
594
        for (uint16_t i = 0; i < nb_rx; i++) {
595
            Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
596
            if (p == NULL) {
597
                rte_pktmbuf_free(ptv->received_mbufs[i]);
598
                continue;
599
            }
600
            DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
601
            PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
602
                    rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
603
            if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
604
                TmqhOutputPacketpool(ptv->tv, p);
605
                DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
606
                SCReturnInt(EXIT_FAILURE);
607
            }
608
        }
609

610
        PeriodicDPDKDumpCounters(ptv);
611
        StatsSyncCountersIfSignalled(&tv->stats);
612
    }
613

614
    SCReturnInt(TM_ECODE_OK);
615
}
616

617
/**
618
 * \brief Init function for ReceiveDPDK.
619
 *
620
 * \param tv pointer to ThreadVars
621
 * \param initdata pointer to the interface passed from the user
622
 * \param data pointer gets populated with DPDKThreadVars
623
 *
624
 */
625
static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
626
{
627
    SCEnter();
628
    int retval, thread_numa;
629
    DPDKThreadVars *ptv = NULL;
630
    DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
631

632
    if (initdata == NULL) {
633
        SCLogError("DPDK configuration is NULL in thread initialization");
634
        goto fail;
635
    }
636

637
    ptv = SCCalloc(1, sizeof(DPDKThreadVars));
638
    if (unlikely(ptv == NULL)) {
639
        SCLogError("Unable to allocate memory");
640
        goto fail;
641
    }
642

643
    ptv->tv = tv;
644
    ptv->pkts = 0;
645
    ptv->bytes = 0;
646
    ptv->livedev = LiveGetDevice(dpdk_config->iface);
647

648
    ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", &ptv->tv->stats);
649
    ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", &ptv->tv->stats);
650
    ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", &ptv->tv->stats);
651
    ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", &ptv->tv->stats);
652
    ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", &ptv->tv->stats);
653
    ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", &ptv->tv->stats);
654

655
    ptv->copy_mode = dpdk_config->copy_mode;
656
    ptv->checksum_mode = dpdk_config->checksum_mode;
657

658
    ptv->threads = dpdk_config->threads;
659
    ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
660
    ptv->port_id = dpdk_config->port_id;
661
    ptv->out_port_id = dpdk_config->out_port_id;
662
    ptv->port_socket_id = dpdk_config->socket_id;
663

664
    thread_numa = GetNumaNode();
665
    if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
666
            thread_numa != ptv->port_socket_id) {
667
        SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
668
        SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
669
                ptv->port_socket_id, thread_numa);
670
    }
671

672
    ptv->workers_sync = dpdk_config->workers_sync;
673
    uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
674
    ptv->queue_id = queue_id;
675

676
    // the last thread starts the device
677
    if (queue_id == dpdk_config->threads - 1) {
678
        retval = rte_eth_dev_start(ptv->port_id);
679
        if (retval < 0) {
680
            SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
681
                    rte_strerror(-retval));
682
            goto fail;
683
        }
684

685
        struct rte_eth_dev_info dev_info;
686
        retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
687
        if (retval != 0) {
688
            SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
689
                    rte_strerror(-retval));
690
            goto fail;
691
        }
692

693
        uint32_t timeout = dpdk_config->linkup_timeout * 10;
694
        while (timeout > 0) {
695
            struct rte_eth_link link = { 0 };
696
            retval = rte_eth_link_get_nowait(ptv->port_id, &link);
697
            if (retval != 0) {
698
                if (retval == -ENOTSUP) {
699
                    SCLogInfo("%s: link status not supported, skipping", dpdk_config->iface);
700
                } else {
701
                    SCLogInfo("%s: error (%s) when getting link status, skipping",
702
                            dpdk_config->iface, rte_strerror(-retval));
703
                }
704
                break;
705
            }
706
            if (link.link_status) {
707
                char link_status_str[RTE_ETH_LINK_MAX_STR_LEN];
708
#if RTE_VERSION >= RTE_VERSION_NUM(20, 11, 0, 0)
709
#pragma GCC diagnostic push
710
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
711
                rte_eth_link_to_str(link_status_str, sizeof(link_status_str), &link);
712
#pragma GCC diagnostic pop
713
#else
714
                snprintf(link_status_str, sizeof(link_status_str),
715
                        "Link Up, speed %u Mbps, %s", // 22 chars + 10 for digits + 11 for duplex
716
                        link.link_speed,
717
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? "full-duplex" : "half-duplex");
718
#endif
719

720
                SCLogInfo("%s: %s", dpdk_config->iface, link_status_str);
721
                break;
722
            }
723

724
            rte_delay_ms(100);
725
            timeout--;
726
        }
727

728
        if (dpdk_config->linkup_timeout && timeout == 0) {
729
            SCLogWarning("%s: link is down, trying to continue anyway", dpdk_config->iface);
730
        }
731

732
        // some PMDs requires additional actions only after the device has started
733
        DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
734

735
        uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
736
        if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
737
            SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
738
                    dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
739
        } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
740
            SCLogNotice(
741
                    "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
742
                    dpdk_config->iface);
743
        }
744
        if (ptv->intr_enabled) {
745
            rte_spinlock_init(&intr_lock[ptv->port_id]);
746
        }
747
    }
748

749
    *data = (void *)ptv;
750
    dpdk_config->DerefFunc(dpdk_config);
751
    SCReturnInt(TM_ECODE_OK);
752

753
fail:
754
    if (dpdk_config != NULL)
755
        dpdk_config->DerefFunc(dpdk_config);
756
    if (ptv != NULL)
757
        SCFree(ptv);
758
    SCReturnInt(TM_ECODE_FAILED);
759
}
760

761
/**
762
 * \brief DeInit function closes dpdk at exit.
763
 * \param tv pointer to ThreadVars
764
 * \param data pointer that gets cast into DPDKThreadVars for ptv
765
 */
766
static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
767
{
768
    SCEnter();
769
    DPDKThreadVars *ptv = (DPDKThreadVars *)data;
770

771
    if (ptv->queue_id == 0) {
772
        struct rte_eth_dev_info dev_info;
773
        int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
774
        if (retval != 0) {
775
            SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
776
                    rte_strerror(-retval));
777
            SCReturnInt(TM_ECODE_FAILED);
778
        }
779

780
        DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
781

782
        if (ptv->workers_sync) {
783
            SCFree(ptv->workers_sync);
784
        }
785
    }
786

787
    SCFree(ptv);
788
    SCReturnInt(TM_ECODE_OK);
789
}
790

791
/**
792
 * \brief This function passes off to link type decoders.
793
 *
794
 * DecodeDPDK decodes packets from DPDK and passes
795
 * them off to the proper link type decoder.
796
 *
797
 * \param t pointer to ThreadVars
798
 * \param p pointer to the current packet
799
 * \param data pointer that gets cast into DPDKThreadVars for ptv
800
 */
801
static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
802
{
803
    SCEnter();
804
    DecodeThreadVars *dtv = (DecodeThreadVars *)data;
805

806
    BUG_ON(PKT_IS_PSEUDOPKT(p));
807

808
    /* update counters */
809
    DecodeUpdatePacketCounters(tv, dtv, p);
810

811
    /* If suri has set vlan during reading, we increase vlan counter */
812
    if (p->vlan_idx) {
813
        StatsCounterIncr(&tv->stats, dtv->counter_vlan);
814
    }
815

816
    /* call the decoder */
817
    DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
818

819
    PacketDecodeFinalize(tv, dtv, p);
820

821
    SCReturnInt(TM_ECODE_OK);
822
}
823

824
/**
 * \brief Init function for DecodeDPDK.
 *
 * \param tv pointer to ThreadVars
 * \param initdata not used
 * \param data pointer gets populated with the allocated DecodeThreadVars
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED when the DecodeThreadVars cannot be allocated
 */
static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    SCEnter();

    DecodeThreadVars *decode_vars = DecodeThreadVarsAlloc(tv);
    if (decode_vars == NULL) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    DecodeRegisterPerfCounters(decode_vars, tv);
    *data = (void *)decode_vars;

    SCReturnInt(TM_ECODE_OK);
}
840

841
/**
 * \brief DeInit function for DecodeDPDK, frees the decode thread vars if any.
 *
 * \param tv pointer to ThreadVars
 * \param data DecodeThreadVars set up by the init function, may be NULL
 *
 * \retval TM_ECODE_OK always
 */
static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
{
    SCEnter();
    if (data == NULL) {
        SCReturnInt(TM_ECODE_OK);
    }

    DecodeThreadVarsFree(tv, data);
    SCReturnInt(TM_ECODE_OK);
}
848

849
#endif /* HAVE_DPDK */
850
/* eof */
851
/**
852
 * @}
853
 */
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc