• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ben-manes / caffeine / #5173

29 Dec 2025 05:27AM UTC coverage: 0.0% (-100.0%) from 100.0%
#5173

push

github

ben-manes
speed up development ci build

0 of 3838 branches covered (0.0%)

0 of 7869 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/jcache/src/main/java/com/github/benmanes/caffeine/jcache/event/EventDispatcher.java
1
/*
2
 * Copyright 2015 Ben Manes. All Rights Reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.github.benmanes.caffeine.jcache.event;
17

18
import static java.util.Objects.requireNonNull;
19

20
import java.lang.System.Logger;
21
import java.lang.System.Logger.Level;
22
import java.util.ArrayList;
23
import java.util.Collections;
24
import java.util.List;
25
import java.util.Set;
26
import java.util.concurrent.CompletableFuture;
27
import java.util.concurrent.CompletionException;
28
import java.util.concurrent.ConcurrentHashMap;
29
import java.util.concurrent.ConcurrentMap;
30
import java.util.concurrent.Executor;
31

32
import javax.cache.Cache;
33
import javax.cache.configuration.CacheEntryListenerConfiguration;
34
import javax.cache.event.CacheEntryEventFilter;
35
import javax.cache.event.CacheEntryListener;
36
import javax.cache.event.EventType;
37

38
import org.jspecify.annotations.Nullable;
39

40
import com.google.errorprone.annotations.Var;
41

42
/**
43
 * A dispatcher that publishes cache events to listeners for asynchronous execution.
44
 * <p>
45
 * A {@link CacheEntryListener} is required to receive events in the order of the actions being
46
 * performed on the associated key. This implementation supports this by using a dispatch queue for
47
 * each listener and key pair, and provides the following characteristics:
48
 * <ul>
49
 *   <li>A listener may be executed in parallel for events with different keys
50
 *   <li>A listener is executed sequentially for events with the same key. This creates a dependency
51
 *       relationship between events and waiting dependents do not consume threads.
52
 *   <li>A listener receives a single event per invocation; batch processing is not supported
53
 *   <li>Multiple listeners may be executed in parallel for the same event
54
 *   <li>Listeners process events at their own rate and do not explicitly block each other
55
 *   <li>Listeners share a pool of threads for event processing. A slow listener may limit the
56
 *       throughput if all threads are busy handling distinct events, causing the execution of other
57
 *       listeners to be delayed until the executor is able to process the work.
58
 * </ul>
59
 * <p>
60
 * Some listeners may be configured as <code>synchronous</code>, meaning that the publishing thread
61
 * should wait until the listener has processed the event. The calling thread should publish within
62
 * an atomic block that mutates the entry, and complete the operation by calling
63
 * {@link #awaitSynchronous()} or {@link #ignoreSynchronous()}.
64
 *
65
 * @author ben.manes@gmail.com (Ben Manes)
66
 */
67
public final class EventDispatcher<K, V> {
68
  static final Logger logger = System.getLogger(EventDispatcher.class.getName());
×
69

70
  final ConcurrentMap<
71
      Registration<K, V>,
72
      ConcurrentMap<K, CompletableFuture<@Nullable Void>>> dispatchQueues;
73
  final ThreadLocal<List<CompletableFuture<@Nullable Void>>> pending;
74
  final Executor executor;
75

76
  /**
   * Creates a dispatcher that has no registrations and runs listener invocations on the supplied
   * executor.
   *
   * @param executor the executor that runs the listener invocations
   * @throws NullPointerException if the executor is null
   */
  public EventDispatcher(Executor executor) {
    this.executor = requireNonNull(executor);
    this.dispatchQueues = new ConcurrentHashMap<>();
    this.pending = ThreadLocal.withInitial(ArrayList::new);
  }
×
81

82
  /** Returns the cache entry listener registrations. */
83
  public Set<Registration<K, V>> registrations() {
84
    return Collections.unmodifiableSet(dispatchQueues.keySet());
×
85
  }
86

87
  /**
88
   * Registers a cache entry listener based on the supplied configuration.
89
   *
90
   * @param configuration the listener's configuration.
91
   */
92
  @SuppressWarnings("PMD.CloseResource")
93
  public void register(CacheEntryListenerConfiguration<K, V> configuration) {
94
    if (configuration.getCacheEntryListenerFactory() == null) {
×
95
      return;
×
96
    }
97
    var listener = new EventTypeAwareListener<K, V>(
×
98
        configuration.getCacheEntryListenerFactory().create());
×
99

100
    var factory = configuration.getCacheEntryEventFilterFactory();
×
101
    CacheEntryEventFilter<K, V> filter = (factory == null)
×
102
        ? event -> true
×
103
        : new EventTypeFilter<>(listener, factory.create());
×
104

105
    var registration = new Registration<>(configuration, filter, listener);
×
106
    dispatchQueues.putIfAbsent(registration, new ConcurrentHashMap<>());
×
107
  }
×
108

109
  /**
110
   * Deregisters a cache entry listener based on the supplied configuration.
111
   *
112
   * @param configuration the listener's configuration.
113
   */
114
  public void deregister(CacheEntryListenerConfiguration<K, V> configuration) {
115
    requireNonNull(configuration);
×
116
    dispatchQueues.keySet().removeIf(registration ->
×
117
        configuration.equals(registration.getConfiguration()));
×
118
  }
×
119

120
  /**
121
   * Publishes a creation event for the entry to the interested listeners.
122
   *
123
   * @param cache the cache where the entry was created
124
   * @param key the entry's key
125
   * @param value the entry's value
126
   */
127
  public void publishCreated(Cache<K, V> cache, K key, V value) {
128
    publish(cache, EventType.CREATED, key, /* hasOldValue= */ false,
×
129
        /* oldValue= */ null, /* newValue= */ value, /* quiet= */ false);
130
  }
×
131

132
  /**
133
   * Publishes an update event for the entry to the interested listeners.
134
   *
135
   * @param cache the cache where the entry was updated
136
   * @param key the entry's key
137
   * @param oldValue the entry's old value
138
   * @param newValue the entry's new value
139
   */
140
  public void publishUpdated(Cache<K, V> cache, K key, V oldValue, V newValue) {
141
    publish(cache, EventType.UPDATED, key, /* hasOldValue= */ true,
×
142
        oldValue, newValue, /* quiet= */ false);
143
  }
×
144

145
  /**
146
   * Publishes a removal event for the entry to the interested listeners.
147
   *
148
   * @param cache the cache where the entry was removed
149
   * @param key the entry's key
150
   * @param value the entry's value
151
   */
152
  public void publishRemoved(Cache<K, V> cache, K key, V value) {
153
    publish(cache, EventType.REMOVED, key, /* hasOldValue= */ true,
×
154
        /* oldValue= */ value, /* newValue= */ value, /* quiet= */ false);
155
  }
×
156

157
  /**
158
   * Publishes a removal event for the entry to the interested listeners. This method does not
159
   * register the synchronous listener's future with {@link #awaitSynchronous()}.
160
   *
161
   * @param cache the cache where the entry was removed
162
   * @param key the entry's key
163
   * @param value the entry's value
164
   */
165
  public void publishRemovedQuietly(Cache<K, V> cache, K key, V value) {
166
    publish(cache, EventType.REMOVED, key, /* hasOldValue= */ true,
×
167
        /* oldValue= */ value, /* newValue= */ value, /* quiet= */ true);
168
  }
×
169

170
  /**
171
   * Publishes an expiration event for the entry to the interested listeners.
172
   *
173
   * @param cache the cache where the entry expired
174
   * @param key the entry's key
175
   * @param value the entry's value
176
   */
177
  public void publishExpired(Cache<K, V> cache, K key, V value) {
178
    publish(cache, EventType.EXPIRED, key, /* hasOldValue= */ true,
×
179
        /* oldValue= */ value, /* newValue= */ value, /* quiet= */ false);
180
  }
×
181

182
  /**
183
   * Publishes an expiration event for the entry to the interested listeners. This method does not
184
   * register the synchronous listener's future with {@link #awaitSynchronous()}.
185
   *
186
   * @param cache the cache where the entry expired
187
   * @param key the entry's key
188
   * @param value the entry's value
189
   */
190
  public void publishExpiredQuietly(Cache<K, V> cache, K key, V value) {
191
    publish(cache, EventType.EXPIRED, key, /* hasOldValue= */ true,
×
192
        /* oldValue= */ value, /* newValue= */ value, /* quiet= */ true);
193
  }
×
194

195
  /**
196
   * Blocks until all of the synchronous listeners have finished processing the events this thread
197
   * published.
198
   */
199
  public void awaitSynchronous() {
200
    var futures = pending.get();
×
201
    if (futures.isEmpty()) {
×
202
      return;
×
203
    }
204
    try {
205
      CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
×
206
    } catch (CompletionException e) {
×
207
      logger.log(Level.WARNING, "", e);
×
208
    } finally {
209
      futures.clear();
×
210
    }
211
  }
×
212

213
  /**
214
   * Ignores and clears the queued futures to the synchronous listeners that are processing events
215
   * this thread published.
216
   */
217
  public void ignoreSynchronous() {
218
    pending.get().clear();
×
219
  }
×
220

221
  /** Broadcasts the event to the interested listener's dispatch queues. */
222
  @SuppressWarnings("FutureReturnValueIgnored")
223
  private void publish(Cache<K, V> cache, EventType eventType, K key,
224
      boolean hasOldValue, @Nullable V oldValue, @Nullable V newValue, boolean quiet) {
225
    if (dispatchQueues.isEmpty()) {
×
226
      return;
×
227
    }
228

229
    @Var JCacheEntryEvent<K, V> event = null;
×
230
    for (var entry : dispatchQueues.entrySet()) {
×
231
      var registration = entry.getKey();
×
232
      if (!registration.getCacheEntryListener().isCompatible(eventType)) {
×
233
        continue;
×
234
      }
235
      if (event == null) {
×
236
        event = new JCacheEntryEvent<>(cache, eventType, key, hasOldValue, oldValue, newValue);
×
237
      }
238
      if (!registration.getCacheEntryFilter().evaluate(event)) {
×
239
        continue;
×
240
      }
241

242
      JCacheEntryEvent<K, V> e = event;
×
243
      var dispatchQueue = entry.getValue();
×
244
      var future = dispatchQueue.compute(key, (k, queue) -> {
×
245
        Runnable action = () -> registration.getCacheEntryListener().dispatch(e);
×
246
        return (queue == null)
×
247
            ? CompletableFuture.runAsync(action, executor)
×
248
            : queue.thenRunAsync(action, executor);
×
249
      });
250
      future.whenComplete((result, error) -> {
×
251
        // optimistic check to avoid locking if not a match
252
        if (dispatchQueue.get(key) == future) {
×
253
          dispatchQueue.remove(key, future);
×
254
        }
255
      });
×
256
      if (registration.isSynchronous() && !quiet) {
×
257
        pending.get().add(future);
×
258
      }
259
    }
×
260
  }
×
261
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc