• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ben-manes / caffeine / #4956

12 Jul 2025 09:17PM UTC coverage: 99.987% (-0.01%) from 100.0%
#4956

push

github

ben-manes
minor touchups

3794 of 3800 branches covered (99.84%)

1 of 1 new or added line in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

7811 of 7812 relevant lines covered (99.99%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.78
/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java
1
/*
2
 * Copyright 2015 Ben Manes. All Rights Reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
/*
17
 * Written by Doug Lea with assistance from members of JCP JSR-166
18
 * Expert Group and released to the public domain, as explained at
19
 * http://creativecommons.org/publicdomain/zero/1.0/
20
 */
21
package com.github.benmanes.caffeine.cache;
22

23
import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo;
24

25
import java.lang.invoke.MethodHandles;
26
import java.lang.invoke.VarHandle;
27
import java.util.Arrays;
28
import java.util.function.Consumer;
29

30
import org.jspecify.annotations.Nullable;
31

32
import com.google.errorprone.annotations.Var;
33

34
/**
35
 * A base class providing the mechanics for supporting dynamic striping of bounded buffers. This
36
 * implementation is an adaption of the numeric 64-bit <i>java.util.concurrent.atomic.Striped64</i>
37
 * class, which is used by atomic counters. The approach was modified to lazily grow an array of
38
 * buffers in order to minimize memory usage for caches that are not heavily contended on.
39
 *
40
 * @author dl@cs.oswego.edu (Doug Lea)
41
 * @author ben.manes@gmail.com (Ben Manes)
42
 */
43
abstract class StripedBuffer<E> implements Buffer<E> {
1✔
44
  /*
45
   * This class maintains a lazily-initialized table of atomically updated buffers. The table size
46
   * is a power of two. Indexing uses masked per-thread hash codes. Nearly all declarations in this
47
   * class are package-private, accessed directly by subclasses.
48
   *
49
   * Table entries are of class Buffer and should be padded to reduce cache contention. Padding is
50
   * overkill for most atomics because they are usually irregularly scattered in memory and thus
51
   * don't interfere much with each other. But atomic objects residing in arrays will tend to be
52
   * placed adjacent to each other, and so will most often share cache lines (with a huge negative
53
   * performance impact) without this precaution.
54
   *
55
   * In part because Buffers are relatively large, we avoid creating them until they are needed.
56
   * When there is no contention, all updates are made to a single buffer. Upon contention (a failed
57
   * CAS inserting into the buffer), the table is expanded to size 2. The table size is doubled upon
58
   * further contention until reaching four times the nearest power of two greater than or equal to
59
   * the number of CPUs. Table slots remain empty (null) until they are needed.
60
   *
61
   * A single spinlock ("tableBusy") is used for initializing and resizing the table, as well as
62
   * populating slots with new Buffers. There is no need for a blocking lock; when the lock is not
63
   * available, threads try other slots. During these retries, there is increased contention and
64
   * reduced locality, which is still better than alternatives.
65
   *
66
   * Contention and/or table collisions are indicated by failed CASes when performing an update
67
   * operation. Upon a collision, if the table size is less than the capacity, it is doubled in size
68
   * unless some other thread holds the lock. If a hashed slot is empty, and lock is available, a
69
   * new Buffer is created. Otherwise, if the slot exists, a CAS is tried. The thread id serves as
70
   * the base for per-thread hash codes. Retries proceed by "incremental hashing", using the top
71
   * half of the seed to increment the bottom half which is used as a probe to try to find a free
72
   * slot.
73
   *
74
   * The table size is capped because, when there are more threads than CPUs, supposing that each
75
   * thread were bound to a CPU, there would exist a perfect hash function mapping threads to slots
76
   * that eliminates collisions. When we reach capacity, we search for this mapping by varying the
77
   * hash codes of colliding threads. Because search is random, and collisions only become known via
78
   * CAS failures, convergence can be slow, and because threads are typically not bound to CPUs
79
   * forever, may not occur at all. However, despite these limitations, observed contention rates
80
   * are typically low in these cases.
81
   *
82
   * It is possible for a Buffer to become unused when threads that once hashed to it terminate, as
83
   * well as in the case where doubling the table causes no thread to hash to it under expanded
84
   * mask. We do not try to detect or remove buffers, under the assumption that for long-running
85
   * instances, observed contention levels will recur, so the buffers will eventually be needed
86
   * again; and for short-lived ones, it does not matter.
87
   */
88

89
  static final VarHandle TABLE_BUSY = findVarHandle(StripedBuffer.class, "tableBusy", int.class);
1✔
90

91
  /** Number of CPUs. */
92
  static final int NCPU = Runtime.getRuntime().availableProcessors();
1✔
93

94
  /** The bound on the table size. */
95
  static final int MAXIMUM_TABLE_SIZE = 4 * ceilingPowerOfTwo(NCPU);
1✔
96

97
  /** The maximum number of passes {@code expandOrRetry} makes before giving up on an offer. */
98
  static final int ATTEMPTS = 3;
99

100
  /** Table of buffers. When non-null, size is a power of 2. */
101
  volatile Buffer<E> @Nullable[] table;
102

103
  /** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */
104
  volatile int tableBusy;
105

106
  /** CASes the tableBusy field from 0 to 1 to acquire lock. */
107
  final boolean casTableBusy() {
108
    return TABLE_BUSY.compareAndSet(this, 0, 1);
1✔
109
  }
110

111
  /**
   * Creates a new buffer instance that already holds the producer's element. It is called by
   * {@code expandOrRetry} while the {@code tableBusy} spinlock is held, either to initialize the
   * table with its first buffer or to populate an empty slot.
   *
   * @param e the producer's element
   * @return a newly created buffer populated with a single element
   */
117
  protected abstract Buffer<E> create(E e);
118

119
  @Override
120
  @SuppressWarnings("Varifier")
121
  public int offer(E e) {
122
    @SuppressWarnings("deprecation")
123
    long z = mix64(Thread.currentThread().getId());
1✔
124
    int increment = ((int) (z >>> 32)) | 1;
1✔
125
    int h = (int) z;
1✔
126

127
    int mask;
128
    int result;
129
    Buffer<E> buffer;
130
    @Var boolean uncontended = true;
1✔
131
    Buffer<E>[] buffers = table;
1✔
132
    if ((buffers == null)
1✔
133
        || ((mask = buffers.length - 1) < 0)
134
        || ((buffer = buffers[h & mask]) == null)
135
        || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
1✔
136
      return expandOrRetry(e, h, increment, uncontended);
1✔
137
    }
138
    return result;
1✔
139
  }
140

141
  /**
   * Handles cases of updates involving initialization, resizing, creating new Buffers, and/or
   * contention. See above for explanation. This method suffers the usual non-modularity problems of
   * optimistic retry code, relying on rechecked sets of reads.
   * <p>
   * At most {@code ATTEMPTS} passes are made. On each pass one of the following happens:
   * <ul>
   *   <li>the table is missing or empty: the lock is tried and, if acquired, a one-slot table is
   *       installed holding a buffer created from the element
   *   <li>the hashed slot is empty: the lock is tried and, if acquired, the slot is rechecked and
   *       filled with a buffer created from the element
   *   <li>the caller already failed on this slot: the offer is skipped once and the hash advanced
   *   <li>the buffer is offered the element: any result other than {@code Buffer.FAILED} is final
   *   <li>the offer failed: the table is doubled if it has collided twice in a row, is below
   *       {@code MAXIMUM_TABLE_SIZE}, is not stale, and the lock can be acquired
   * </ul>
   * Unless a pass ends with {@code continue} or {@code break}, the hash is advanced by
   * {@code increment} before the next pass.
   *
   * @param e the element to add
   * @param h the thread's hash
   * @param increment the amount to increment by when rehashing
   * @param wasUncontended false if CAS failed before this call
   * @return {@code Buffer.SUCCESS}, {@code Buffer.FAILED}, or {@code Buffer.FULL}
   */
152
  final int expandOrRetry(E e, @Var int h, int increment, @Var boolean wasUncontended) {
153
    @Var int result = Buffer.FAILED; // Returned as-is if every attempt fails
1✔
154
    @Var boolean collide = false; // True if last slot nonempty
1✔
155
    for (int attempt = 0; attempt < ATTEMPTS; attempt++) { // A 'continue' also uses up an attempt
1✔
156
      Buffer<E>[] buffers;
157
      Buffer<E> buffer;
158
      int n;
159
      if (((buffers = table) != null) && ((n = buffers.length) > 0)) { // Table exists
1✔
160
        if ((buffer = buffers[(n - 1) & h]) == null) { // Hashed slot has no buffer yet
1✔
161
          if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
1✔
162
            try { // Recheck under lock
163
              Buffer<E>[] rs;
164
              int mask; // Holds the table length; the index mask is (mask - 1)
165
              int j;
166
              if (((rs = table) != null) && ((mask = rs.length) > 0)
1!
167
                  && (rs[j = (mask - 1) & h] == null)) {
168
                result = Buffer.SUCCESS;
1✔
169
                rs[j] = create(e); // The new buffer already contains the element
1✔
170
                break;
171
              }
172
            } finally {
173
              tableBusy = 0; // Release the lock
1✔
174
            }
UNCOV
175
            continue; // Slot is now non-empty
×
176
          }
177
          collide = false; // Lock unavailable; rehash and try another slot
1✔
178
        } else if (!wasUncontended) { // CAS already known to fail
1✔
179
          wasUncontended = true;      // Continue after rehash
1✔
180
        } else if ((result = buffer.offer(e)) != Buffer.FAILED) { // Any other outcome is final
1✔
181
          break;
1✔
182
        } else if ((n >= MAXIMUM_TABLE_SIZE) || (table != buffers)) {
1✔
183
          collide = false; // At max size or stale
1✔
184
        } else if (!collide) {
1✔
185
          collide = true; // Rehash once more; a second failure in a row triggers an expansion
1✔
186
        } else if ((tableBusy == 0) && casTableBusy()) {
1✔
187
          try {
188
            if (table == buffers) { // Expand table unless stale
1!
189
              table = Arrays.copyOf(buffers, n << 1); // Doubles the length; added slots are null
1✔
190
            }
191
          } finally {
192
            tableBusy = 0; // Release the lock
1✔
193
          }
194
          collide = false;
1✔
195
          continue; // Retry with expanded table
1✔
196
        }
197
        h += increment; // Rehash before the next attempt
1✔
198
      } else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) { // No table yet
1✔
199
        @Var boolean init = false;
1✔
200
        try { // Initialize table
201
          if (table == buffers) { // Recheck under lock
1!
202
            @SuppressWarnings({"rawtypes", "unchecked"})
203
            Buffer<E>[] rs = new Buffer[1];
1✔
204
            rs[0] = create(e); // The new buffer already contains the element
1✔
205
            table = rs;
1✔
206
            init = true;
1✔
207
          }
208
        } finally {
209
          tableBusy = 0; // Release the lock
1✔
210
        }
211
        if (init) {
1!
212
          result = Buffer.SUCCESS;
1✔
213
          break;
1✔
214
        }
215
      }
216
    }
217
    return result;
1✔
218
  }
219

220
  @Override
221
  public void drainTo(Consumer<E> consumer) {
222
    Buffer<E>[] buffers = table;
1✔
223
    if (buffers == null) {
1✔
224
      return;
1✔
225
    }
226
    for (Buffer<E> buffer : buffers) {
1✔
227
      if (buffer != null) {
1✔
228
        buffer.drainTo(consumer);
1✔
229
      }
230
    }
231
  }
1✔
232

233
  @Override
234
  public long reads() {
235
    Buffer<E>[] buffers = table;
1✔
236
    if (buffers == null) {
1✔
237
      return 0;
1✔
238
    }
239
    @Var long reads = 0;
1✔
240
    for (Buffer<E> buffer : buffers) {
1✔
241
      if (buffer != null) {
1✔
242
        reads += buffer.reads();
1✔
243
      }
244
    }
245
    return reads;
1✔
246
  }
247

248
  @Override
249
  public long writes() {
250
    Buffer<E>[] buffers = table;
1✔
251
    if (buffers == null) {
1✔
252
      return 0;
1✔
253
    }
254
    @Var long writes = 0;
1✔
255
    for (Buffer<E> buffer : buffers) {
1✔
256
      if (buffer != null) {
1✔
257
        writes += buffer.writes();
1✔
258
      }
259
    }
260
    return writes;
1✔
261
  }
262

263
  /** Computes Stafford variant 13 of 64-bit mix function. */
264
  static long mix64(@Var long z) {
265
    z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
1✔
266
    z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
1✔
267
    return z ^ (z >>> 31);
1✔
268
  }
269

270
  static VarHandle findVarHandle(Class<?> recv, String name, Class<?> type) {
271
    try {
272
      return MethodHandles.lookup().findVarHandle(recv, name, type);
1✔
273
    } catch (ReflectiveOperationException e) {
1✔
274
      throw new ExceptionInInitializerError(e);
1✔
275
    }
276
  }
277
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc