• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jhannes / logevents / #110

15 Apr 2024 10:25PM UTC coverage: 88.56% (-0.06%) from 88.622%
#110

push

jhannes
bugfix: NullPointerException on HttpServletResponseMDC.isAsset

2 of 2 new or added lines in 2 files covered. (100.0%)

6 existing lines in 4 files now uncovered.

5682 of 6416 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/logevents/src/main/java/org/logevents/observers/AbstractBatchingLogEventObserver.java
1
package org.logevents.observers;
2

3
import org.logevents.LogEvent;
4
import org.logevents.LogEventObserver;
5
import org.logevents.config.Configuration;
6
import org.logevents.core.AbstractFilteredLogEventObserver;
7
import org.logevents.observers.batch.BatcherFactory;
8
import org.logevents.observers.batch.CooldownBatcher;
9
import org.logevents.observers.batch.CooldownBatcherFactory;
10
import org.logevents.observers.batch.LogEventBatch;
11
import org.logevents.observers.batch.LogEventBatcher;
12
import org.logevents.observers.batch.LogEventBatcherWithMdc;
13
import org.logevents.observers.batch.LogEventShutdownHook;
14
import org.logevents.observers.batch.ThrottleBatcherFactory;
15
import org.logevents.observers.batch.ThrottlingBatcher;
16
import org.logevents.util.DaemonThreadFactory;
17
import org.slf4j.Marker;
18
import org.slf4j.MarkerFactory;
19

20
import java.util.HashMap;
21
import java.util.Iterator;
22
import java.util.List;
23
import java.util.Map;
24
import java.util.concurrent.Executors;
25
import java.util.concurrent.ScheduledExecutorService;
26
import java.util.function.Consumer;
27

28
/**
29
 * Used to gather up a number of log events to process as a batch. This is useful
30
 * when using logging destinations where high frequency of messages would be
31
 * inefficient or noisy, such as email or Slack and for logging asynchronously to
32
 * external systems over network such as databases or Elasticsearch.
33
 * <p>
34
 * {@link AbstractBatchingLogEventObserver} can decide how to batch events with batchers,
35
 * for example {@link ThrottlingBatcher} and {@link CooldownBatcher}. When processing,
36
 * the {@link AbstractBatchingLogEventObserver} calls {@link #processBatch(List)}
37
 * with the whole batch to process the log events, which should be overridden by
38
 * subclasses. Consecutive Log events with the same message pattern and log level
39
 * are grouped together so the processor can easily ignore duplicate messages.
40
 *
41
 * <h3>Configuration example with cooldown (don't send messages too frequently)</h3>
42
 *
43
 *  The following will send messages at level WARN that don't have the {@link Marker}
44
 *  "PERSONAL_DATA". After each batch, wait at least 10 seconds before sending the next batch. But after
45
 *  each message is received, wait 2 seconds to see if more messages are coming. In any case, never
46
 *  wait more than 30 seconds before sending a batch.
47
 *
48
 * <pre>
49
 * observer.sample.threshold=WARN
50
 * observer.sample.suppressMarkers=PERSONAL_DATA
51
 * observer.sample.idleThreshold=PT2S
52
 * observer.sample.cooldownTime=PT10S
53
 * observer.sample.maximumWaitTime=PT30S
54
 * </pre>
55
 *
56
 * <h3>Configuration example with throttle (increasingly larger offsets)</h3>
57
 *
58
 *  The following will send the first message at level WARN with the {@link Marker} "DAEMON" immediately,
59
 *  then wait at least 30 seconds before sending the next, then 5 minutes, then 15 minutes between each
60
 *  batch.
61
 *
62
 * <pre>
63
 * observer.sample.threshold=WARN
64
 * observer.sample.requireMarker=DAEMON
65
 * observer.sample.throttle=PT30S PT5M PT15M
66
 * </pre>
67
 *
68
 * <h3>Marker-specific configuration</h3>
69
 *
70
 * The following will throttle messages with MY_MARKER, grouped by MDC value userId. If there is a log
71
 * event with an unused userId, it will be sent immediately. Then all events for the next minute
72
 * <em>for the same userId</em> will be collected, then the next hour and then the next day. If there
73
 * are log events for other userIds, they will be batched separately. This is useful for situations like
74
 * API batch users where the credentials have expired, for example.
75
 *
76
 * <pre>
77
 * observer.sample.markers.MY_MARKER.throttle=PT1M PT1H PT24H
78
 * observer.sample.markers.MY_MARKER.mdc=userId
79
 * </pre>
80
 *
81
 * @see SlackLogEventObserver
82
 * @see MicrosoftTeamsLogEventObserver
83
 * @see SmtpLogEventObserver
84
 * @see ElasticsearchLogEventObserver
85
 * @see DatabaseLogEventObserver
86
 *
87
 * @author Johannes Brodwall
88
 */
89
public abstract class AbstractBatchingLogEventObserver extends AbstractFilteredLogEventObserver {
90

91
    /**
     * Executor handed to the one-argument constructor by the no-argument constructor.
     * Created as a scheduled pool with a core size of 3 whose threads come from a
     * {@link DaemonThreadFactory}.
     */
    protected static ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(3, new DaemonThreadFactory("BatchingLogEventObserver", 3));

    /** Shutdown hook passed to every batcher factory created by this class. */
    protected static LogEventShutdownHook shutdownHook = new LogEventShutdownHook();

    static {
        // Register the hook with the JVM once, when this class is initialized.
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /** Marker-specific batchers registered by {@link #configureMarkers(Configuration)}. */
    private final Map<Marker, LogEventObserver> markerBatchers = new HashMap<>();

    /** Batcher returned by {@link #getBatcher(LogEvent)} when no marker-specific batcher applies. */
    protected LogEventObserver defaultBatcher;

    /** Executor passed to the batcher factories created by this observer. */
    protected ScheduledExecutorService executor;
102

103
    /**
     * Creates an observer that schedules its batches on the shared static
     * {@link #scheduledExecutorService}.
     */
    public AbstractBatchingLogEventObserver() {
        this(AbstractBatchingLogEventObserver.scheduledExecutorService);
    }
1✔
106

107
    public AbstractBatchingLogEventObserver(ScheduledExecutorService executor) {
1✔
108
        defaultBatcher = new LogEventBatcher(new CooldownBatcherFactory(executor, shutdownHook).createBatcher(this::processBatch));
1✔
109
        this.executor = executor;
1✔
110
    }
1✔
111

112
    /**
113
     * Read <code>idleThreshold</code>, <code>cooldownTime</code> and <code>maximumWaitTime</code>
114
     * from configuration
115
     */
116
    protected void configureBatching(Configuration configuration) {
117
        this.defaultBatcher = new LogEventBatcher(getBatcherFactory(configuration, "").createBatcher(this::processBatch));
1✔
118
    }
1✔
119

120
    @Override
121
    protected final void doLogEvent(LogEvent logEvent) {
122
        getBatcher(logEvent).logEvent(logEvent);
1✔
123
    }
1✔
124

125
    protected LogEventObserver getBatcher(LogEvent logEvent) {
126
        if (logEvent.getMarker() != null) {
1✔
127
            if (markerBatchers.containsKey(logEvent.getMarker())) {
1✔
128
                return markerBatchers.get(logEvent.getMarker());
1✔
129
            }
130
            Iterator<Marker> iterator = logEvent.getMarker().iterator();
1✔
131
            while (iterator.hasNext()) {
1✔
132
                Marker next = iterator.next();
1✔
133
                if (markerBatchers.containsKey(next)) {
1✔
134
                    return markerBatchers.get(next);
1✔
135
                }
136
            }
×
137
        }
138
        return defaultBatcher;
1✔
139
    }
140

141
    protected void processBatch(List<LogEvent> batch) {
UNCOV
142
        if (!batch.isEmpty()) {
×
UNCOV
143
            processBatch(new LogEventBatch(batch));
×
144
        }
UNCOV
145
    }
×
146

147
    /**
     * Processes one batch of log events; implemented by subclasses.
     * {@link #processBatch(List)} calls this only for non-empty lists of events.
     *
     * @param batch the batch to process
     */
    protected abstract void processBatch(LogEventBatch batch);
148

149
    @Override
150
    public String toString() {
151
        return getClass().getSimpleName();
1✔
152
    }
153

154
    /**
155
     * Configure throttling for the provided marker. Throttling is a string like <code>PT10M PT30M</code>
156
     * indicating a list of periods (PT). In this example, after one message is sent, all messages
157
     * with this marker will be batched up for the next ten minutes (PT10M). If any messages were collected,
158
     * the next batch will be sent after 30 minutes (PT30M). The last throttling period will repeat
159
     * until a period passes with no batched messages.
160
     */
161
    public void configureMarkers(Configuration configuration) {
162
        for (String markerName : configuration.listProperties("markers")) {
1✔
163
            markerBatchers.put(MarkerFactory.getMarker(markerName), createBatcher(configuration, markerName));
1✔
164
        }
1✔
165
    }
1✔
166

167
    protected LogEventObserver createBatcher(Configuration configuration, String markerName) {
168
        BatcherFactory batcherFactory = getBatcherFactory(configuration, "markers." + markerName + ".");
1✔
169
        return configuration.optionalString("markers." + markerName + ".mdc")
1✔
170
                .map(mdcKey -> createMdcBatcher(batcherFactory, configuration, markerName, configuration.getString("markers." + markerName + ".mdc")))
1✔
171
                .orElseGet(() -> new LogEventBatcher(batcherFactory.createBatcher(createProcessor(configuration, markerName))));
1✔
172
    }
173

174
    protected Consumer<List<LogEvent>> createProcessor(Configuration configuration, String markerName) {
175
        return this::processBatch;
1✔
176
    }
177

178
    protected BatcherFactory getBatcherFactory(Configuration configuration, String prefix) {
179
        return configuration.optionalString(prefix + "throttle")
1✔
180
                .map(t -> (BatcherFactory)new ThrottleBatcherFactory(executor, shutdownHook, t))
1✔
181
                .orElseGet(() -> new CooldownBatcherFactory(executor, shutdownHook, configuration, prefix));
1✔
182
    }
183

184
    protected LogEventObserver createMdcBatcher(BatcherFactory batcherFactory, Configuration configuration, String markerName, String mdcKey) {
185
        return new LogEventBatcherWithMdc(batcherFactory, markerName, mdcKey, this::processBatch);
1✔
186
    }
187

188
    protected LogEventObserver getMarkerBatcher(Marker myMarker) {
189
        return markerBatchers.get(myMarker);
1✔
190
    }
191

192
    protected LogEventObserver getDefaultBatcher() {
193
        return defaultBatcher;
1✔
194
    }
195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc