• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ben-manes / caffeine / #3896

pending completion
#3896

push

github-actions

ben-manes
upgrade jamm library (memory meter)

7542 of 7616 relevant lines covered (99.03%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.04
/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java
1
/*
2
 * Licensed under the Apache License, Version 2.0 (the "License");
3
 * you may not use this file except in compliance with the License.
4
 * You may obtain a copy of the License at
5
 *
6
 *     http://www.apache.org/licenses/LICENSE-2.0
7
 *
8
 * Unless required by applicable law or agreed to in writing, software
9
 * distributed under the License is distributed on an "AS IS" BASIS,
10
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
 * See the License for the specific language governing permissions and
12
 * limitations under the License.
13
 */
14
package com.github.benmanes.caffeine.cache;
15

16
import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo;
17

18
import java.lang.invoke.MethodHandles;
19
import java.lang.invoke.MethodHandles.Lookup;
20
import java.lang.invoke.VarHandle;
21
import java.util.AbstractQueue;
22
import java.util.Iterator;
23

24
/**
25
 * An MPSC array queue which starts at <i>initialCapacity</i> and grows to <i>maxCapacity</i> in
26
 * linked chunks of the initial size. The queue grows only when the current buffer is full and
27
 * elements are not copied on resize, instead a link to the new buffer is stored in the old buffer
28
 * for the consumer to follow.<br>
29
 * <p>
30
 * This is a shaded copy of <tt>MpscGrowableArrayQueue</tt> provided by
31
 * <a href="https://github.com/JCTools/JCTools">JCTools</a> from version 2.0.
32
 *
33
 * @author nitsanw@yahoo.com (Nitsan Wakart)
34
 */
35
class MpscGrowableArrayQueue<E> extends MpscChunkedArrayQueue<E> {
36

37
  /**
38
   * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the
39
   *        chunk size. Must be 2 or more.
40
   * @param maxCapacity the maximum capacity will be rounded up to the closest power of 2 and will
41
   *        be the upper limit of number of elements in this queue. Must be 4 or more and round up
42
   *        to a larger power of 2 than initialCapacity.
   * @throws IllegalArgumentException if initialCapacity is less than 2, maxCapacity is less than
   *         4, or initialCapacity does not round up to a smaller power of 2 than maxCapacity
   *         (these checks are performed by the superclass constructors)
43
   */
44
  MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) {
45
    // No state of its own; validation and buffer setup happen in the superclass constructors.
    super(initialCapacity, maxCapacity);
1✔
46
  }
1✔
47

48
  /**
   * Computes the length of the array to allocate when the queue grows. The element slots of the
   * current buffer ({@code buffer.length - 1}, the last slot holds the link to the next array)
   * are doubled and one slot is added back for the new buffer's own link.
   *
   * @param buffer the buffer that is currently being produced into
   * @return the length of the next buffer, inclusive of the next-array pointer slot
   * @throws IllegalStateException if {@code buffer.length} already exceeds
   *         {@code maxQueueCapacity / 2}
   */
  @Override
49
  protected int getNextBufferSize(E[] buffer) {
50
    // maxQueueCapacity is stored doubled, so halving it yields the rounded maximum capacity
    long maxSize = maxQueueCapacity / 2;
1✔
51
    if (buffer.length > maxSize) {
1✔
52
      throw new IllegalStateException();
1✔
53
    }
54
    final int newSize = 2 * (buffer.length - 1);
1✔
55
    return newSize + 1;
1✔
56
  }
57

58
  /**
   * Returns the capacity of the buffer described by {@code mask}, in the doubled index scale.
   * A mask is {@code (capacity - 1) << 1}, so {@code mask + 2 == maxQueueCapacity} holds when the
   * buffer's capacity equals the rounded maximum capacity; in that case the full
   * {@code maxQueueCapacity} is returned, otherwise the mask itself is returned.
   */
  @Override
59
  protected long getCurrentBufferCapacity(long mask) {
60
    return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask;
1✔
61
  }
62
}
63

64
// Adds trailing byte fields after the cold producer fields and implements the capacity
// accounting that is based on maxQueueCapacity.
@SuppressWarnings("OvershadowingSubclassFields")
65
abstract class MpscChunkedArrayQueue<E> extends MpscChunkedArrayQueueColdProducerFields<E> {
66
  // 120 single-byte fields that are not referenced anywhere in the visible source; presumably
  // padding for false-sharing prevention (the base class notes that subclasses must add the post
  // padding) - NOTE(review): confirm the intended field layout.
  byte p000, p001, p002, p003, p004, p005, p006, p007;
67
  byte p008, p009, p010, p011, p012, p013, p014, p015;
68
  byte p016, p017, p018, p019, p020, p021, p022, p023;
69
  byte p024, p025, p026, p027, p028, p029, p030, p031;
70
  byte p032, p033, p034, p035, p036, p037, p038, p039;
71
  byte p040, p041, p042, p043, p044, p045, p046, p047;
72
  byte p048, p049, p050, p051, p052, p053, p054, p055;
73
  byte p056, p057, p058, p059, p060, p061, p062, p063;
74
  byte p064, p065, p066, p067, p068, p069, p070, p071;
75
  byte p072, p073, p074, p075, p076, p077, p078, p079;
76
  byte p080, p081, p082, p083, p084, p085, p086, p087;
77
  byte p088, p089, p090, p091, p092, p093, p094, p095;
78
  byte p096, p097, p098, p099, p100, p101, p102, p103;
79
  byte p104, p105, p106, p107, p108, p109, p110, p111;
80
  byte p112, p113, p114, p115, p116, p117, p118, p119;
81

82
  // Delegates to the superclass, which validates the capacities and sets maxQueueCapacity.
  MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) {
83
    super(initialCapacity, maxCapacity);
1✔
84
  }
1✔
85

86
  /**
   * Returns the remaining room in the queue in the doubled index scale: {@code maxQueueCapacity}
   * minus the distance between the producer and consumer indices.
   */
  @Override
87
  protected long availableInQueue(long pIndex, long cIndex) {
88
    return maxQueueCapacity - (pIndex - cIndex);
1✔
89
  }
90

91
  /**
   * Returns the maximum capacity rounded up to a power of two; {@code maxQueueCapacity} holds
   * that value shifted left by one, so it is halved here.
   */
  @Override
92
  public int capacity() {
93
    return (int) (maxQueueCapacity / 2);
1✔
94
  }
95
}
96

97
// Holds the immutable maximum capacity and validates the capacity arguments.
abstract class MpscChunkedArrayQueueColdProducerFields<E> extends BaseMpscLinkedArrayQueue<E> {
98
  // The maximum capacity rounded up to a power of two and then shifted left by one, i.e. kept in
  // the same doubled scale as the producer/consumer indices (which advance by 2 per element).
  protected final long maxQueueCapacity;
99

100
  /**
   * @param initialCapacity passed to the base constructor, which requires it to be 2 or more
   * @param maxCapacity must be 4 or more and must round up to a strictly larger power of 2 than
   *        initialCapacity
   * @throws IllegalArgumentException if either constraint is violated
   */
  MpscChunkedArrayQueueColdProducerFields(int initialCapacity, int maxCapacity) {
101
    // The base constructor runs first, so the initial buffer is allocated before maxCapacity is
    // validated below.
    super(initialCapacity);
1✔
102
    if (maxCapacity < 4) {
1✔
103
      throw new IllegalArgumentException("Max capacity must be 4 or more");
1✔
104
    }
105
    // NOTE(review): the comparison is >=, so equal rounded capacities are rejected as well, even
    // though the message below only mentions "exceed".
    if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) {
1✔
106
      throw new IllegalArgumentException(
1✔
107
          "Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)");
108
    }
109
    // Widen to long before shifting so the doubled value cannot overflow an int.
    maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity)) << 1;
1✔
110
  }
1✔
111
}
112

113
// First layer of the class hierarchy: declares 120 single-byte fields ahead of the producer index
// that is declared by the next subclass. Presumably padding for false-sharing prevention (the
// notes in BaseMpscLinkedArrayQueue describe the class as pre-padded) - NOTE(review): confirm.
abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> {
1✔
114
  byte p000, p001, p002, p003, p004, p005, p006, p007;
115
  byte p008, p009, p010, p011, p012, p013, p014, p015;
116
  byte p016, p017, p018, p019, p020, p021, p022, p023;
117
  byte p024, p025, p026, p027, p028, p029, p030, p031;
118
  byte p032, p033, p034, p035, p036, p037, p038, p039;
119
  byte p040, p041, p042, p043, p044, p045, p046, p047;
120
  byte p048, p049, p050, p051, p052, p053, p054, p055;
121
  byte p056, p057, p058, p059, p060, p061, p062, p063;
122
  byte p064, p065, p066, p067, p068, p069, p070, p071;
123
  byte p072, p073, p074, p075, p076, p077, p078, p079;
124
  byte p080, p081, p082, p083, p084, p085, p086, p087;
125
  byte p088, p089, p090, p091, p092, p093, p094, p095;
126
  byte p096, p097, p098, p099, p100, p101, p102, p103;
127
  byte p104, p105, p106, p107, p108, p109, p110, p111;
128
  byte p112, p113, p114, p115, p116, p117, p118, p119;
129
}
130

131
// Declares the producer index on its own level of the class hierarchy.
abstract class BaseMpscLinkedArrayQueueProducerFields<E> extends BaseMpscLinkedArrayQueuePad1<E> {
1✔
132
  // Producer position in the doubled scale: offer() advances it by 2 per element and the lowest
  // bit is set while a resize is in progress. Accessed through the P_INDEX VarHandle.
  protected long producerIndex;
133
}
134

135
// Declares 120 single-byte fields between the producer index (superclass) and the consumer fields
// (next subclass). Presumably padding for false-sharing prevention - NOTE(review): confirm.
@SuppressWarnings("OvershadowingSubclassFields")
136
abstract class BaseMpscLinkedArrayQueuePad2<E> extends BaseMpscLinkedArrayQueueProducerFields<E> {
1✔
137
  byte p000, p001, p002, p003, p004, p005, p006, p007;
138
  byte p008, p009, p010, p011, p012, p013, p014, p015;
139
  byte p016, p017, p018, p019, p020, p021, p022, p023;
140
  byte p024, p025, p026, p027, p028, p029, p030, p031;
141
  byte p032, p033, p034, p035, p036, p037, p038, p039;
142
  byte p040, p041, p042, p043, p044, p045, p046, p047;
143
  byte p048, p049, p050, p051, p052, p053, p054, p055;
144
  byte p056, p057, p058, p059, p060, p061, p062, p063;
145
  byte p064, p065, p066, p067, p068, p069, p070, p071;
146
  byte p072, p073, p074, p075, p076, p077, p078, p079;
147
  byte p080, p081, p082, p083, p084, p085, p086, p087;
148
  byte p088, p089, p090, p091, p092, p093, p094, p095;
149
  byte p096, p097, p098, p099, p100, p101, p102, p103;
150
  byte p104, p105, p106, p107, p108, p109, p110, p111;
151
  byte p112, p113, p114, p115, p116, p117, p118, p119;
152
}
153

154
// Declares the state that poll()/peek() read and update on the consumer side.
@SuppressWarnings("NullAway")
155
abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedArrayQueuePad2<E> {
1✔
156
  // Mask of the buffer being consumed, in the doubled index scale: (capacity - 1) << 1.
  protected long consumerMask;
157
  // Buffer the consumer currently reads from; replaced when a JUMP marker is followed.
  protected E[] consumerBuffer;
158
  // Consumer position in the doubled scale; advanced by 2 per element removed. Accessed through
  // the C_INDEX VarHandle.
  protected long consumerIndex;
159
}
160

161
// Declares 120 single-byte fields between the consumer fields (superclass) and the cold producer
// fields (next subclass). Presumably padding for false-sharing prevention - NOTE(review): confirm.
@SuppressWarnings("OvershadowingSubclassFields")
162
abstract class BaseMpscLinkedArrayQueuePad3<E> extends BaseMpscLinkedArrayQueueConsumerFields<E> {
1✔
163
  byte p000, p001, p002, p003, p004, p005, p006, p007;
164
  byte p008, p009, p010, p011, p012, p013, p014, p015;
165
  byte p016, p017, p018, p019, p020, p021, p022, p023;
166
  byte p024, p025, p026, p027, p028, p029, p030, p031;
167
  byte p032, p033, p034, p035, p036, p037, p038, p039;
168
  byte p040, p041, p042, p043, p044, p045, p046, p047;
169
  byte p048, p049, p050, p051, p052, p053, p054, p055;
170
  byte p056, p057, p058, p059, p060, p061, p062, p063;
171
  byte p064, p065, p066, p067, p068, p069, p070, p071;
172
  byte p072, p073, p074, p075, p076, p077, p078, p079;
173
  byte p080, p081, p082, p083, p084, p085, p086, p087;
174
  byte p088, p089, p090, p091, p092, p093, p094, p095;
175
  byte p096, p097, p098, p099, p100, p101, p102, p103;
176
  byte p104, p105, p106, p107, p108, p109, p110, p111;
177
  byte p112, p113, p114, p115, p116, p117, p118, p119;
178
}
179

180
// Declares the producer-side state other than the producer index.
@SuppressWarnings("NullAway")
181
abstract class BaseMpscLinkedArrayQueueColdProducerFields<E>
1✔
182
    extends BaseMpscLinkedArrayQueuePad3<E> {
183
  // Cached bound for the producer index: offer() only enters its slow path once the producer
  // index has reached this value. Also accessed through the P_LIMIT VarHandle.
  protected volatile long producerLimit;
184
  // Mask of the buffer being produced into, in the doubled index scale.
  protected long producerMask;
185
  // Buffer the producers currently write into; replaced by resize().
  protected E[] producerBuffer;
186
}
187

188
@SuppressWarnings({"NullAway", "PMD"})
189
abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E> {
190
  static final VarHandle REF_ARRAY;
191
  static final VarHandle P_INDEX;
192
  static final VarHandle C_INDEX;
193
  static final VarHandle P_LIMIT;
194

195
  // No post padding here, subclasses must add
196

197
  private static final Object JUMP = new Object();
1✔
198

199
  /**
200
   * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the
201
   *        chunk size. Must be 2 or more.
   * @throws IllegalArgumentException if initialCapacity is less than 2
202
   */
203
  BaseMpscLinkedArrayQueue(final int initialCapacity) {
1✔
204
    if (initialCapacity < 2) {
1✔
205
      throw new IllegalArgumentException("Initial capacity must be 2 or more");
1✔
206
    }
207

208
    int p2capacity = ceilingPowerOfTwo(initialCapacity);
1✔
209
    // leave lower bit of mask clear
210
    long mask = (p2capacity - 1L) << 1;
1✔
211
    // need extra element to point at next array
212
    E[] buffer = allocate(p2capacity + 1);
1✔
213
    // producer and consumer both start on the same initial buffer and mask
    producerBuffer = buffer;
1✔
214
    producerMask = mask;
1✔
215
    consumerBuffer = buffer;
1✔
216
    consumerMask = mask;
1✔
217
    soProducerLimit(this, mask); // we know it's all empty to start with
1✔
218
  }
1✔
219

220
  /**
   * Iteration is not supported by this queue.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
221
  public final Iterator<E> iterator() {
222
    throw new UnsupportedOperationException();
1✔
223
  }
224

225
  /**
   * Returns the runtime class name followed by {@code @} and the hex hash code. The queue's
   * contents are not included (note that {@link #iterator()} is unsupported).
   */
  @Override
226
  public String toString() {
227
    return getClass().getName() + "@" + Integer.toHexString(hashCode());
1✔
228
  }
229

230
  /**
   * Inserts {@code e} into the queue.
   * <p>
   * A slot is claimed by CAS-ing the producer index forward by 2 (the lowest bit of the index is
   * the resize flag) and the element is then stored into the claimed slot. Once the producer
   * index has reached the cached producer limit, {@code offerSlowPath} decides whether to
   * proceed with the CAS, retry, reject the element, or grow the queue.
   *
   * @param e the element to add
   * @return {@code true} if the element was added, {@code false} if the slow path reported that
   *         the queue is full and cannot grow
   * @throws NullPointerException if {@code e} is null
   */
  @Override
231
  @SuppressWarnings("MissingDefault")
232
  public boolean offer(final E e) {
233
    if (e == null) {
1✔
234
      throw new NullPointerException();
1✔
235
    }
236

237
    long mask;
238
    E[] buffer;
239
    long pIndex;
240

241
    while (true) {
242
      long producerLimit = lvProducerLimit();
1✔
243
      pIndex = lvProducerIndex(this);
1✔
244
      // lower bit is indicative of resize, if we see it we spin until it's cleared
245
      if ((pIndex & 1) == 1) {
1✔
246
        continue;
1✔
247
      }
248
      // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
249

250
      // mask/buffer may get changed by resizing -> only use for array access after successful CAS.
251
      mask = this.producerMask;
1✔
252
      buffer = this.producerBuffer;
1✔
253
      // a successful CAS ties the ordering, lv(pIndex)-[mask/buffer]->cas(pIndex)
254

255
      // assumption behind this optimization is that queue is almost always empty or near empty
256
      if (producerLimit <= pIndex) {
1✔
257
        int result = offerSlowPath(mask, pIndex, producerLimit);
1✔
258
        switch (result) {
1✔
259
          // 0: fall through to the producer index CAS below
          case 0:
260
            break;
1✔
261
          // 1: lost a race, retry the loop from the top
          case 1:
262
            continue;
1✔
263
          // 2: queue is full and cannot grow
          case 2:
264
            return false;
1✔
265
          // 3: this thread set the resize bit and must perform the resize, which also stores e
          case 3:
266
            resize(mask, buffer, pIndex, e);
1✔
267
            return true;
1✔
268
        }
269
      }
270

271
      if (casProducerIndex(this, pIndex, pIndex + 2)) {
1✔
272
        break;
1✔
273
      }
274
    }
1✔
275
    // INDEX visible before ELEMENT, consistent with consumer expectation
276
    final long offset = modifiedCalcElementOffset(pIndex, mask);
1✔
277
    soElement(buffer, offset, e);
1✔
278
    return true;
1✔
279
  }
280

281
  /**
   * Called by offer() when the producer index has reached the cached producer limit.
   * <p>
282
   * We do not inline resize into this method because we do not resize on fill.
   *
   * @return 0 to proceed with the producer index CAS, 1 to retry from the top of the offer loop,
   *         2 if the queue is full and cannot grow, 3 if the caller now owns the resize (the
   *         lowest bit of the producer index was set by this call)
283
   */
284
  private int offerSlowPath(long mask, long pIndex, long producerLimit) {
285
    int result;
286
    final long cIndex = lvConsumerIndex(this);
1✔
287
    long bufferCapacity = getCurrentBufferCapacity(mask);
1✔
288
    result = 0;// 0 - goto pIndex CAS
1✔
289
    // the consumer has moved on, so the current buffer still has room: publish the new limit
    if (cIndex + bufferCapacity > pIndex) {
1✔
290
      if (!casProducerLimit(this, producerLimit, cIndex + bufferCapacity)) {
1✔
291
        result = 1;// retry from top
1✔
292
      }
293
    }
294
    // full and cannot grow
295
    else if (availableInQueue(pIndex, cIndex) <= 0) {
1✔
296
      result = 2;// -> return false;
1✔
297
    }
298
    // grab index for resize -> set lower bit
299
    else if (casProducerIndex(this, pIndex, pIndex + 1)) {
1✔
300
      result = 3;// -> resize
1✔
301
    } else {
302
      result = 1;// failed resize attempt, retry from top
1✔
303
    }
304
    return result;
1✔
305
  }
306

307
  /**
308
   * @return available elements in queue * 2
309
   */
310
  protected abstract long availableInQueue(long pIndex, final long cIndex);
311

312
  /**
313
   * {@inheritDoc}
314
   * <p>
315
   * This implementation is correct for single consumer thread use only.
   * <p>
   * Returns {@code null} only when the slot is empty and the consumer index equals the producer
   * index; if a producer has claimed the slot but not yet stored the element, this method spins
   * until the element becomes visible.
316
   */
317
  @Override
318
  @SuppressWarnings("unchecked")
319
  public E poll() {
320
    final E[] buffer = consumerBuffer;
1✔
321
    final long index = consumerIndex;
1✔
322
    final long mask = consumerMask;
1✔
323

324
    final long offset = modifiedCalcElementOffset(index, mask);
1✔
325
    Object e = lvElement(buffer, offset);// LoadLoad
1✔
326
    if (e == null) {
1✔
327
      if (index != lvProducerIndex(this)) {
1✔
328
        // poll() == null iff queue is empty, null element is not strong enough indicator, so we
329
        // must check the producer index. If the queue is indeed not empty we spin until element is
330
        // visible.
331
        do {
332
          e = lvElement(buffer, offset);
1✔
333
        } while (e == null);
1✔
334
      } else {
335
        return null;
1✔
336
      }
337
    }
338
    // JUMP is stored by resize() in the old buffer: the element for this index lives in the
    // linked next buffer
    if (e == JUMP) {
1✔
339
      final E[] nextBuffer = getNextBuffer(buffer, mask);
1✔
340
      return newBufferPoll(nextBuffer, index);
1✔
341
    }
342
    // clear the slot, then publish the advanced consumer index
    soElement(buffer, offset, null);
1✔
343
    soConsumerIndex(this, index + 2);
1✔
344
    return (E) e;
1✔
345
  }
346

347
  /**
348
   * {@inheritDoc}
349
   * <p>
350
   * This implementation is correct for single consumer thread use only.
   * <p>
   * Although the element is not removed, encountering a JUMP marker switches the consumer to the
   * next buffer (the consumer buffer and mask fields are updated).
351
   */
352
  @SuppressWarnings("unchecked")
353
  @Override
354
  public E peek() {
355
    final E[] buffer = consumerBuffer;
1✔
356
    final long index = consumerIndex;
1✔
357
    final long mask = consumerMask;
1✔
358

359
    final long offset = modifiedCalcElementOffset(index, mask);
1✔
360
    Object e = lvElement(buffer, offset);// LoadLoad
1✔
361
    if (e == null && index != lvProducerIndex(this)) {
1✔
362
      // peek() == null iff queue is empty, null element is not strong enough indicator, so we must
363
      // check the producer index. If the queue is indeed not empty we spin until element is
364
      // visible.
365
      while ((e = lvElement(buffer, offset)) == null) {
1✔
366
        ;
367
      }
368
    }
369
    // JUMP is stored by resize() in the old buffer: the element lives in the linked next buffer
    if (e == JUMP) {
1✔
370
      return newBufferPeek(getNextBuffer(buffer, mask), index);
1✔
371
    }
372
    return (E) e;
1✔
373
  }
374

375
  /**
   * Reads the link to the next buffer from the link slot of {@code buffer} and then clears that
   * slot, so the old buffer no longer references the new one.
   *
   * @param buffer the buffer in which the JUMP marker was found
   * @param mask the mask belonging to {@code buffer}
   * @return the array that was stored in the link slot
   */
  @SuppressWarnings("unchecked")
376
  private E[] getNextBuffer(final E[] buffer, final long mask) {
377
    final long nextArrayOffset = nextArrayOffset(mask);
1✔
378
    final E[] nextBuffer = (E[]) lvElement(buffer, nextArrayOffset);
1✔
379
    soElement(buffer, nextArrayOffset, null);
1✔
380
    return nextBuffer;
1✔
381
  }
382

383
  /**
   * Returns the array offset of the slot that links to the next buffer. With
   * {@code mask == (capacity - 1) << 1} the result is {@code (mask + 2) >> 1 == capacity}, i.e.
   * the extra last slot of an array allocated with {@code capacity + 1} elements.
   */
  private long nextArrayOffset(final long mask) {
384
    // Long.MAX_VALUE as the mask leaves the (non-negative) index unchanged, only the shift applies
    return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);
1✔
385
  }
386

387
  /**
   * Switches the consumer to {@code nextBuffer} and removes the element at {@code index} from it:
   * the slot is cleared and the consumer index is advanced by 2.
   *
   * @throws IllegalStateException if the slot in the new buffer is empty
   */
  private E newBufferPoll(E[] nextBuffer, final long index) {
388
    final long offsetInNew = newBufferAndOffset(nextBuffer, index);
1✔
389
    final E n = lvElement(nextBuffer, offsetInNew);// LoadLoad
1✔
390
    // resize() stores the element into the new buffer before it publishes the JUMP marker
    if (n == null) {
1✔
391
      throw new IllegalStateException("new buffer must have at least one element");
×
392
    }
393
    soElement(nextBuffer, offsetInNew, null);// StoreStore
1✔
394
    soConsumerIndex(this, index + 2);
1✔
395
    return n;
1✔
396
  }
397

398
  /**
   * Switches the consumer to {@code nextBuffer} and returns the element at {@code index} without
   * clearing the slot or advancing the consumer index.
   *
   * @throws IllegalStateException if the slot in the new buffer is empty
   */
  private E newBufferPeek(E[] nextBuffer, final long index) {
399
    final long offsetInNew = newBufferAndOffset(nextBuffer, index);
1✔
400
    final E n = lvElement(nextBuffer, offsetInNew);// LoadLoad
1✔
401
    if (n == null) {
1✔
402
      throw new IllegalStateException("new buffer must have at least one element");
×
403
    }
404
    return n;
1✔
405
  }
406

407
  /**
   * Makes {@code nextBuffer} the consumer's current buffer, recomputes the consumer mask from its
   * length, and returns the array offset of {@code index} within it.
   */
  private long newBufferAndOffset(E[] nextBuffer, final long index) {
408
    consumerBuffer = nextBuffer;
1✔
409
    // length - 1 element slots (the last slot is the link), so mask = (length - 2) << 1
    consumerMask = (nextBuffer.length - 2L) << 1;
1✔
410
    return modifiedCalcElementOffset(index, consumerMask);
1✔
411
  }
412

413
  /**
   * Returns the number of elements derived from the producer and consumer indices, capped at
   * {@code Integer.MAX_VALUE}. The producer index is sampled between two reads of the consumer
   * index and the loop repeats until both consumer reads agree.
   */
  @Override
414
  public final int size() {
415
    // NOTE: because indices are on even numbers we cannot use the size util.
416

417
    /*
418
     * It is possible for a thread to be interrupted or rescheduled between the read of the producer
419
     * and consumer indices, therefore protection is required to ensure size is within valid range.
420
     * In the event of concurrent polls/offers to this method the size is OVER estimated as we read
421
     * consumer index BEFORE the producer index.
422
     */
423
    long after = lvConsumerIndex(this);
1✔
424
    long size;
425
    while (true) {
426
      final long before = after;
1✔
427
      final long currentProducerIndex = lvProducerIndex(this);
1✔
428
      after = lvConsumerIndex(this);
1✔
429
      if (before == after) {
1✔
430
        // indices are kept doubled, so halve the difference
        size = ((currentProducerIndex - after) >> 1);
1✔
431
        break;
1✔
432
      }
433
    }
1✔
434
    // Long overflow is impossible, so size is always positive. Integer overflow is possible for the
435
    // unbounded indexed queues.
436
    if (size > Integer.MAX_VALUE) {
1✔
437
      return Integer.MAX_VALUE;
×
438
    } else {
439
      return (int) size;
1✔
440
    }
441
  }
442

443
  /**
   * Returns {@code true} if the consumer index equals the producer index at the time the two
   * values are read.
   */
  @Override
444
  public final boolean isEmpty() {
445
    // Order matters!
446
    // Loading consumer before producer allows for producer increments after consumer index is read.
447
    // This ensures this method is conservative in its estimate. Note that as this is a concurrent
448
    // (MPSC) queue there is nothing we can do to make this an exact method.
449
    return (lvConsumerIndex(this) == lvProducerIndex(this));
1✔
450
  }
451

452
  /** Reads the producer limit; the field is declared volatile, so this is a volatile load. */
  private long lvProducerLimit() {
453
    return producerLimit;
1✔
454
  }
455

456
  /** Returns the producer index (volatile read) halved, since indices are stored doubled. */
  public long currentProducerIndex() {
457
    return lvProducerIndex(this) / 2;
1✔
458
  }
459

460
  /** Returns the consumer index (volatile read) halved, since indices are stored doubled. */
  public long currentConsumerIndex() {
461
    return lvConsumerIndex(this) / 2;
1✔
462
  }
463

464
  public abstract int capacity();
465

466
  /** Delegates to {@link #offer(Object)}; this implementation has no separate relaxed path. */
  public boolean relaxedOffer(E e) {
467
    return offer(e);
1✔
468
  }
469

470
  /**
   * Like {@link #poll()}, but returns {@code null} as soon as the current slot is observed empty,
   * without checking the producer index or spinning. A {@code null} result therefore does not
   * imply that the queue is empty.
   */
  @SuppressWarnings("unchecked")
471
  public E relaxedPoll() {
472
    final E[] buffer = consumerBuffer;
1✔
473
    final long index = consumerIndex;
1✔
474
    final long mask = consumerMask;
1✔
475

476
    final long offset = modifiedCalcElementOffset(index, mask);
1✔
477
    Object e = lvElement(buffer, offset);// LoadLoad
1✔
478
    if (e == null) {
1✔
479
      return null;
1✔
480
    }
481
    // JUMP is stored by resize() in the old buffer: the element lives in the linked next buffer
    if (e == JUMP) {
1✔
482
      final E[] nextBuffer = getNextBuffer(buffer, mask);
1✔
483
      return newBufferPoll(nextBuffer, index);
1✔
484
    }
485
    soElement(buffer, offset, null);
1✔
486
    soConsumerIndex(this, index + 2);
1✔
487
    return (E) e;
1✔
488
  }
489

490
  /**
   * Like {@link #peek()}, but returns whatever is currently in the slot (possibly {@code null})
   * without checking the producer index or spinning. Following a JUMP marker still switches the
   * consumer to the next buffer.
   */
  @SuppressWarnings("unchecked")
491
  public E relaxedPeek() {
492
    final E[] buffer = consumerBuffer;
1✔
493
    final long index = consumerIndex;
1✔
494
    final long mask = consumerMask;
1✔
495

496
    final long offset = modifiedCalcElementOffset(index, mask);
1✔
497
    Object e = lvElement(buffer, offset);// LoadLoad
1✔
498
    if (e == JUMP) {
1✔
499
      return newBufferPeek(getNextBuffer(buffer, mask), index);
1✔
500
    }
501
    return (E) e;
1✔
502
  }
503

504
  /**
   * Grows the queue by linking a new buffer after {@code oldBuffer} and stores {@code e} in it.
   * Called by offer() after offerSlowPath() set the lowest bit of the producer index, which makes
   * other producers spin until the index is published again at the end of this method.
   * <p>
   * The statement order matters: the element and the link are written first, then the producer
   * limit and producer index are published, and only then the JUMP marker is stored in the old
   * buffer for the consumer to follow.
   *
   * @param oldMask the mask of {@code oldBuffer}
   * @param oldBuffer the buffer that ran out of room
   * @param pIndex the (even) producer index that was claimed for {@code e}
   * @param e the element being offered
   * @throws IllegalStateException if no room is available in the queue, or if thrown by
   *         getNextBufferSize
   */
  private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) {
505
    int newBufferLength = getNextBufferSize(oldBuffer);
1✔
506
    final E[] newBuffer = allocate(newBufferLength);
1✔
507

508
    producerBuffer = newBuffer;
1✔
509
    // length - 1 element slots (the last slot is the link), so mask = (length - 2) << 1
    final int newMask = (newBufferLength - 2) << 1;
1✔
510
    producerMask = newMask;
1✔
511

512
    final long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);
1✔
513
    final long offsetInNew = modifiedCalcElementOffset(pIndex, newMask);
1✔
514

515
    soElement(newBuffer, offsetInNew, e);// element in new array
1✔
516
    soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
1✔
517

518
    // ASSERT code
519
    final long cIndex = lvConsumerIndex(this);
1✔
520
    final long availableInQueue = availableInQueue(pIndex, cIndex);
1✔
521
    if (availableInQueue <= 0) {
1✔
522
      throw new IllegalStateException();
×
523
    }
524

525
    // Invalidate racing CASs
526
    // We never set the limit beyond the bounds of a buffer
527
    soProducerLimit(this, pIndex + Math.min(newMask, availableInQueue));
1✔
528

529
    // make resize visible to the other producers
530
    soProducerIndex(this, pIndex + 2);
1✔
531

532
    // INDEX visible before ELEMENT, consistent with consumer expectation
533

534
    // make resize visible to consumer
535
    soElement(oldBuffer, offsetInOld, JUMP);
1✔
536
  }
1✔
537

538
  /**
   * Allocates an {@code Object[]} of the given length and returns it typed as {@code E[]}. The
   * cast is unchecked; the runtime component type of the array is {@code Object}.
   *
   * @param capacity the array length
   */
  @SuppressWarnings("unchecked")
539
  public static <E> E[] allocate(int capacity) {
540
    return (E[]) new Object[capacity];
1✔
541
  }
542

543
  /**
544
   * @return next buffer size(inclusive of next array pointer)
545
   */
546
  protected abstract int getNextBufferSize(E[] buffer);
547

548
  /**
549
   * @return current buffer capacity for elements (excluding next pointer and jump entry) * 2
550
   */
551
  protected abstract long getCurrentBufferCapacity(long mask);
552

553
  /** Volatile load of {@code producerIndex} through the P_INDEX VarHandle. */
  static long lvProducerIndex(BaseMpscLinkedArrayQueue<?> self) {
554
    return (long) P_INDEX.getVolatile(self);
1✔
555
  }
556
  /** Volatile load of {@code consumerIndex} through the C_INDEX VarHandle. */
  static long lvConsumerIndex(BaseMpscLinkedArrayQueue<?> self) {
557
    return (long) C_INDEX.getVolatile(self);
1✔
558
  }
559
  /** Ordered (release) store of {@code producerIndex} through the P_INDEX VarHandle. */
  static void soProducerIndex(BaseMpscLinkedArrayQueue<?> self, long v) {
560
    P_INDEX.setRelease(self, v);
1✔
561
  }
1✔
562
  /**
   * Atomically sets {@code producerIndex} to {@code newValue} if it currently equals
   * {@code expect}; returns whether the update happened.
   */
  static boolean casProducerIndex(BaseMpscLinkedArrayQueue<?> self, long expect, long newValue) {
563
    return P_INDEX.compareAndSet(self, expect, newValue);
1✔
564
  }
565
  /** Ordered (release) store of {@code consumerIndex} through the C_INDEX VarHandle. */
  static void soConsumerIndex(BaseMpscLinkedArrayQueue<?> self, long v) {
566
    C_INDEX.setRelease(self, v);
1✔
567
  }
1✔
568
  /**
   * Atomically sets {@code producerLimit} to {@code newValue} if it currently equals
   * {@code expect}; returns whether the update happened.
   */
  static boolean casProducerLimit(BaseMpscLinkedArrayQueue<?> self, long expect, long newValue) {
569
    return P_LIMIT.compareAndSet(self, expect, newValue);
1✔
570
  }
571
  /** Ordered (release) store of {@code producerLimit} through the P_LIMIT VarHandle. */
  static void soProducerLimit(BaseMpscLinkedArrayQueue<?> self, long v) {
572
    P_LIMIT.setRelease(self, v);
1✔
573
  }
1✔
574

575
  /**
576
   * A concurrent-access-enabling class used by circular-array-based queues. This class exposes an
577
   * offset computation method along with differently memory fenced load/store methods into the
578
   * underlying array. The class is pre-padded and the array is padded on either side to help with
579
   * False sharing prevention. It is expected that subclasses handle post padding.
580
   * <p>
581
   * Offset calculation is separate from access to enable the reuse of a given computed offset.
582
   * <p>
583
   * Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of
584
   * final field reload after a LoadLoad barrier.
585
   * <p>
586
   */
587

588
  /**
589
   * An ordered store(store + StoreStore barrier) of an element to a given offset
590
   *
591
   * @param buffer this.buffer
592
   * @param offset computed
593
   * @param e the value to store; callers in this class pass an element, {@code null} to clear a
   *        slot, the JUMP marker, or the next buffer when linking
594
   */
595
  static <E> void soElement(E[] buffer, long offset, E e) {
596
    // the offset is a long in callers but array indices are ints, hence the narrowing cast
    REF_ARRAY.setRelease(buffer, (int) offset, e);
1✔
597
  }
1✔
598

599
  /**
600
   * A volatile load (load + LoadLoad barrier) of an element from a given offset.
601
   *
602
   * @param buffer this.buffer
603
   * @param offset computed
604
   * @return the element at the offset
605
   */
606
  @SuppressWarnings("unchecked")
607
  static <E> E lvElement(E[] buffer, long offset) {
608
    // the offset is a long in callers but array indices are ints, hence the narrowing cast
    return (E) REF_ARRAY.getVolatile(buffer, (int) offset);
1✔
609
  }
610

611
  /**
612
   * This method assumes index is actually (index << 1) because lower bit is used for resize. This
613
   * is compensated for by reducing the element shift. The computation is constant folded, so
614
   * there's no cost.
   *
   * @param index the doubled producer or consumer index
   * @param mask the doubled mask, {@code (capacity - 1) << 1}, whose lowest bit is clear
   * @return the array slot, {@code (index & mask) >> 1}
615
   */
616
  static long modifiedCalcElementOffset(long index, long mask) {
617
    return (index & mask) >> 1;
1✔
618
  }
619

620
  // Creates the VarHandles used by the lv*/so*/cas* helpers: one per index/limit field, looked up
  // on the class that declares the field, plus one for Object[] element access.
  static {
621
    try {
622
      Lookup pIndexLookup = MethodHandles.privateLookupIn(
1✔
623
          BaseMpscLinkedArrayQueueProducerFields.class, MethodHandles.lookup());
1✔
624
      Lookup cIndexLookup = MethodHandles.privateLookupIn(
1✔
625
          BaseMpscLinkedArrayQueueConsumerFields.class, MethodHandles.lookup());
1✔
626
      Lookup pLimitLookup = MethodHandles.privateLookupIn(
1✔
627
          BaseMpscLinkedArrayQueueColdProducerFields.class, MethodHandles.lookup());
1✔
628

629
      P_INDEX = pIndexLookup.findVarHandle(
1✔
630
          BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex", long.class);
631
      C_INDEX = cIndexLookup.findVarHandle(
1✔
632
          BaseMpscLinkedArrayQueueConsumerFields.class, "consumerIndex", long.class);
633
      P_LIMIT = pLimitLookup.findVarHandle(
1✔
634
          BaseMpscLinkedArrayQueueColdProducerFields.class, "producerLimit", long.class);
635
      REF_ARRAY = MethodHandles.arrayElementVarHandle(Object[].class);
1✔
636
    } catch (ReflectiveOperationException e) {
×
637
      // a failed lookup aborts class initialization, keeping the reflective failure as the cause
      throw new ExceptionInInitializerError(e);
×
638
    }
1✔
639
  }
1✔
640
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc