• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

igniterealtime / Smack / #2866

pending completion
#2866

push

github-actions

web-flow
Merge pull request #537 from MF1-MS/mf1-ms/use_xmpp_connection_as_local_socks5_address

Use XMPP connection as local socks5 address

10 of 10 new or added lines in 3 files covered. (100.0%)

16374 of 41845 relevant lines covered (39.13%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.26
/smack-core/src/main/java/org/jivesoftware/smack/util/ArrayBlockingQueueWithShutdown.java
1
/**
2
 *
3
 * Copyright 2014-2019 Florian Schmaus
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.jivesoftware.smack.util;
18

19
import java.util.AbstractQueue;
20
import java.util.Collection;
21
import java.util.Iterator;
22
import java.util.NoSuchElementException;
23
import java.util.concurrent.BlockingQueue;
24
import java.util.concurrent.TimeUnit;
25
import java.util.concurrent.locks.Condition;
26
import java.util.concurrent.locks.ReentrantLock;
27

28
/**
29
 * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will
30
 * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and
31
 * {@link #poll(long, TimeUnit)}.
32
 * <p>
33
 * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
34
 * domain).
35
 *
36
 * @param <E> the type of elements held in this collection
37
 */
38
public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {
1✔
39

40
    private final E[] items;
41

42
    private int takeIndex;
43

44
    private int putIndex;
45

46
    private int count;
47

48
    private final ReentrantLock lock;
49

50
    private final Condition notEmpty;
51

52
    private final Condition notFull;
53

54
    private volatile boolean isShutdown = false;
1✔
55

56
    private int inc(int i) {
57
        return (++i == items.length) ? 0 : i;
1✔
58
    }
59

60
    private void insert(E e) {
61
        insert(e, true);
×
62
    }
×
63

64
    private void insert(E e, boolean signalNotEmpty) {
65
        items[putIndex] = e;
1✔
66
        putIndex = inc(putIndex);
1✔
67
        count++;
1✔
68
        if (signalNotEmpty) {
1✔
69
            notEmpty.signal();
1✔
70
        }
71
    }
1✔
72

73
    private E extract() {
74
        E e = items[takeIndex];
1✔
75
        items[takeIndex] = null;
1✔
76
        takeIndex = inc(takeIndex);
1✔
77
        count--;
1✔
78
        notFull.signal();
1✔
79
        return e;
1✔
80
    }
81

82
    private void removeAt(int i) {
83
        if (i == takeIndex) {
×
84
            items[takeIndex] = null;
×
85
            takeIndex = inc(takeIndex);
×
86
        }
87
        else {
88
            while (true) {
89
                int nexti = inc(i);
×
90
                if (nexti != putIndex) {
×
91
                    items[i] = items[nexti];
×
92
                    i = nexti;
×
93
                }
94
                else {
95
                    items[i] = null;
×
96
                    putIndex = i;
×
97
                    break;
×
98
                }
99
            }
×
100
        }
101
        count--;
×
102
        notFull.signal();
×
103
    }
×
104

105
    private static void checkNotNull(Object o) {
106
        if (o == null) {
1✔
107
            throw new NullPointerException();
×
108
        }
109
    }
1✔
110

111
    private void checkNotShutdown() throws InterruptedException {
112
        if (isShutdown) {
1✔
113
            throw new InterruptedException("Queue was already shut down");
1✔
114
        }
115
    }
1✔
116

117
    private boolean hasNoElements() {
118
        return count == 0;
1✔
119
    }
120

121
    private boolean hasElements() {
122
        return !hasNoElements();
×
123
    }
124

125
    private boolean isFull() {
126
        return count == items.length;
1✔
127
    }
128

129
    private boolean isNotFull() {
130
        return !isFull();
×
131
    }
132

133
    public ArrayBlockingQueueWithShutdown(int capacity) {
134
        this(capacity, false);
×
135
    }
×
136

137
    @SuppressWarnings("unchecked")
138
    public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
1✔
139
        if (capacity <= 0)
1✔
140
            throw new IllegalArgumentException();
×
141
        items = (E[]) new Object[capacity];
1✔
142
        lock = new ReentrantLock(fair);
1✔
143
        notEmpty = lock.newCondition();
1✔
144
        notFull = lock.newCondition();
1✔
145
    }
1✔
146

147
    /**
148
     * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock
149
     * (and usually throw a InterruptedException).
150
     */
151
    public void shutdown() {
152
        lock.lock();
1✔
153
        try {
154
            isShutdown = true;
1✔
155
            notEmpty.signalAll();
1✔
156
            notFull.signalAll();
1✔
157
        }
158
        finally {
159
            lock.unlock();
1✔
160
        }
161
    }
1✔
162

163
    /**
164
     * Start the queue. Newly created instances will be started automatically, thus this only needs
165
     * to be called after {@link #shutdown()}.
166
     *
167
     * @return <code>true</code> if the queues was shutdown before, <code>false</code> if not.
168
     */
169
    public boolean start() {
170
        boolean previousIsShutdown;
171
        lock.lock();
1✔
172
        try {
173
            previousIsShutdown = isShutdown;
1✔
174
            isShutdown = false;
1✔
175
        }
176
        finally {
177
            lock.unlock();
1✔
178
        }
179
        return previousIsShutdown;
1✔
180
    }
181

182
    /**
183
     * Returns true if the queue is currently shut down.
184
     *
185
     * @return true if the queue is shut down.
186
     */
187
    public boolean isShutdown() {
188
        lock.lock();
×
189
        try {
190
            return isShutdown;
×
191
        } finally {
192
            lock.unlock();
×
193
        }
194
    }
195

196
    @Override
197
    public E poll() {
198
        lock.lock();
×
199
        try {
200
            if (hasNoElements()) {
×
201
                return null;
×
202
            }
203
            E e = extract();
×
204
            return e;
×
205
        }
206
        finally {
207
            lock.unlock();
×
208
        }
209
    }
210

211
    @Override
212
    public E peek() {
213
        lock.lock();
×
214
        try {
215
            return hasNoElements() ? null : items[takeIndex];
×
216
        }
217
        finally {
218
            lock.unlock();
×
219
        }
220
    }
221

222
    @Override
223
    public boolean offer(E e) {
224
        checkNotNull(e);
×
225
        lock.lock();
×
226
        try {
227
            if (isFull() || isShutdown) {
×
228
                return false;
×
229
            }
230
            else {
231
                insert(e);
×
232
                return true;
×
233
            }
234
        }
235
        finally {
236
            lock.unlock();
×
237
        }
238
    }
239

240
    public boolean offerAndShutdown(E e) {
241
        checkNotNull(e);
×
242
        boolean res;
243
        lock.lock();
×
244
        try {
245
            res = offer(e);
×
246
            shutdown();
×
247
        } finally {
248
            lock.unlock();
×
249
        }
250
        return res;
×
251
    }
252

253
    private void putInternal(E e, boolean signalNotEmpty) throws InterruptedException {
254
        assert lock.isHeldByCurrentThread();
1✔
255

256
        while (isFull()) {
1✔
257
            try {
258
                notFull.await();
1✔
259
                checkNotShutdown();
×
260
            }
261
            catch (InterruptedException ie) {
1✔
262
                notFull.signal();
1✔
263
                throw ie;
1✔
264
            }
×
265
        }
266
        insert(e, signalNotEmpty);
1✔
267
    }
1✔
268

269
    /**
270
     * Inserts the specified element into this queue, waiting if necessary
271
     * for space to become available.
272
     * <p>
273
     * This may throw an {@link InterruptedException} in two cases
274
     * <ol>
275
     *  <li>If the queue was shut down.</li>
276
     *  <li>If the thread was was interrupted.</li>
277
     * </ol>
278
     * So you have to check which is the case, e.g. by calling {@link #isShutdown()}.
279
     *
280
     * @param e the element to add.
281
     * @throws InterruptedException if interrupted while waiting or if the queue was shut down.
282
     */
283
    @Override
284
    public void put(E e) throws InterruptedException {
285
        checkNotNull(e);
1✔
286
        lock.lockInterruptibly();
1✔
287

288
        try {
289
            putInternal(e, true);
1✔
290
        }
291
        finally {
292
            lock.unlock();
1✔
293
        }
294
    }
1✔
295

296
    /**
297
     * Put if the queue has not been shutdown yet.
298
     *
299
     * @param e the element to put into the queue.
300
     * @return <code>true</code> if the element has been put into the queue, <code>false</code> if the queue was shutdown.
301
     * @throws InterruptedException if the calling thread was interrupted.
302
     * @since 4.4
303
     */
304
    public boolean putIfNotShutdown(E e) throws InterruptedException {
305
        checkNotNull(e);
×
306
        lock.lockInterruptibly();
×
307

308
        try {
309
            if (isShutdown) {
×
310
                return false;
×
311
            }
312

313
            putInternal(e, true);
×
314
            return true;
×
315
        } finally {
316
            lock.unlock();
×
317
        }
318
    }
319

320
    public void putAll(Collection<? extends E> elements) throws InterruptedException {
321
        checkNotNull(elements);
×
322
        lock.lockInterruptibly();
×
323

324
        try {
325
            for (E element : elements) {
×
326
                putInternal(element, false);
×
327
            }
×
328
        } finally {
329
            notEmpty.signalAll();
×
330
            lock.unlock();
×
331
        }
332
    }
×
333

334
    public enum TryPutResult {
×
335
        /**
336
         * The method was unable to acquire the queue lock.
337
         */
338
        couldNotLock,
×
339

340
        /**
341
         * The queue was shut down.
342
         */
343
        queueWasShutDown,
×
344

345
        /**
346
         * The method was unable to put another element into the queue because the queue was full.
347
         */
348
        queueWasFull,
×
349

350
        /**
351
         * The element was successfully placed into the queue.
352
         */
353
        putSuccessful,
×
354
    }
355

356
    public TryPutResult tryPut(E e) {
357
        checkNotNull(e);
×
358

359
        boolean locked = lock.tryLock();
×
360
        if (!locked) {
×
361
            return TryPutResult.couldNotLock;
×
362
        }
363
        try {
364
            if (isShutdown) {
×
365
                return TryPutResult.queueWasShutDown;
×
366
            }
367
            if (isFull()) {
×
368
                return TryPutResult.queueWasFull;
×
369
            }
370

371
            insert(e);
×
372
            return TryPutResult.putSuccessful;
×
373
        } finally {
374
            lock.unlock();
×
375
        }
376
    }
377

378
    @Override
379
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
380
        checkNotNull(e);
×
381
        long nanos = unit.toNanos(timeout);
×
382
        lock.lockInterruptibly();
×
383
        try {
384
            while (true) {
385
                if (isNotFull()) {
×
386
                    insert(e);
×
387
                    return true;
×
388
                }
389
                if (nanos <= 0) {
×
390
                    return false;
×
391
                }
392
                try {
393
                    nanos = notFull.awaitNanos(nanos);
×
394
                    checkNotShutdown();
×
395
                }
396
                catch (InterruptedException ie) {
×
397
                    notFull.signal();
×
398
                    throw ie;
×
399
                }
×
400
            }
401
        }
402
        finally {
403
            lock.unlock();
×
404
        }
405

406
    }
407

408
    @Override
409
    public E take() throws InterruptedException {
410
        lock.lockInterruptibly();
1✔
411
        try {
412
            checkNotShutdown();
1✔
413
            try {
414
                while (hasNoElements()) {
1✔
415
                    notEmpty.await();
×
416
                    checkNotShutdown();
×
417
                }
418
            }
419
            catch (InterruptedException ie) {
×
420
                notEmpty.signal();
×
421
                throw ie;
×
422
            }
1✔
423
            E e = extract();
1✔
424
            return e;
1✔
425
        }
426
        finally {
427
            lock.unlock();
1✔
428
        }
429
    }
430

431
    public enum TryTakeResultCode {
×
432
        /**
433
         * The method was unable to acquire the queue lock.
434
         */
435
        couldNotLock,
×
436

437
        /**
438
         * The queue was shut down.
439
         */
440
        queueWasShutDown,
×
441

442
        /**
443
         * The queue was empty.
444
         */
445
        queueWasEmpty,
×
446

447
        /**
448
         * An element was successfully removed from the queue.
449
         */
450
        takeSuccessful,
×
451
    }
452

453
    public static final class TryTakeResult<E> {
×
454
        private final E element;
455
        private final TryTakeResultCode resultCode;
456

457
        private TryTakeResult(TryTakeResultCode resultCode) {
×
458
            assert resultCode != null;
×
459
            this.resultCode = resultCode;
×
460
            this.element = null;
×
461
        }
×
462

463
        private TryTakeResult(E element) {
×
464
            assert element != null;
×
465
            this.resultCode = TryTakeResultCode.takeSuccessful;
×
466
            this.element = element;
×
467
        }
×
468

469
        public TryTakeResultCode getResultCode() {
470
            return resultCode;
×
471
        }
472

473
        public E getElement() {
474
            return element;
×
475
        }
476
    }
477

478
    public TryTakeResult<E> tryTake() {
479
        boolean locked = lock.tryLock();
×
480
        if (!locked) {
×
481
            return new TryTakeResult<E>(TryTakeResultCode.couldNotLock);
×
482
        }
483
        try {
484
            if (isShutdown) {
×
485
                return new TryTakeResult<E>(TryTakeResultCode.queueWasShutDown);
×
486
            }
487
            if (hasNoElements()) {
×
488
                return new TryTakeResult<E>(TryTakeResultCode.queueWasEmpty);
×
489
            }
490
            E element = extract();
×
491
            return new TryTakeResult<E>(element);
×
492
        } finally {
493
            lock.unlock();
×
494
        }
495
    }
496

497
    @Override
498
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
499
        long nanos = unit.toNanos(timeout);
×
500
        lock.lockInterruptibly();
×
501
        try {
502
            checkNotShutdown();
×
503
            while (true) {
504
                if (hasElements()) {
×
505
                    E e = extract();
×
506
                    return e;
×
507
                }
508
                if (nanos <= 0) {
×
509
                    return null;
×
510
                }
511
                try {
512
                    nanos = notEmpty.awaitNanos(nanos);
×
513
                    checkNotShutdown();
×
514
                }
515
                catch (InterruptedException ie) {
×
516
                    notEmpty.signal();
×
517
                    throw ie;
×
518
                }
×
519
            }
520
        }
521
        finally {
522
            lock.unlock();
×
523
        }
524
    }
525

526
    @Override
527
    public int remainingCapacity() {
528
        lock.lock();
×
529
        try {
530
            return items.length - count;
×
531
        }
532
        finally {
533
            lock.unlock();
×
534
        }
535
    }
536

537
    @Override
538
    public int drainTo(Collection<? super E> c) {
539
        checkNotNull(c);
×
540
        if (c == this) {
×
541
            throw new IllegalArgumentException();
×
542
        }
543
        lock.lock();
×
544
        try {
545
            int i = takeIndex;
×
546
            int n = 0;
×
547
            for (; n < count; n++) {
×
548
                c.add(items[i]);
×
549
                items[i] = null;
×
550
                i = inc(i);
×
551
            }
552
            if (n > 0) {
×
553
                count = 0;
×
554
                putIndex = 0;
×
555
                takeIndex = 0;
×
556
                notFull.signalAll();
×
557
            }
558
            return n;
×
559
        }
560
        finally {
561
            lock.unlock();
×
562
        }
563
    }
564

565
    @Override
566
    public int drainTo(Collection<? super E> c, int maxElements) {
567
        checkNotNull(c);
×
568
        if (c == this) {
×
569
            throw new IllegalArgumentException();
×
570
        }
571
        if (maxElements <= 0) {
×
572
            return 0;
×
573
        }
574
        lock.lock();
×
575
        try {
576
            int i = takeIndex;
×
577
            int n = 0;
×
578
            int max = (maxElements < count) ? maxElements : count;
×
579
            for (; n < max; n++) {
×
580
                c.add(items[i]);
×
581
                items[i] = null;
×
582
                i = inc(i);
×
583
            }
584
            if (n > 0) {
×
585
                count -= n;
×
586
                takeIndex = i;
×
587
                notFull.signalAll();
×
588
            }
589
            return n;
×
590
        }
591
        finally {
592
            lock.unlock();
×
593
        }
594
    }
595

596
    @Override
597
    public int size() {
598
        lock.lock();
1✔
599
        try {
600
            return count;
1✔
601
        }
602
        finally {
603
            lock.unlock();
1✔
604
        }
605
    }
606

607
    @Override
608
    public Iterator<E> iterator() {
609
        lock.lock();
×
610
        try {
611
            return new Itr();
×
612
        }
613
        finally {
614
            lock.unlock();
×
615
        }
616
    }
617

618
    private class Itr implements Iterator<E> {
619
        private int nextIndex;
620
        private E nextItem;
621
        private int lastRet;
622

623
        Itr() {
×
624
            lastRet = -1;
×
625
            if (count == 0) {
×
626
                nextIndex = -1;
×
627
            }
628
            else {
629
                nextIndex = takeIndex;
×
630
                nextItem = items[takeIndex];
×
631
            }
632
        }
×
633

634
        @Override
635
        public boolean hasNext() {
636
            return nextIndex >= 0;
×
637
        }
638

639
        private void checkNext() {
640
            if (nextIndex == putIndex) {
×
641
                nextIndex = -1;
×
642
                nextItem = null;
×
643
            }
644
            else {
645
                nextItem = items[nextIndex];
×
646
                if (nextItem == null) {
×
647
                    nextIndex = -1;
×
648
                }
649
            }
650
        }
×
651

652
        @Override
653
        public E next() {
654
            lock.lock();
×
655
            try {
656
                if (nextIndex < 0) {
×
657
                    throw new NoSuchElementException();
×
658
                }
659
                lastRet = nextIndex;
×
660
                E e = nextItem;
×
661
                nextIndex = inc(nextIndex);
×
662
                checkNext();
×
663
                return e;
×
664
            }
665
            finally {
666
                lock.unlock();
×
667
            }
668
        }
669

670
        @Override
671
        public void remove() {
672
            lock.lock();
×
673
            try {
674
                int i = lastRet;
×
675
                if (i < 0) {
×
676
                    throw new IllegalStateException();
×
677
                }
678
                lastRet = -1;
×
679
                int ti = takeIndex;
×
680
                removeAt(i);
×
681
                nextIndex = (i == ti) ? takeIndex : i;
×
682
                checkNext();
×
683
            }
684
            finally {
685
                lock.unlock();
×
686
            }
687
        }
×
688
    }
689

690
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc