• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ben-manes / caffeine / #5264

01 Feb 2026 08:27AM UTC coverage: 99.72% (-0.3%) from 100.0%
#5264

push

github

ben-manes
migrate test suite to junit

3831 of 3838 branches covered (99.82%)

7848 of 7870 relevant lines covered (99.72%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.47
/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java
1
/*
2
 * Licensed under the Apache License, Version 2.0 (the "License");
3
 * you may not use this file except in compliance with the License.
4
 * You may obtain a copy of the License at
5
 *
6
 *     http://www.apache.org/licenses/LICENSE-2.0
7
 *
8
 * Unless required by applicable law or agreed to in writing, software
9
 * distributed under the License is distributed on an "AS IS" BASIS,
10
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
 * See the License for the specific language governing permissions and
12
 * limitations under the License.
13
 */
14
package com.github.benmanes.caffeine.cache;
15

16
import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo;
17
import static com.github.benmanes.caffeine.cache.Caffeine.requireArgument;
18
import static com.github.benmanes.caffeine.cache.Caffeine.requireState;
19
import static java.util.Objects.requireNonNull;
20

21
import java.lang.invoke.MethodHandles;
22
import java.lang.invoke.MethodHandles.Lookup;
23
import java.lang.invoke.VarHandle;
24
import java.util.AbstractQueue;
25
import java.util.Iterator;
26

27
import org.jspecify.annotations.Nullable;
28

29
import com.google.errorprone.annotations.Var;
30

31
/**
32
 * An MPSC array queue which starts at <i>initialCapacity</i> and grows to <i>maxCapacity</i> in
33
 * linked chunks of the initial size. The queue grows only when the current buffer is full and
34
 * elements are not copied on resize, instead a link to the new buffer is stored in the old buffer
35
 * for the consumer to follow.<br>
36
 * <p>
37
 * This is a shaded copy of <code>MpscGrowableArrayQueue</code> provided by
38
 * <a href="https://github.com/JCTools/JCTools">JCTools</a> from version 2.0.
39
 *
40
 * @author nitsanw@yahoo.com (Nitsan Wakart)
41
 */
42
class MpscGrowableArrayQueue<E> extends MpscChunkedArrayQueue<E> {
43

44
  /**
45
   * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the
46
   *        chunk size. Must be 2 or more.
47
   * @param maxCapacity the maximum capacity will be rounded up to the closest power of 2 and will
48
   *        be the upper limit of number of elements in this queue. Must be 4 or more and round up
49
   *        to a larger power of 2 than initialCapacity.
50
   */
51
  MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) {
52
    super(initialCapacity, maxCapacity);
1✔
53
  }
1✔
54

55
  @Override
56
  protected int getNextBufferSize(@Nullable E[] buffer) {
57
    long maxSize = maxQueueCapacity / 2;
1✔
58
    requireState(maxSize >= buffer.length);
1✔
59
    int newSize = 2 * (buffer.length - 1);
1✔
60
    return newSize + 1;
1✔
61
  }
62

63
  @Override
64
  protected long getCurrentBufferCapacity(long mask) {
65
    return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask;
1✔
66
  }
67
}
68

69
/**
 * Adds 120 unused {@code byte} fields after the cold producer fields and supplies the capacity
 * arithmetic based on {@code maxQueueCapacity}.
 * <p>
 * NOTE(review): the byte fields are presumably cache-line padding (comments elsewhere in this file
 * say subclasses must add post padding); the actual object layout is decided by the JVM.
 */
@SuppressWarnings({"MultiVariableDeclaration",
70
    "OvershadowingSubclassFields", "PMD.OneDeclarationPerLine", "unused"})
71
abstract class MpscChunkedArrayQueue<E> extends MpscChunkedArrayQueueColdProducerFields<E> {
72
  // Unused fields; never read or written by the code in this file.
  byte p000, p001, p002, p003, p004, p005, p006, p007;
73
  byte p008, p009, p010, p011, p012, p013, p014, p015;
74
  byte p016, p017, p018, p019, p020, p021, p022, p023;
75
  byte p024, p025, p026, p027, p028, p029, p030, p031;
76
  byte p032, p033, p034, p035, p036, p037, p038, p039;
77
  byte p040, p041, p042, p043, p044, p045, p046, p047;
78
  byte p048, p049, p050, p051, p052, p053, p054, p055;
79
  byte p056, p057, p058, p059, p060, p061, p062, p063;
80
  byte p064, p065, p066, p067, p068, p069, p070, p071;
81
  byte p072, p073, p074, p075, p076, p077, p078, p079;
82
  byte p080, p081, p082, p083, p084, p085, p086, p087;
83
  byte p088, p089, p090, p091, p092, p093, p094, p095;
84
  byte p096, p097, p098, p099, p100, p101, p102, p103;
85
  byte p104, p105, p106, p107, p108, p109, p110, p111;
86
  byte p112, p113, p114, p115, p116, p117, p118, p119;
87

88
  /** Passes both capacities to the superclass, which validates them. */
  MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) {
89
    super(initialCapacity, maxCapacity);
1✔
90
  }
1✔
91

92
  /**
   * Returns the remaining room in doubled index units: {@code maxQueueCapacity} minus the distance
   * between the producer and consumer indices.
   */
  @Override
93
  protected long availableInQueue(long pIndex, long cIndex) {
94
    return maxQueueCapacity - (pIndex - cIndex);
1✔
95
  }
96

97
  /** Returns half of {@code maxQueueCapacity}, which is stored doubled. */
  @Override
98
  public int capacity() {
99
    return (int) (maxQueueCapacity / 2);
1✔
100
  }
101
}
102

103
abstract class MpscChunkedArrayQueueColdProducerFields<E> extends BaseMpscLinkedArrayQueue<E> {
104
  protected final long maxQueueCapacity;
105

106
  MpscChunkedArrayQueueColdProducerFields(int initialCapacity, int maxCapacity) {
107
    super(initialCapacity);
1✔
108
    requireArgument(maxCapacity >= 4, "Max capacity must be 4 or more");
1✔
109
    requireArgument(ceilingPowerOfTwo(maxCapacity) >= ceilingPowerOfTwo(initialCapacity),
1✔
110
        "Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)");
111
    maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity)) << 1;
1✔
112
  }
1✔
113
}
114

115
/**
 * Root of the queue's class hierarchy: extends {@link AbstractQueue} and declares 120 unused
 * {@code byte} fields ahead of the producer index declared in the next subclass.
 * <p>
 * NOTE(review): presumably cache-line padding to limit false sharing; the actual object layout is
 * decided by the JVM.
 */
@SuppressWarnings({"MultiVariableDeclaration", "PMD.OneDeclarationPerLine", "unused"})
116
abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> {
1✔
117
  // Unused fields; never read or written by the code in this file.
  byte p000, p001, p002, p003, p004, p005, p006, p007;
118
  byte p008, p009, p010, p011, p012, p013, p014, p015;
119
  byte p016, p017, p018, p019, p020, p021, p022, p023;
120
  byte p024, p025, p026, p027, p028, p029, p030, p031;
121
  byte p032, p033, p034, p035, p036, p037, p038, p039;
122
  byte p040, p041, p042, p043, p044, p045, p046, p047;
123
  byte p048, p049, p050, p051, p052, p053, p054, p055;
124
  byte p056, p057, p058, p059, p060, p061, p062, p063;
125
  byte p064, p065, p066, p067, p068, p069, p070, p071;
126
  byte p072, p073, p074, p075, p076, p077, p078, p079;
127
  byte p080, p081, p082, p083, p084, p085, p086, p087;
128
  byte p088, p089, p090, p091, p092, p093, p094, p095;
129
  byte p096, p097, p098, p099, p100, p101, p102, p103;
130
  byte p104, p105, p106, p107, p108, p109, p110, p111;
131
  byte p112, p113, p114, p115, p116, p117, p118, p119;
132
}
133

134
/** Holds the producer index on its own level of the class hierarchy. */
abstract class BaseMpscLinkedArrayQueueProducerFields<E> extends BaseMpscLinkedArrayQueuePad1<E> {
1✔
135
  // Advanced by 2 per offered element; an odd value marks a resize in progress (see offer()).
  // Accessed through the P_INDEX VarHandle rather than directly.
  protected long producerIndex;
136
}
137

138
/**
 * Declares 120 unused {@code byte} fields between the producer index (superclass) and the
 * consumer fields (subclass).
 * <p>
 * NOTE(review): presumably cache-line padding to limit false sharing between producer and consumer
 * state; the actual object layout is decided by the JVM.
 */
@SuppressWarnings({"MultiVariableDeclaration", "OvershadowingSubclassFields",
139
    "PMD.OneDeclarationPerLine", "unused"})
140
abstract class BaseMpscLinkedArrayQueuePad2<E> extends BaseMpscLinkedArrayQueueProducerFields<E> {
1✔
141
  // Unused fields; never read or written by the code in this file.
  byte p000, p001, p002, p003, p004, p005, p006, p007;
142
  byte p008, p009, p010, p011, p012, p013, p014, p015;
143
  byte p016, p017, p018, p019, p020, p021, p022, p023;
144
  byte p024, p025, p026, p027, p028, p029, p030, p031;
145
  byte p032, p033, p034, p035, p036, p037, p038, p039;
146
  byte p040, p041, p042, p043, p044, p045, p046, p047;
147
  byte p048, p049, p050, p051, p052, p053, p054, p055;
148
  byte p056, p057, p058, p059, p060, p061, p062, p063;
149
  byte p064, p065, p066, p067, p068, p069, p070, p071;
150
  byte p072, p073, p074, p075, p076, p077, p078, p079;
151
  byte p080, p081, p082, p083, p084, p085, p086, p087;
152
  byte p088, p089, p090, p091, p092, p093, p094, p095;
153
  byte p096, p097, p098, p099, p100, p101, p102, p103;
154
  byte p104, p105, p106, p107, p108, p109, p110, p111;
155
  byte p112, p113, p114, p115, p116, p117, p118, p119;
156
}
157

158
abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedArrayQueuePad2<E> {
159
  protected @Nullable E[] consumerBuffer;
160
  protected long consumerIndex;
161
  protected long consumerMask;
162

163
  BaseMpscLinkedArrayQueueConsumerFields(int initialCapacity) {
1✔
164
    requireArgument(initialCapacity >= 2, "Initial capacity must be 2 or more");
1✔
165
    int p2capacity = ceilingPowerOfTwo(initialCapacity);
1✔
166
    // leave lower bit of mask clear
167
    long mask = (p2capacity - 1L) << 1;
1✔
168
    // need extra element to point at next array
169
    @Nullable E[] buffer = allocate(p2capacity + 1);
1✔
170
    consumerBuffer = buffer;
1✔
171
    consumerMask = mask;
1✔
172
  }
1✔
173

174
  @SuppressWarnings("unchecked")
175
  public static <E> @Nullable E[] allocate(int capacity) {
176
    return (E[]) new Object[capacity];
1✔
177
  }
178
}
179

180
/**
 * Declares 120 unused {@code byte} fields between the consumer fields (superclass) and the cold
 * producer fields (subclass).
 * <p>
 * NOTE(review): presumably cache-line padding to limit false sharing; the actual object layout is
 * decided by the JVM.
 */
@SuppressWarnings({"MultiVariableDeclaration",
181
    "OvershadowingSubclassFields", "PMD.OneDeclarationPerLine", "unused"})
182
abstract class BaseMpscLinkedArrayQueuePad3<E> extends BaseMpscLinkedArrayQueueConsumerFields<E> {
183
  // Unused fields; never read or written by the code in this file.
  byte p000, p001, p002, p003, p004, p005, p006, p007;
184
  byte p008, p009, p010, p011, p012, p013, p014, p015;
185
  byte p016, p017, p018, p019, p020, p021, p022, p023;
186
  byte p024, p025, p026, p027, p028, p029, p030, p031;
187
  byte p032, p033, p034, p035, p036, p037, p038, p039;
188
  byte p040, p041, p042, p043, p044, p045, p046, p047;
189
  byte p048, p049, p050, p051, p052, p053, p054, p055;
190
  byte p056, p057, p058, p059, p060, p061, p062, p063;
191
  byte p064, p065, p066, p067, p068, p069, p070, p071;
192
  byte p072, p073, p074, p075, p076, p077, p078, p079;
193
  byte p080, p081, p082, p083, p084, p085, p086, p087;
194
  byte p088, p089, p090, p091, p092, p093, p094, p095;
195
  byte p096, p097, p098, p099, p100, p101, p102, p103;
196
  byte p104, p105, p106, p107, p108, p109, p110, p111;
197
  byte p112, p113, p114, p115, p116, p117, p118, p119;
198

199
  /** Forwards the initial capacity to the consumer-fields constructor. */
  BaseMpscLinkedArrayQueuePad3(int initialCapacity) {
200
    super(initialCapacity);
1✔
201
  }
1✔
202
}
203

204
abstract class BaseMpscLinkedArrayQueueColdProducerFields<E>
205
    extends BaseMpscLinkedArrayQueuePad3<E> {
206
  protected @Nullable E[] producerBuffer;
207
  protected volatile long producerLimit;
208
  protected long producerMask;
209

210
  BaseMpscLinkedArrayQueueColdProducerFields(int initialCapacity) {
211
    super(initialCapacity);
1✔
212
    producerMask = consumerMask;
1✔
213
    producerBuffer = consumerBuffer;
1✔
214
  }
1✔
215
}
216

217
abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E> {
218
  // Handle on producerIndex, declared in BaseMpscLinkedArrayQueueProducerFields.
  static final VarHandle P_INDEX = findVarHandle(
1✔
219
      BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex", long.class);
220
  // Handle on consumerIndex, declared in BaseMpscLinkedArrayQueueConsumerFields.
  static final VarHandle C_INDEX = findVarHandle(
1✔
221
      BaseMpscLinkedArrayQueueConsumerFields.class, "consumerIndex", long.class);
222
  // Handle on producerLimit, declared in BaseMpscLinkedArrayQueueColdProducerFields.
  static final VarHandle P_LIMIT = findVarHandle(
1✔
223
      BaseMpscLinkedArrayQueueColdProducerFields.class, "producerLimit", long.class);
224
  // Handle for ordered element access into the Object[] buffers.
  static final VarHandle REF_ARRAY = MethodHandles.arrayElementVarHandle(Object[].class);
1✔
225

226
  // No post padding here, subclasses must add
227

228
  // Sentinel stored in a slot by resize(); tells the consumer to continue in the next buffer.
  private static final Object JUMP = new Object();
1✔
229

230
  /**
   * Initializes the buffers through the superclass constructors and publishes the initial
   * producer limit.
   *
   * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the
   *        chunk size. Must be 2 or more.
   */
234
  BaseMpscLinkedArrayQueue(int initialCapacity) {
235
    super(initialCapacity);
1✔
236
    soProducerLimit(this, producerMask); // we know it's all empty to start with
1✔
237
  }
1✔
238

239
  /**
   * Iteration is not supported by this queue.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
240
  public final Iterator<E> iterator() {
241
    throw new UnsupportedOperationException();
1✔
242
  }
243

244
  @Override
245
  public String toString() {
246
    return getClass().getName() + "@" + Integer.toHexString(hashCode());
×
247
  }
248

249
  /**
   * Inserts {@code e} at the tail of the queue. A producer claims its slot with a compare-and-set
   * on the producer index and only then writes the element; when the producer limit has been
   * reached the decision is delegated to {@link #offerSlowPath}.
   *
   * @param e the element to add
   * @return {@code true} if the element was added, {@code false} if the slow path reported that
   *         the queue is full and cannot grow
   * @throws NullPointerException if {@code e} is null
   */
  @Override
250
  @SuppressWarnings({"MissingDefault", "PMD.NonExhaustiveSwitch"})
251
  public boolean offer(E e) {
252
    requireNonNull(e);
1✔
253

254
    @Var long mask;
255
    @Var long pIndex;
256
    @Var @Nullable E[] buffer;
257

258
    while (true) {
259
      // the limit is read before the index it will be compared against
      long producerLimit = lvProducerLimit();
1✔
260
      pIndex = lvProducerIndex(this);
1✔
261
      // lower bit is indicative of resize, if we see it we spin until it's cleared
262
      if ((pIndex & 1) == 1) {
1✔
263
        continue;
1✔
264
      }
265
      // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
266

267
      // mask/buffer may get changed by resizing -> only use for array access after successful CAS.
268
      mask = this.producerMask;
1✔
269
      buffer = this.producerBuffer;
1✔
270
      // a successful CAS ties the ordering, lv(pIndex)-[mask/buffer]->cas(pIndex)
271

272
      // assumption behind this optimization is that queue is almost always empty or near empty
273
      if (producerLimit <= pIndex) {
1✔
274
        int result = offerSlowPath(mask, pIndex, producerLimit);
1✔
275
        switch (result) {
1✔
276
          // 0: limit was raised, fall through to the index CAS below
          case 0:
277
            break;
1✔
278
          // 1: lost a race, restart the loop with fresh values
          case 1:
279
            continue;
1✔
280
          // 2: queue is full and cannot grow
          case 2:
281
            return false;
1✔
282
          // 3: this thread set the resize bit and must perform the resize, which also stores e
          case 3:
283
            resize(mask, buffer, pIndex, e);
1✔
284
            return true;
1✔
285
        }
286
      }
287

288
      // claim the slot; on failure another producer moved the index, so retry
      if (casProducerIndex(this, pIndex, pIndex + 2)) {
1✔
289
        break;
1✔
290
      }
291
    }
1✔
292
    // INDEX visible before ELEMENT, consistent with consumer expectation
293
    long offset = modifiedCalcElementOffset(pIndex, mask);
1✔
294
    soElement(buffer, offset, e);
1✔
295
    return true;
1✔
296
  }
297

298
  /**
299
   * We do not inline resize into this method because we do not resize on fill.
300
   */
301
  int offerSlowPath(long mask, long pIndex, long producerLimit) {
302
    @Var int result;
303
    long cIndex = lvConsumerIndex(this);
1✔
304
    long bufferCapacity = getCurrentBufferCapacity(mask);
1✔
305
    result = 0;// 0 - goto pIndex CAS
1✔
306
    if (cIndex + bufferCapacity > pIndex) {
1✔
307
      if (!casProducerLimit(this, producerLimit, cIndex + bufferCapacity)) {
1✔
308
        result = 1;// retry from top
1✔
309
      }
310
    }
311
    // full and cannot grow
312
    else if (availableInQueue(pIndex, cIndex) <= 0) {
1✔
313
      result = 2;// -> return false;
1✔
314
    }
315
    // grab index for resize -> set lower bit
316
    else if (casProducerIndex(this, pIndex, pIndex + 1)) {
1✔
317
      result = 3;// -> resize
1✔
318
    } else {
319
      result = 1;// failed resize attempt, retry from top
1✔
320
    }
321
    return result;
1✔
322
  }
323

324
  /**
   * Computes how much room is left between the given producer and consumer indices.
   *
   * @param pIndex the producer index (doubled, as stored)
   * @param cIndex the consumer index (doubled, as stored)
   * @return available elements in queue * 2
   */
327
  protected abstract long availableInQueue(long pIndex, long cIndex);
328

329
  /**
330
   * {@inheritDoc}
331
   * <p>
332
   * This implementation is correct for single consumer thread use only.
333
   */
334
  @Override
335
  @SuppressWarnings({"CastCanBeRemovedNarrowingVariableType", "PMD.ConfusingTernary", "unchecked"})
336
  public @Nullable E poll() {
337
    @Nullable E[] buffer = consumerBuffer;
1✔
338
    long index = consumerIndex;
1✔
339
    long mask = consumerMask;
1✔
340

341
    long offset = modifiedCalcElementOffset(index, mask);
1✔
342
    @Var @Nullable Object e = lvElement(buffer, offset);// LoadLoad
1✔
343
    if (e == null) {
1✔
344
      if (index != lvProducerIndex(this)) {
1✔
345
        // poll() == null iff queue is empty, null element is not strong enough indicator, so we
346
        // must check the producer index. If the queue is indeed not empty we spin until element is
347
        // visible.
348
        do {
349
          e = lvElement(buffer, offset);
1✔
350
        } while (e == null);
1✔
351
      } else {
352
        return null;
1✔
353
      }
354
    }
355
    if (e == JUMP) {
1✔
356
      @Nullable E[] nextBuffer = getNextBuffer(buffer, mask);
1✔
357
      return newBufferPoll(nextBuffer, index);
1✔
358
    }
359
    soElement(buffer, offset, null);
1✔
360
    soConsumerIndex(this, index + 2);
1✔
361
    return (E) e;
1✔
362
  }
363

364
  /**
365
   * {@inheritDoc}
366
   * <p>
367
   * This implementation is correct for single consumer thread use only.
368
   */
369
  @Override
370
  @SuppressWarnings({"CastCanBeRemovedNarrowingVariableType",
371
      "PMD.EmptyControlStatement", "StatementWithEmptyBody", "unchecked"})
372
  public @Nullable E peek() {
373
    @Nullable E[] buffer = consumerBuffer;
1✔
374
    long index = consumerIndex;
1✔
375
    long mask = consumerMask;
1✔
376

377
    long offset = modifiedCalcElementOffset(index, mask);
1✔
378
    @Var @Nullable Object e = lvElement(buffer, offset);// LoadLoad
1✔
379
    if (e == null && index != lvProducerIndex(this)) {
1✔
380
      // peek() == null iff queue is empty, null element is not strong enough indicator, so we must
381
      // check the producer index. If the queue is indeed not empty we spin until element is
382
      // visible.
383
      while ((e = lvElement(buffer, offset)) == null) {
1✔
384
        // retry
385
      }
386
    }
387
    if (e == JUMP) {
1✔
388
      return newBufferPeek(getNextBuffer(buffer, mask), index);
1✔
389
    }
390
    return (E) e;
1✔
391
  }
392

393
  @SuppressWarnings("unchecked")
394
  private @Nullable E[] getNextBuffer(@Nullable E[] buffer, long mask) {
395
    long nextArrayOffset = nextArrayOffset(mask);
1✔
396
    @SuppressWarnings("Varifier")
397
    @Nullable E[] nextBuffer = (@Nullable E[]) lvElement(buffer, nextArrayOffset);
1✔
398
    soElement(buffer, nextArrayOffset, null);
1✔
399
    return requireNonNull(nextBuffer);
1✔
400
  }
401

402
  private static long nextArrayOffset(long mask) {
403
    return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);
1✔
404
  }
405

406
  private E newBufferPoll(@Nullable E[] nextBuffer, long index) {
407
    long offsetInNew = newBufferAndOffset(nextBuffer, index);
1✔
408
    @Nullable E n = lvElement(nextBuffer, offsetInNew);// LoadLoad
1✔
409
    requireNonNull(n, "new buffer must have at least one element");
1✔
410
    soElement(nextBuffer, offsetInNew, null);// StoreStore
1✔
411
    soConsumerIndex(this, index + 2);
1✔
412
    return n;
1✔
413
  }
414

415
  private E newBufferPeek(@Nullable E[] nextBuffer, long index) {
416
    long offsetInNew = newBufferAndOffset(nextBuffer, index);
1✔
417
    @Nullable E n = lvElement(nextBuffer, offsetInNew);// LoadLoad
1✔
418
    requireNonNull(n, "new buffer must have at least one element");
1✔
419
    return n;
1✔
420
  }
421

422
  private long newBufferAndOffset(@Nullable E[] nextBuffer, long index) {
423
    consumerBuffer = nextBuffer;
1✔
424
    consumerMask = (nextBuffer.length - 2L) << 1;
1✔
425
    return modifiedCalcElementOffset(index, consumerMask);
1✔
426
  }
427

428
  @Override
429
  public final int size() {
430
    // NOTE: because indices are on even numbers we cannot use the size util.
431

432
    /*
433
     * It is possible for a thread to be interrupted or reschedule between the read of the producer
434
     * and consumer indices, therefore protection is required to ensure size is within valid range.
435
     * In the event of concurrent polls/offers to this method the size is OVER estimated as we read
436
     * consumer index BEFORE the producer index.
437
     */
438
    @Var long after = lvConsumerIndex(this);
1✔
439
    long size;
440
    while (true) {
441
      long before = after;
1✔
442
      long currentProducerIndex = lvProducerIndex(this);
1✔
443
      after = lvConsumerIndex(this);
1✔
444
      if (before == after) {
1✔
445
        size = ((currentProducerIndex - after) >> 1);
1✔
446
        break;
1✔
447
      }
448
    }
1✔
449
    // Long overflow is impossible, so size is always positive. Integer overflow is possible for the
450
    // unbounded indexed queues.
451
    return (int) Math.min(size, Integer.MAX_VALUE);
1✔
452
  }
453

454
  @Override
455
  public final boolean isEmpty() {
456
    // Order matters!
457
    // Loading consumer before producer allows for producer increments after consumer index is read.
458
    // This ensures this method is conservative in its estimate. Note that as this is an MPMC there
459
    // is nothing we can do to make this an exact method.
460
    return (lvConsumerIndex(this) == lvProducerIndex(this));
1✔
461
  }
462

463
  /** Reads {@code producerLimit}; the field is declared volatile, so this is a volatile load. */
  private long lvProducerLimit() {
464
    return producerLimit;
1✔
465
  }
466

467
  public long currentProducerIndex() {
468
    return lvProducerIndex(this) / 2;
1✔
469
  }
470

471
  public long currentConsumerIndex() {
472
    return lvConsumerIndex(this) / 2;
1✔
473
  }
474

475
  /**
   * Returns the capacity of the queue. The implementation visible in this file
   * ({@code MpscChunkedArrayQueue}) returns half of {@code maxQueueCapacity}.
   */
  public abstract int capacity();
476

477
  /**
   * Delegates to {@link #offer(Object)}; this queue has no separate relaxed offer path.
   *
   * @param e the element to add
   * @return the result of {@code offer(e)}
   */
  public boolean relaxedOffer(E e) {
478
    return offer(e);
1✔
479
  }
480

481
  @SuppressWarnings({"CastCanBeRemovedNarrowingVariableType", "unchecked"})
482
  public @Nullable E relaxedPoll() {
483
    @Nullable E[] buffer = consumerBuffer;
1✔
484
    long index = consumerIndex;
1✔
485
    long mask = consumerMask;
1✔
486

487
    long offset = modifiedCalcElementOffset(index, mask);
1✔
488
    @Nullable Object e = lvElement(buffer, offset);// LoadLoad
1✔
489
    if (e == null) {
1✔
490
      return null;
1✔
491
    }
492
    if (e == JUMP) {
1✔
493
      @Nullable E[] nextBuffer = getNextBuffer(buffer, mask);
1✔
494
      return newBufferPoll(nextBuffer, index);
1✔
495
    }
496
    soElement(buffer, offset, null);
1✔
497
    soConsumerIndex(this, index + 2);
1✔
498
    return (E) e;
1✔
499
  }
500

501
  @SuppressWarnings("unchecked")
502
  public @Nullable E relaxedPeek() {
503
    @Nullable E[] buffer = consumerBuffer;
1✔
504
    long index = consumerIndex;
1✔
505
    long mask = consumerMask;
1✔
506

507
    long offset = modifiedCalcElementOffset(index, mask);
1✔
508
    @Nullable Object e = lvElement(buffer, offset);// LoadLoad
1✔
509
    if (e == JUMP) {
1✔
510
      return newBufferPeek(getNextBuffer(buffer, mask), index);
1✔
511
    }
512
    return (E) e;
1✔
513
  }
514

515
  /**
   * Allocates the next buffer, stores {@code e} in it, links it from the old buffer and publishes
   * the change. Called from {@code offer} after {@code offerSlowPath} returned 3, i.e. after this
   * thread set the low bit of the producer index.
   *
   * @param oldMask the mask of the buffer being left
   * @param oldBuffer the buffer being left
   * @param pIndex the producer index (even value) at which {@code e} is inserted
   * @param e the element being offered
   */
  private void resize(long oldMask, @Nullable E[] oldBuffer, long pIndex, E e) {
516
    int newBufferLength = getNextBufferSize(oldBuffer);
1✔
517
    @Nullable E[] newBuffer = allocate(newBufferLength);
1✔
518

519
    producerBuffer = newBuffer;
1✔
520
    // the last slot is the next-buffer link, so length - 2 is the highest element position
    int newMask = (newBufferLength - 2) << 1;
1✔
521
    producerMask = newMask;
1✔
522

523
    long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);
1✔
524
    long offsetInNew = modifiedCalcElementOffset(pIndex, newMask);
1✔
525

526
    soElement(newBuffer, offsetInNew, e);// element in new array
1✔
527
    soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
1✔
528

529
    // ASSERT code
530
    long cIndex = lvConsumerIndex(this);
1✔
531
    long availableInQueue = availableInQueue(pIndex, cIndex);
1✔
532
    requireState(availableInQueue > 0);
1✔
533

534
    // Invalidate racing CASs
535
    // We never set the limit beyond the bounds of a buffer
536
    soProducerLimit(this, pIndex + Math.min(newMask, availableInQueue));
1✔
537

538
    // make resize visible to the other producers
539
    // (writing an even index clears the resize bit that spinning producers wait on)
    soProducerIndex(this, pIndex + 2);
1✔
540

541
    // INDEX visible before ELEMENT, consistent with consumer expectation
542

543
    // make resize visible to consumer
544
    soElement(oldBuffer, offsetInOld, JUMP);
1✔
545
  }
1✔
546

547
  /**
   * Computes the length of the buffer that will replace {@code buffer} during a resize.
   *
   * @param buffer the buffer being replaced
   * @return next buffer size (inclusive of next array pointer)
   */
550
  protected abstract int getNextBufferSize(@Nullable E[] buffer);
551

552
  /**
   * Reports the capacity of the buffer described by {@code mask}.
   *
   * @param mask the mask of the current producer buffer
   * @return current buffer capacity for elements (excluding next pointer and jump entry) * 2
   */
555
  protected abstract long getCurrentBufferCapacity(long mask);
556

557
  /** Volatile load of the producer index of {@code self}. */
  @SuppressWarnings("PMD.LooseCoupling")
558
  static long lvProducerIndex(BaseMpscLinkedArrayQueue<?> self) {
559
    return (long) P_INDEX.getVolatile(self);
1✔
560
  }
561
  /** Volatile load of the consumer index of {@code self}. */
  @SuppressWarnings("PMD.LooseCoupling")
562
  static long lvConsumerIndex(BaseMpscLinkedArrayQueue<?> self) {
563
    return (long) C_INDEX.getVolatile(self);
1✔
564
  }
565
  /** Release store of {@code v} into the producer index of {@code self}. */
  @SuppressWarnings("PMD.LooseCoupling")
566
  static void soProducerIndex(BaseMpscLinkedArrayQueue<?> self, long v) {
567
    P_INDEX.setRelease(self, v);
1✔
568
  }
1✔
569
  /**
   * Atomically sets the producer index of {@code self} to {@code newValue} if it currently holds
   * {@code expect}.
   *
   * @return {@code true} if the index was updated
   */
  @SuppressWarnings("PMD.LooseCoupling")
570
  static boolean casProducerIndex(BaseMpscLinkedArrayQueue<?> self, long expect, long newValue) {
571
    return P_INDEX.compareAndSet(self, expect, newValue);
1✔
572
  }
573
  /** Release store of {@code v} into the consumer index of {@code self}. */
  @SuppressWarnings("PMD.LooseCoupling")
574
  static void soConsumerIndex(BaseMpscLinkedArrayQueue<?> self, long v) {
575
    C_INDEX.setRelease(self, v);
1✔
576
  }
1✔
577
  /**
   * Atomically sets the producer limit of {@code self} to {@code newValue} if it currently holds
   * {@code expect}.
   *
   * @return {@code true} if the limit was updated
   */
  @SuppressWarnings("PMD.LooseCoupling")
578
  static boolean casProducerLimit(BaseMpscLinkedArrayQueue<?> self, long expect, long newValue) {
579
    return P_LIMIT.compareAndSet(self, expect, newValue);
1✔
580
  }
581
  /** Release store of {@code v} into the producer limit of {@code self}. */
  @SuppressWarnings("PMD.LooseCoupling")
582
  static void soProducerLimit(BaseMpscLinkedArrayQueue<?> self, long v) {
583
    P_LIMIT.setRelease(self, v);
1✔
584
  }
1✔
585

586
  /*
587
   * A concurrent access enabling class used by circular array based queues this class exposes an
588
   * offset computation method along with differently memory fenced load/store methods into the
589
   * underlying array. The class is pre-padded and the array is padded on either side to help with
590
   * False sharing prevention. It is expected that subclasses handle post padding.
591
   * <p>
592
   * Offset calculation is separate from access to enable the reuse of a given computed offset.
593
   * <p>
594
   * Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of
595
   * final field reload after a LoadLoad barrier.
596
   * <p>
597
   */
598

599
  /**
   * An ordered store (release semantics) of an element to a given array position.
   *
   * @param buffer the array to write into
   * @param offset the array position, as computed by {@code modifiedCalcElementOffset}; it is
   *        narrowed to {@code int} for the array access
   * @param e the value to store, or null to clear the slot
   */
606
  static <E> void soElement(@Nullable E[] buffer, long offset, @Nullable E e) {
607
    REF_ARRAY.setRelease(buffer, (int) offset, e);
1✔
608
  }
1✔
609

610
  /**
   * A volatile load of an element from a given array position.
   *
   * @param buffer the array to read from
   * @param offset the array position, as computed by {@code modifiedCalcElementOffset}; it is
   *        narrowed to {@code int} for the array access
   * @return the element at the position, possibly null
   */
617
  @SuppressWarnings("unchecked")
618
  static <E> @Nullable E lvElement(@Nullable E @Nullable[] buffer, long offset) {
619
    return (E) REF_ARRAY.getVolatile(buffer, (int) offset);
1✔
620
  }
621

622
  /**
623
   * This method assumes index is actually (index << 1) because lower bit is used for resize. This
624
   * is compensated for by reducing the element shift. The computation is constant folded, so
625
   * there's no cost.
626
   */
627
  static long modifiedCalcElementOffset(long index, long mask) {
628
    return (index & mask) >> 1;
1✔
629
  }
630

631
  static VarHandle findVarHandle(Class<?> recv, String name, Class<?> type) {
632
    try {
633
      Lookup lookup = MethodHandles.privateLookupIn(recv, MethodHandles.lookup());
1✔
634
      return lookup.findVarHandle(recv, name, type);
1✔
635
    } catch (ReflectiveOperationException e) {
1✔
636
      throw new ExceptionInInitializerError(e);
1✔
637
    }
638
  }
639
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc