• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openmrs / openmrs-core / 21752216547

21 Aug 2025 11:52AM UTC coverage: 63.694% (-0.08%) from 63.778%
21752216547

push

github

rkorytkowski
TRUNK-6318: Back-port LocalStorageService and StreamDataService fixes

(cherry picked from commit 658a3df03)
(cherry picked from commit f7d12ffc9)
(cherry picked from commit 4fac7d80e)

48 of 68 new or added lines in 3 files covered. (70.59%)

282 existing lines in 9 files now uncovered.

21840 of 34289 relevant lines covered (63.69%)

0.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.42
/api/src/main/java/org/openmrs/util/ThreadSafeCircularFifoQueue.java
1
/**
2
 * This Source Code Form is subject to the terms of the Mozilla Public License,
3
 * v. 2.0. If a copy of the MPL was not distributed with this file, You can
4
 * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
5
 * the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
6
 *
7
 * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
8
 * graphic logo is a trademark of OpenMRS Inc.
9
 */
10
package org.openmrs.util;
11

12
import java.io.IOException;
13
import java.io.ObjectInputStream;
14
import java.io.Serializable;
15
import java.lang.ref.WeakReference;
16
import java.lang.reflect.Array;
17
import java.util.AbstractQueue;
18
import java.util.Collection;
19
import java.util.NoSuchElementException;
20
import java.util.Objects;
21
import java.util.Queue;
22
import java.util.concurrent.locks.ReentrantLock;
23
import java.util.function.Predicate;
24

25
/**
26
 * A thread-safe first-in, first-out queue with a fixed size that replaces the oldest element when full.
27
 * 
28
 * This class does not support null elements.
29
 *
30
 * @param <E> the type of elements in this collection
31
 * @since 2.4
32
 */
33
/*
34
 * There are already several existing implementations that are similar to this, so why create a new class?
35
 * Commons Collection's CircularFifoQueue and Guava's EvictingQueue are not thread-safe. The built-in ArrayBlockingQueue
36
 * is thread-safe, but implements a blocking queue. This involves safety guarantees that we don't need to make as this
37
 * circular queue is never "full"
38
 */
39
public class ThreadSafeCircularFifoQueue<E> extends AbstractQueue<E> implements Queue<E>, Serializable {
40

41
        private static final long serialVersionUID = -89162358098721L;
42

43
        // queue capacity
44
        private final int maxElements;
45

46
        // Underlying storage
47
        private final E[] elements;
48

49
        private transient ReentrantLock lock = new ReentrantLock();
1✔
50

51
        // index of the "start" of the queue, i.e., where data is read from
52
        private int read = 0;
1✔
53

54
        // index of the "end" of the queue, i.e., where data is written to
55
        private int write = 0;
1✔
56

57
        // number of elements in the queue
58
        private int size;
59

60
        // tracks the state of any iterators
61
        private transient Iterators iterators = null;
1✔
62

63
        /**
         * Creates a queue with a capacity of 32 elements by delegating to
         * {@link #ThreadSafeCircularFifoQueue(int)}.
         */
        @SuppressWarnings("unused")
        public ThreadSafeCircularFifoQueue() {
                this(32);
        }
×
67

68
        /**
         * Creates an empty queue that can hold at most {@code maxElements} elements.
         *
         * @param maxElements the capacity of the queue; must be greater than zero
         * @throws IllegalArgumentException if {@code maxElements} is zero or negative
         */
        @SuppressWarnings("unchecked")
        public ThreadSafeCircularFifoQueue(int maxElements) {
                if (maxElements < 1) {
                        throw new IllegalArgumentException("The size must be greater than 0");
                }

                // the backing array has exactly one slot per element the queue may hold
                this.maxElements = maxElements;
                this.elements = (E[]) new Object[maxElements];
        }
1✔
78

79
        /**
         * Creates a queue whose capacity equals the size of the supplied collection and then adds every
         * element of that collection to it.
         *
         * @param collection the elements to seed the queue with
         */
        @SuppressWarnings("unused")
        public ThreadSafeCircularFifoQueue(Collection<E> collection) {
                // capacity validation is performed by the int constructor
                this(collection.size());
                addAll(collection);
        }
×
84

85
        @Override
86
        public boolean add(E e) {
87
                Objects.requireNonNull(e);
1✔
88

89
                final ReentrantLock lock = this.lock;
1✔
90
                lock.lock();
1✔
91
                try {
92
                        internalAdd(e);
1✔
93
                }
94
                finally {
95
                        lock.unlock();
1✔
96
                }
97

98
                return true;
1✔
99
        }
100

101
        @Override
102
        public boolean addAll(Collection<? extends E> c) {
103
                Objects.requireNonNull(c);
1✔
104

105
                final ReentrantLock lock = this.lock;
1✔
106
                lock.lock();
1✔
107
                try {
108
                        for (E e : c) {
1✔
109
                                Objects.requireNonNull(e);
1✔
110
                                internalAdd(e);
1✔
111
                        }
1✔
112
                }
113
                finally {
114
                        lock.unlock();
1✔
115
                }
116

117
                return true;
1✔
118
        }
119

120
        @Override
121
        public void clear() {
122
                final E[] elements = this.elements;
1✔
123
                final ReentrantLock lock = this.lock;
1✔
124
                lock.lock();
1✔
125
                try {
126
                        if (size > 0) {
1✔
127
                                int idx = read;
1✔
128
                                do {
129
                                        elements[idx] = null;
1✔
130

131
                                        idx = increment(idx);
1✔
132
                                } while (idx != write);
1✔
133

134
                                read = write = size = 0;
1✔
135
                        }
136
                }
137
                finally {
138
                        lock.unlock();
1✔
139
                }
140
        }
1✔
141

142
        @Override
143
        public boolean contains(Object o) {
144
                if (null == o) {
1✔
145
                        return false;
×
146
                }
147

148
                final ReentrantLock lock = this.lock;
1✔
149
                lock.lock();
1✔
150
                try {
151
                        return internalContains(o);
1✔
152
                }
153
                finally {
154
                        lock.unlock();
1✔
155
                }
156
        }
157

158
        @Override
159
        public boolean containsAll(Collection<?> c) {
160
                if (c == null || c.isEmpty()) {
1✔
161
                        return true;
1✔
162
                }
163

164
                final ReentrantLock lock = this.lock;
1✔
165
                lock.lock();
1✔
166
                try {
167
                        for (Object o : c) {
1✔
168
                                if (!internalContains(o)) {
1✔
169
                                        return false;
1✔
170
                                }
171
                        }
1✔
172
                        
173
                        return true;
1✔
174
                }
175
                finally {
176
                        lock.unlock();
1✔
177
                }
178
        }
179

180
        @Override
181
        public E element() {
182
                final ReentrantLock lock = this.lock;
1✔
183
                lock.lock();
1✔
184
                try {
185
                        if (size == 0) {
1✔
186
                                throw new NoSuchElementException("queue is empty");
1✔
187
                        }
188

189
                        return elements[read];
1✔
190
                }
191
                finally {
192
                        lock.unlock();
1✔
193
                }
194

195
        }
196

197
        @Override
198
        public java.util.Iterator<E> iterator() {
199
                return new Iterator();
1✔
200
        }
201

202
        @Override
203
        public boolean isEmpty() {
204
                final ReentrantLock lock = this.lock;
1✔
205
                lock.lock();
1✔
206
                try {
207
                        return size == 0;
1✔
208
                }
209
                finally {
210
                        lock.unlock();
1✔
211
                }
212
        }
213

214
        @Override
215
        public boolean offer(E e) {
216
                return add(e);
1✔
217
        }
218

219
        @Override
220
        public E peek() {
221
                final ReentrantLock lock = this.lock;
1✔
222
                lock.lock();
1✔
223
                try {
224
                        return size == 0 ? null : elements[read];
1✔
225
                }
226
                finally {
227
                        lock.unlock();
1✔
228
                }
229
        }
230

231
        @Override
232
        public E poll() {
233
                final ReentrantLock lock = this.lock;
1✔
234
                lock.lock();
1✔
235
                try {
236
                        return size == 0 ? null : internalRemove();
1✔
237
                }
238
                finally {
239
                        lock.unlock();
1✔
240
                }
241
        }
242

243
        @Override
244
        public E remove() {
245
                final ReentrantLock lock = this.lock;
1✔
246
                lock.lock();
1✔
247
                try {
248
                        if (size == 0) {
1✔
249
                                throw new NoSuchElementException("queue is empty");
1✔
250
                        }
251

252
                        return internalRemove();
1✔
253
                }
254
                finally {
255
                        lock.unlock();
1✔
256
                }
257
        }
258

259
        @Override
260
        public boolean remove(Object o) {
261
                if (null == o) {
1✔
262
                        return false;
×
263
                }
264

265
                final ReentrantLock lock = this.lock;
1✔
266
                lock.lock();
1✔
267
                try {
268
                        if (size == 0) {
1✔
269
                                return false;
1✔
270
                        }
271

272
                        int idx = this.read;
1✔
273
                        do {
274
                                if (o.equals(elements[idx])) {
1✔
275
                                        internalRemoveAtIndex(idx);
1✔
276
                                        return true;
1✔
277
                                }
278

279
                                idx = increment(idx);
1✔
280
                        } while (idx != write);
1✔
281

282
                        return false;
1✔
283
                }
284
                finally {
285
                        lock.unlock();
1✔
286
                }
287
        }
288

289
        @Override
290
        public boolean removeAll(Collection<?> c) {
291
                Objects.requireNonNull(c);
1✔
292

293
                final ReentrantLock lock = this.lock;
1✔
294
                lock.lock();
1✔
295
                try {
296
                        return removeIf(c::contains);
1✔
297
                }
298
                finally {
299
                        lock.unlock();
1✔
300
                }
301
        }
302

303
        @Override
304
        public boolean retainAll(Collection<?> c) {
305
                Objects.requireNonNull(c);
1✔
306

307
                final ReentrantLock lock = this.lock;
1✔
308
                lock.lock();
1✔
309
                try {
310
                        return removeIf(o -> !c.contains(o));
1✔
311
                }
312
                finally {
313
                        lock.unlock();
1✔
314
                }
315
        }
316

317
        @Override
318
        public int size() {
319
                final ReentrantLock lock = this.lock;
1✔
320
                lock.lock();
1✔
321
                try {
322
                        return size;
1✔
323
                }
324
                finally {
325
                        lock.unlock();
1✔
326
                }
327
        }
328

329
        @Override
330
        public Object[] toArray() {
331
                return toArray(new Object[0]);
1✔
332
        }
333

334
        @Override
335
        @SuppressWarnings("unchecked")
336
        public <T> T[] toArray(T[] a) {
337
                Objects.requireNonNull(a);
1✔
338

339
                final ReentrantLock lock = this.lock;
1✔
340
                lock.lock();
1✔
341
                try {
342
                        final int size = this.size;
1✔
343
                        final T[] result = a.length < size ? (T[]) Array.newInstance(a.getClass().getComponentType(), size) : a;
1✔
344

345
                        final int n = elements.length - read;
1✔
346
                        if (size <= n) {
1✔
347
                                System.arraycopy(elements, read, result, 0, size);
1✔
348
                        } else {
UNCOV
349
                                System.arraycopy(elements, read, result, 0, n);
×
UNCOV
350
                                System.arraycopy(elements, 0, result, n, size - n);
×
351
                        }
352

353
                        if (result.length > size) {
1✔
354
                                for (int i = Math.max(0, size - 1); i < result.length; i++) {
1✔
355
                                        result[i] = null;
1✔
356
                                }
357
                        }
358

359
                        return result;
1✔
360
                }
361
                finally {
362
                        lock.unlock();
1✔
363
                }
364
        }
365

366
        public String toString() {
367
                final ReentrantLock lock = this.lock;
1✔
368
                lock.lock();
1✔
369
                try {
370
                        if (size == 0) {
1✔
371
                                return "[]";
1✔
372
                        }
373

374
                        StringBuilder sb = new StringBuilder();
1✔
375
                        sb.append('[');
1✔
376

377
                        int idx = read;
1✔
378
                        while (true) {
379
                                E e = elements[idx];
1✔
380
                                sb.append(e == this ? "(this Collection)" : e);
1✔
381

382
                                idx = (idx + 1) % maxElements;
1✔
383

384
                                if (idx == write) {
1✔
385
                                        return sb.append(']').toString();
1✔
386
                                } else {
387
                                        sb.append(',').append(' ');
1✔
388
                                }
389
                        }
1✔
390
                }
391
                finally {
392
                        lock.unlock();
1✔
393
                }
394
        }
395

396
        /* Internal implementations: MUST BE USED INSIDE LOCKS  */
397
        
398
        private int increment(int i) {
399
                return (i + 1) % maxElements;
1✔
400
        }
401

402
        private int decrement(int i) {
403
                return ((i == 0) ? maxElements : i) - 1;
1✔
404
        }
405

406
        private void internalAdd(E e) {
407
                if (size == maxElements) {
1✔
408
                        internalRemove();
1✔
409
                } else {
410
                        size++;
1✔
411
                }
412

413
                elements[write] = e;
1✔
414

415
                write = increment(write);
1✔
416
        }
1✔
417
        
418
        private boolean internalContains(Object o) {
419
                if (size > 0) {
1✔
420
                        int idx = read;
1✔
421
                        do {
422
                                if (o.equals(elements[idx])) {
1✔
423
                                        return true;
1✔
424
                                }
425

426
                                idx = (idx + 1) % maxElements;
1✔
427
                        } while (idx != write);
1✔
428
                }
429

430
                return false;
1✔
431
        }
432

433
        private E internalRemove() {
434
                final E element = elements[read];
1✔
435

436
                if (null != element) {
1✔
437
                        internalRemoveAtIndex(read);
1✔
438
                }
439

440
                return element;
1✔
441
        }
442

443
        private void internalRemoveAtIndex(int idx) {
444
                if (idx == read) {
1✔
445
                        elements[read] = null;
1✔
446

447
                        read = increment(read);
1✔
448

449
                        size--;
1✔
450
                        if (iterators != null) {
1✔
451
                                iterators.elementRemoved();
1✔
452
                        }
453
                } else {
454
                        int i = idx;
1✔
455
                        while (true) {
456
                                int next = increment(i);
1✔
457

458
                                if (next != write) {
1✔
459
                                        elements[i] = elements[next];
1✔
460
                                        i = next;
1✔
461
                                } else {
462
                                        elements[i] = null;
1✔
463
                                        write = i;
1✔
464
                                        break;
1✔
465
                                }
466
                        }
1✔
467

468
                        size--;
1✔
469
                        if (iterators != null) {
1✔
470
                                iterators.removedAt(idx);
1✔
471
                        }
472
                }
473
        }
1✔
474

475
        private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
476
                in.defaultReadObject();
×
UNCOV
477
                lock = new ReentrantLock();
×
UNCOV
478
        }
×
479

480
        /**
481
         * A linked list maintaining references between the queue and any iterators
482
         * 
483
         * This class exists to ensure that iterator objects are properly updated when items are removed from the queue and
484
         * are invalidated if the underlying queue becomes incompatible with the iterator's view of it.
485
         * 
486
         * This is based on the implementation of ArrayBlockingQueue.Itrs and involves the same garbage collection scheme
487
         * described there.
488
         */
489
        private class Iterators {
490

491
                private static final int SHORT_SWEEP_PROBES = 4;
492

493
                private static final int LONG_SWEEP_PROBES = 16;
494

495
                private Node head;
496

497
                private Node sweeper = null;
1✔
498

499
                int cycles = 0;
1✔
500

501
                Iterators(Iterator iterator) {
1✔
502
                        register(iterator);
1✔
503
                }
1✔
504

505
                void register(Iterator iterator) {
506
                        head = new Node(iterator, head);
1✔
507
                }
1✔
508

509
                void elementRemoved() {
510
                        if (size == 0) {
1✔
511
                                queueEmptied();
1✔
512
                        } else if (read == 0) {
1✔
UNCOV
513
                                readWrapped();
×
514
                        }
515
                }
1✔
516

517
                void queueEmptied() {
518
                        for (Node p = head; p != null; p = p.next) {
1✔
519
                                Iterator it = p.get();
1✔
520
                                if (it != null) {
1✔
521
                                        p.clear();
1✔
522
                                        it.shutdown();
1✔
523
                                }
524
                        }
525

526
                        head = null;
1✔
527
                        iterators = null;
1✔
528
                }
1✔
529

530
                void removedAt(int removedIndex) {
531
                        prune(it -> it.removedAt(removedIndex));
1✔
532
                }
1✔
533

534
                void readWrapped() {
UNCOV
535
                        cycles++;
×
UNCOV
536
                        prune(Iterator::readWrapped);
×
UNCOV
537
                }
×
538

539
                void sweep(boolean tryHarder) {
540
                        int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
1✔
541
                        Node o, p;
542
                        final Node sweeper = this.sweeper;
1✔
543
                        boolean completeCycle;   // to limit search to one full sweep
544

545
                        if (sweeper == null) {
1✔
546
                                o = null;
1✔
547
                                p = head;
1✔
548
                                completeCycle = true;
1✔
549
                        } else {
550
                                o = sweeper;
1✔
551
                                p = o.next;
1✔
552
                                completeCycle = false;
1✔
553
                        }
554

555
                        for (; probes > 0; probes--) {
1✔
556
                                if (p == null) {
1✔
557
                                        if (completeCycle) {
1✔
558
                                                break;
1✔
559
                                        }
560

561
                                        o = null;
1✔
562
                                        p = head;
1✔
563
                                        completeCycle = true;
1✔
564
                                }
565

566
                                final Iterator it = p.get();
1✔
567
                                final Node next = p.next;
1✔
568
                                if (it == null || it.isDetached()) {
1✔
569
                                        // found a discarded/exhausted iterator
570
                                        probes = LONG_SWEEP_PROBES; // "try harder"
1✔
571
                                        // unlink p
572
                                        p.clear();
1✔
573
                                        p.next = null;
1✔
574
                                        if (o == null) {
1✔
575
                                                head = next;
1✔
576
                                                if (next == null) {
1✔
577
                                                        // We've run out of iterators to track; retire
578
                                                        iterators = null;
1✔
579
                                                        return;
1✔
580
                                                }
581
                                        } else {
UNCOV
582
                                                o.next = next;
×
583
                                        }
584
                                } else {
585
                                        o = p;
1✔
586
                                }
587
                                p = next;
1✔
588
                        }
589

590
                        this.sweeper = (p == null) ? null : o;
1✔
591
                }
1✔
592

593
                private void prune(Predicate<Iterator> shouldRemove) {
594
                        for (Node o = null, p = head; p != null; ) {
1✔
595
                                final Iterator it = p.get();
1✔
596
                                final Node next = p.next;
1✔
597

598
                                if (it == null || shouldRemove.test(it)) {
1✔
UNCOV
599
                                        p.clear();
×
UNCOV
600
                                        p.next = null;
×
601

UNCOV
602
                                        if (o == null) {
×
UNCOV
603
                                                head = next;
×
604
                                        } else {
UNCOV
605
                                                o.next = next;
×
606
                                        }
607
                                } else {
608
                                        o = p;
1✔
609
                                }
610

611
                                p = next;
1✔
612
                        }
1✔
613

614
                        if (head == null) {
1✔
UNCOV
615
                                ThreadSafeCircularFifoQueue.this.iterators = null;
×
616
                        }
617
                }
1✔
618

619
                /**
620
                 * The actual list node implementation
621
                 */
622
                private class Node extends WeakReference<Iterator> {
623

624
                        Node next;
625

626
                        Node(Iterator iterator, Node next) {
1✔
627
                                super(iterator);
1✔
628
                                this.next = next;
1✔
629
                        }
1✔
630
                }
631
        }
632

633
        /**
634
         * An attempt to be a straight-forward iterator implementation for a ThreadSafeCircularFifoQueue.
635
         * 
636
         * It should iterate over each member in the queue only once, assuming that the queue is not modified while this
637
         * iterator remains in use. If the underlying queue is modified, the iterator will attempt to recover and keep going.
638
         * 
639
         * If it becomes too far out of synch with the underlying data, an iterator may fail before iterating over all elements
640
         * in a queue. However if {@link #hasNext()} returns true, {@link #next()} will always return a result.
641
         */
642
        private class Iterator implements java.util.Iterator<E> {
643

644
                /* Special index values */
645

646
                // indicates an index that is invalid or empty
647
                private static final int NONE = -1;
648

649
                // used as a value for prevRead when the iterator is no longer valid
650
                private static final int DETACHED = -2;
651

652
                private int nextIndex;
653

654
                private E nextItem;
655

656
                private int prevIndex = NONE;
1✔
657

658
                private E prevItem = null;
1✔
659

660
                private int prevRead;
661

662
                private int prevCycles;
663

664
                /**
                 * Creates an iterator positioned at the current read index. For an empty queue the iterator
                 * starts out detached and is not registered; otherwise it is registered with the queue's
                 * iterator registry, which is created on demand.
                 */
                Iterator() {
                        final ReentrantLock guard = ThreadSafeCircularFifoQueue.this.lock;
                        guard.lock();
                        try {
                                if (size == 0) {
                                        nextIndex = NONE;
                                        prevRead = DETACHED;
                                        return;
                                }

                                // capture the starting position before registering, so the registry sees it
                                nextIndex = read;
                                nextItem = elements[read];
                                prevRead = read;

                                if (iterators != null) {
                                        iterators.register(this);
                                        iterators.sweep(false);
                                } else {
                                        iterators = new Iterators(this);
                                }

                                prevCycles = iterators.cycles;
                        }
                        finally {
                                guard.unlock();
                        }
                }
1✔
690

691
                @Override
692
                public boolean hasNext() {
693
                        // no locks for the simplest case
694
                        if (nextItem != null) {
1✔
695
                                return true;
1✔
696
                        }
697

698
                        final ReentrantLock lock = ThreadSafeCircularFifoQueue.this.lock;
1✔
699
                        lock.lock();
1✔
700
                        try {
701
                                if (!isDetached()) {
1✔
702
                                        updateIndices();
1✔
703
                                        if (prevIndex >= 0) {
1✔
704
                                                prevItem = elements[prevIndex];
1✔
705
                                                detach();
1✔
706
                                        }
707
                                }
708
                        }
709
                        finally {
710
                                lock.unlock();
1✔
711
                        }
712
                        
713
                        return false;
1✔
714
                }
715

716
                @Override
717
                public E next() {
718
                        final E it = nextItem;
1✔
719
                        if (it == null) {
1✔
720
                                throw new NoSuchElementException();
1✔
721
                        }
722

723
                        final ReentrantLock lock = ThreadSafeCircularFifoQueue.this.lock;
1✔
724
                        lock.lock();
1✔
725
                        try {
726
                                if (!isDetached()) {
1✔
727
                                        updateIndices();
1✔
728
                                }
729
                                
730
                                prevIndex = nextIndex;
1✔
731
                                prevItem = it;
1✔
732

733
                                if (nextIndex < 0 || nextIndex == write) {
1✔
UNCOV
734
                                        nextIndex = NONE;
×
UNCOV
735
                                        nextItem = null;
×
736
                                } else {
737
                                        nextIndex = increment(nextIndex);
1✔
738
                                        nextItem = elements[nextIndex];
1✔
739
                                }
740

741
                                return it;
1✔
742
                        }
743
                        finally {
744
                                lock.unlock();
1✔
745
                        }
746
                }
747

748
                @Override
749
                public void remove() {
750
                        final ReentrantLock lock = ThreadSafeCircularFifoQueue.this.lock;
1✔
751
                        lock.lock();
1✔
752
                        try {
753
                                if (!isDetached()) {
1✔
754
                                        updateIndices();
1✔
755
                                }
756
                                
757
                                if (prevIndex == NONE) {
1✔
758
                                        throw new IllegalStateException();
1✔
759
                                } else if (prevIndex >= 0 && elements[prevIndex] == prevItem) {
1✔
760
                                        internalRemoveAtIndex(prevIndex);
1✔
761

762
                                        if (prevIndex != read) {
1✔
763
                                                nextIndex = Math.max(prevIndex, read);
1✔
764
                                        }
765
                                }
766

767
                                prevIndex = NONE;
1✔
768
                                prevItem = null;
1✔
769
                                
770
                                if (nextIndex < 0) {
1✔
UNCOV
771
                                        detach();
×
772
                                }
773
                        }
774
                        finally {
775
                                lock.unlock();
1✔
776
                        }
777
                }
1✔
778

779
                boolean readWrapped() {
UNCOV
780
                        if (isDetached()) {
×
UNCOV
781
                                return true;
×
782
                        }
783

784
                        if (iterators.cycles - prevCycles > 1) {
×
UNCOV
785
                                shutdown();
×
UNCOV
786
                                return true;
×
787
                        }
788

UNCOV
789
                        return false;
×
790
                }
791

792
                boolean removedAt(int removedIndex) {
793
                        if (isDetached()) {
1✔
UNCOV
794
                                return true;
×
795
                        }
796

797
                        final int cycles = iterators.cycles;
1✔
798
                        final int read = ThreadSafeCircularFifoQueue.this.read;
1✔
799
                        
800
                        int cycleDiff = cycles - prevCycles;
1✔
801

802
                        if (removedIndex < read) {
1✔
UNCOV
803
                                cycleDiff++;
×
804
                        }
805

806
                        final int removedDistance = (cycleDiff * maxElements) + (removedIndex - prevRead);
1✔
807

808
                        if (prevIndex >= 0) {
1✔
809
                                int x = distance(prevIndex);
1✔
810
                                if (x == removedDistance) {
1✔
811
                                        prevIndex = NONE;
1✔
UNCOV
812
                                } else if (x > removedDistance) {
×
UNCOV
813
                                        prevIndex = decrement(prevIndex);
×
814
                                }
815
                        }
816

817
                        if (nextIndex >= 0) {
1✔
818
                                int x = distance(nextIndex);
1✔
819
                                if (x == removedDistance) {
1✔
UNCOV
820
                                        nextIndex = NONE;
×
821
                                } else if (x > removedDistance) {
1✔
822
                                        nextIndex = decrement(nextIndex);
1✔
823
                                }
824
                        } else if (prevIndex < 0) {
1✔
825
                                // don't call detach() as returning true will trigger a full sweep anyways
UNCOV
826
                                prevRead = DETACHED;
×
UNCOV
827
                                return true;
×
828
                        }
829

830
                        return false;
1✔
831
                }
832

833
                void shutdown() {
834
                        // nextItem is not set to null as it has been cached and can be returned
835
                        nextIndex = nextIndex >= 0 ? NONE : nextIndex;
1✔
836
                        prevIndex = prevIndex >= 0 ? NONE : nextIndex;
1✔
837
                        prevItem = null;
1✔
838
                        prevRead = DETACHED;
1✔
839
                }
1✔
840

841
                /**
842
                 * Detach the iterator and trigger a sweep of the iterator queue
843
                 */
844
                private void detach() {
845
                        if (prevRead >= 0) {
1✔
846
                                prevRead = DETACHED;
1✔
847
                                iterators.sweep(true);
1✔
848
                        }
849
                }
1✔
850

851
                private boolean isDetached() {
852
                        return prevRead < 0;
1✔
853
                }
854

855
                private int distance(int index) {
856
                        int distance = index - prevRead;
1✔
857
                        if (distance < 0) {
1✔
UNCOV
858
                                distance += maxElements;
×
859
                        }
860
                        return distance;
1✔
861
                }
862
                
863
                private boolean indexInvalidated(int index, long dequeues) {
864
                        if (index < 0) {
1✔
865
                                return false;
1✔
866
                        }
867
                        
868
                        int distance = distance(index);
1✔
869
                        
870
                        return dequeues > distance;
1✔
871
                }
872
                
873
                private void updateIndices() {
874
                        final int cycles = ThreadSafeCircularFifoQueue.this.iterators.cycles;
1✔
875
                        if (cycles != prevCycles || read != prevRead) {
1✔
876
                                long dequeues = (cycles - prevCycles) * maxElements + (read - prevRead);
1✔
877
                                
878
                                if (indexInvalidated(prevIndex, dequeues)) {
1✔
UNCOV
879
                                        prevIndex = NONE;
×
880
                                }
881

882
                                if (indexInvalidated(nextIndex, dequeues)) {
1✔
UNCOV
883
                                        nextIndex = NONE;
×
884
                                }
885
                                
886
                                if (prevIndex < 0 && nextIndex < 0) {
1✔
UNCOV
887
                                        detach();
×
888
                                } else {
889
                                        prevCycles = cycles;
1✔
890
                                        prevRead = read;
1✔
891
                                }
892
                        }
893
                }
1✔
894
        }
895
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc