• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OISF / suricata / 23374838686

21 Mar 2026 07:29AM UTC coverage: 59.341% (-20.0%) from 79.315%
23374838686

Pull #15075

github

web-flow
Merge 90b4e834f into 6587e363a
Pull Request #15075: Stack 8001 v16.4

38 of 70 new or added lines in 10 files covered. (54.29%)

34165 existing lines in 563 files now uncovered.

119621 of 201584 relevant lines covered (59.34%)

650666.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/log-flush.c
1
/* Copyright (C) 2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17

18
/**
19
 * \file
20
 *
21
 * \author Jeff Lucovsky <jlucovsky@oisf.net>
22
 */
23

24
#include "suricata-common.h"
25
#include "suricata.h"
26
#include "detect.h"
27
#include "detect-engine.h"
28
#include "flow-worker.h"
29
#include "log-flush.h"
30
#include "tm-threads.h"
31
#include "conf.h"
32
#include "conf-yaml-loader.h"
33
#include "util-privs.h"
34

35
/**
36
 * \brief Trigger detect threads to flush their output logs
37
 *
38
 * This function is intended to be called at regular intervals to force
39
 * buffered log data to be persisted
40
 */
41
static void WorkerFlushLogs(void)
42
{
×
43
    SCEnter();
×
44

45
    /* count detect threads in use */
46
    uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_FLOWWORKER_TM);
×
47
    /* can be zero in unix socket mode */
48
    if (no_of_detect_tvs == 0) {
×
49
        return;
×
50
    }
×
51

52
    /* prepare swap structures */
53
    void *fw_threads[no_of_detect_tvs];
×
54
    ThreadVars *detect_tvs[no_of_detect_tvs];
×
55
    memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *)));
×
56
    memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));
×
57

58
    /* start by initiating the log flushes */
59

60
    uint32_t i = 0;
×
61
    SCMutexLock(&tv_root_lock);
×
62
    /* get reference to tv's and setup fw_threads array */
63
    for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
×
64
        if ((tv->tmm_flags & TM_FLAG_FLOWWORKER_TM) == 0) {
×
65
            continue;
×
66
        }
×
67
        for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
×
68
            TmModule *tm = TmModuleGetById(s->tm_id);
×
69
            if (!(tm->flags & TM_FLAG_FLOWWORKER_TM)) {
×
70
                continue;
×
71
            }
×
72

73
            if (suricata_ctl_flags != 0) {
×
74
                SCMutexUnlock(&tv_root_lock);
×
75
                goto error;
×
76
            }
×
77

78
            fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data));
×
79
            if (fw_threads[i]) {
×
80
                FlowWorkerSetFlushAck(fw_threads[i]);
×
81
                SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i);
×
82
                detect_tvs[i] = tv;
×
83
            }
×
84

85
            i++;
×
86
            break;
×
87
        }
×
88
    }
×
89
    BUG_ON(i != no_of_detect_tvs);
×
90

91
    SCMutexUnlock(&tv_root_lock);
×
92

93
    SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs);
×
94
    InjectPacketsForFlush(detect_tvs, no_of_detect_tvs);
×
95

96
    uint32_t threads_done = 0;
×
97
retry:
×
98
    for (i = 0; i < no_of_detect_tvs; i++) {
×
99
        if (suricata_ctl_flags != 0) {
×
100
            threads_done = no_of_detect_tvs;
×
101
            break;
×
102
        }
×
103
        SleepMsec(1);
×
104
        if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) {
×
105
            SCLogDebug("thread slot %d has ack'd flush request", i);
×
106
            threads_done++;
×
107
        } else if (detect_tvs[i]) {
×
108
            SCLogDebug("thread slot %d not yet ack'd flush request", i);
×
109
            TmThreadsCaptureBreakLoop(detect_tvs[i]);
×
110
        }
×
111
    }
×
112
    if (threads_done < no_of_detect_tvs) {
×
113
        threads_done = 0;
×
114
        SleepMsec(250);
×
115
        goto retry;
×
116
    }
×
117

118
error:
×
119
    return;
×
120
}
×
121

122
static int OutputFlushInterval(void)
UNCOV
123
{
×
UNCOV
124
    intmax_t output_flush_interval = 0;
×
UNCOV
125
    if (SCConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) {
×
UNCOV
126
        output_flush_interval = 0;
×
UNCOV
127
    }
×
UNCOV
128
    if (output_flush_interval < 0 || output_flush_interval > 60) {
×
129
        SCLogConfig("flush_interval must be 0 or less than 60; using 0");
×
130
        output_flush_interval = 0;
×
131
    }
×
132

UNCOV
133
    return (int)output_flush_interval;
×
UNCOV
134
}
×
135

136
static void *LogFlusherWakeupThread(void *arg)
137
{
×
138
    int output_flush_interval = OutputFlushInterval();
×
139
    /* This was checked by the logic creating this thread */
140
    BUG_ON(output_flush_interval == 0);
×
141

142
    SCLogConfig("Using output-flush-interval of %d seconds", output_flush_interval);
×
143
    /*
144
     * Calculate the number of sleep intervals based on the output flush interval. This is necessary
145
     * because this thread pauses a fixed amount of time to react to shutdown situations more
146
     * quickly.
147
     */
148
    const int log_flush_sleep_time = 500; /* milliseconds */
×
149
    const int flush_wait_count = (1000 * output_flush_interval) / log_flush_sleep_time;
×
150

151
    ThreadVars *tv_local = (ThreadVars *)arg;
×
152
    SCSetThreadName(tv_local->name);
×
153

154
    if (tv_local->thread_setup_flags != 0)
×
155
        TmThreadSetupOptions(tv_local);
×
156

157
    /* Set the threads capability */
158
    tv_local->cap_flags = 0;
×
159
    SCDropCaps(tv_local);
×
160

161
    TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
×
162

163
    int wait_count = 0;
×
164
    uint64_t worker_flush_count = 0;
×
165
    bool run = TmThreadsWaitForUnpause(tv_local);
×
166
    while (run) {
×
167
        SleepMsec(log_flush_sleep_time);
×
168

169
        if (++wait_count == flush_wait_count) {
×
170
            worker_flush_count++;
×
171
            WorkerFlushLogs();
×
172
            wait_count = 0;
×
173
        }
×
174

175
        if (TmThreadsCheckFlag(tv_local, THV_KILL)) {
×
176
            break;
×
177
        }
×
178
    }
×
179

180
    TmThreadsSetFlag(tv_local, THV_RUNNING_DONE);
×
181
    TmThreadWaitForFlag(tv_local, THV_DEINIT);
×
182
    TmThreadsSetFlag(tv_local, THV_CLOSED);
×
183
    SCLogInfo("%s: initiated %" PRIu64 " flushes", tv_local->name, worker_flush_count);
×
184
    return NULL;
×
185
}
×
186

187
void LogFlushThreads(void)
UNCOV
188
{
×
UNCOV
189
    if (0 == OutputFlushInterval()) {
×
UNCOV
190
        SCLogConfig("log flusher thread not used with heartbeat.output-flush-interval of 0");
×
UNCOV
191
        return;
×
UNCOV
192
    }
×
193

194
    ThreadVars *tv_log_flush =
×
195
            TmThreadCreateMgmtThread(thread_name_heartbeat, LogFlusherWakeupThread, 1);
×
196
    if (!tv_log_flush || (TmThreadSpawn(tv_log_flush) != 0)) {
×
197
        FatalError("Unable to create and start log flush thread");
198
    }
×
199
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc