• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

trydofor / professional-mirana / #92

26 Jan 2025 09:07AM UTC coverage: 86.903% (-0.5%) from 87.427%
#92

push

trydofor
✨ R.ngError, CodeAware, NameAware #49

0 of 45 new or added lines in 6 files covered. (0.0%)

3 existing lines in 2 files now uncovered.

6861 of 7895 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.15
/src/main/java/pro/fessional/mirana/id/LightIdBufferedProvider.java
1
package pro.fessional.mirana.id;
2

3
import net.jcip.annotations.ThreadSafe;
4
import org.jetbrains.annotations.NotNull;
5
import pro.fessional.mirana.pain.TimeoutRuntimeException;
6

7
import java.util.LinkedList;
8
import java.util.List;
9
import java.util.NoSuchElementException;
10
import java.util.concurrent.ConcurrentHashMap;
11
import java.util.concurrent.ExecutorService;
12
import java.util.concurrent.SynchronousQueue;
13
import java.util.concurrent.ThreadFactory;
14
import java.util.concurrent.ThreadPoolExecutor;
15
import java.util.concurrent.TimeUnit;
16
import java.util.concurrent.atomic.AtomicBoolean;
17
import java.util.concurrent.atomic.AtomicInteger;
18
import java.util.concurrent.atomic.AtomicLong;
19
import java.util.concurrent.atomic.AtomicReference;
20

21
/**
22
 * <pre>
23
 * Lightweight lock, high performance, double-buffered light-id provider.
24
 *
25
 * The following 3 types of threads exist in total, and read threads are promoted to write threads and even load threads.
26
 * At the same time, there are multiple read threads, but only unique write threads, and unique load threads.
27
 *
28
 * - Read thread, normal light-id consumer
29
 * - Write thread, upgraded read thread or load thread to append fragment to buffer (segment)
30
 * - Load thread, async thread or upgraded read thread to load segment via loader.
31
 *
32
 * Double buffer works as the following mechanism, it will track the id usage and auto control the count of preloading, but not exceed maxCount.
33
 *
34
 * - When the Id balance is less than 20%, the only async preload is `maxUsage in 60s` or `maxCount`.
35
 * - When Id balance is exhausted, read threads upgrade to write threads, other read threads wait until woken up or timeout.
36
 * - When read thread upgrades to write thread, loader exists, this read thread switches buffer after spinning busy and so on.
37
 *
38
 * </pre>
39
 *
40
 * @author trydofor
41
 * @since 2019-05-26
42
 */
43
@ThreadSafe
44
public class LightIdBufferedProvider implements LightIdProvider {
45

46
    public static final int MAX_COUNT = 10_000;
47
    public static final int MIN_COUNT = 100;
48
    public static final int FIX_COUNT = 0;
49
    public static final int MAX_ERROR = 5;
50
    public static final long ERR_ALIVE = 120_000; // 2 minute
51
    public static final long TIME_OUT = 1000; // 1 second
52
    public static final Generator GENERATOR = (name, block, timeout) -> LightIdUtil.toId(block, timeout);
1✔
53

54
    private final ExecutorService executor;
55
    private final Loader loader;
56
    private final ConcurrentHashMap<String, SegmentBuffer> cache = new ConcurrentHashMap<>();
1✔
57

58
    private volatile long loadTimeout = TIME_OUT;
1✔
59
    private volatile int loadMaxError = MAX_ERROR;
1✔
60
    private volatile int loadMaxCount = MAX_COUNT;
1✔
61
    private volatile int loadMinCount = MIN_COUNT;
1✔
62
    private volatile int loadFixCount = FIX_COUNT;
1✔
63
    private volatile long loadErrAlive = ERR_ALIVE;
1✔
64
    private volatile Generator generator = GENERATOR;
1✔
65

66
    /**
67
     * default thread pool is core-size=3, max-size=64, keep-alive 60S
68
     *
69
     * @param loader the loader
70
     */
71
    public LightIdBufferedProvider(@NotNull Loader loader) {
72
        this(loader, new ThreadPoolExecutor(3, 64, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
×
73
            private final AtomicInteger counter = new AtomicInteger(1);
×
74

75
            @Override
76
            public Thread newThread(@NotNull Runnable r) {
77
                return new Thread(r, "light-id-buffered-provider-" + counter.getAndIncrement());
×
78
            }
79
        }));
80
    }
×
81

82
    /**
83
     * Buffered Multiple Thread Provider
84
     *
85
     * @param loader   the loader
86
     * @param executor the executor
87
     */
88
    public LightIdBufferedProvider(@NotNull Loader loader, @NotNull ExecutorService executor) {
1✔
89
        this.loader = loader;
1✔
90
        this.executor = executor;
1✔
91
    }
1✔
92

93
    public long getErrAlive() {
94
        return loadErrAlive;
×
95
    }
96

97

98
    public long getTimeout() {
99
        return loadTimeout;
×
100
    }
101

102
    public int getMaxError() {
103
        return loadMaxError;
×
104
    }
105

106
    public int getMaxCount() {
107
        return loadMaxCount;
×
108
    }
109

110
    public int getMinCount() {
111
        return loadMinCount;
×
112
    }
113

114
    public int getFixCount() {
115
        return loadFixCount;
×
116
    }
117

118
    public Generator getGenerator() {
119
        return generator;
×
120
    }
121

122
    /**
123
     * set the error status alive time, which will be cleared when it expires, default 2 minutes.
124
     * Less than 0 means it will not be cleared
125
     *
126
     * @param t time in mills
127
     * @see #ERR_ALIVE
128
     */
129
    public void setErrAlive(long t) {
130
        loadErrAlive = t;
×
131
    }
×
132

133
    /**
134
     * set request timeout in mills, default 1 second.
135
     *
136
     * @param t time in mills
137
     * @return ture if the count is greater than 0
138
     * @see #TIME_OUT
139
     */
140
    public boolean setTimeout(long t) {
141
        if (t > 0) {
×
142
            loadTimeout = t;
×
143
            return true;
×
144
        }
145
        else {
146
            return false;
×
147
        }
148
    }
149

150
    /**
151
     * set the max tolerated errors in the loading, default 5.
152
     *
153
     * @param n count
154
     * @return ture if the count is greater than 0
155
     * @see #MAX_ERROR
156
     */
157
    public boolean setMaxError(int n) {
158
        if (n >= 0) {
×
159
            loadMaxError = n;
×
160
            return true;
×
161
        }
162
        else {
163
            return false;
×
164
        }
165
    }
166

167
    /**
168
     * set the max count of preload, default is 10000
169
     *
170
     * @param n count
171
     * @return ture if the count is greater than 0
172
     * @see #MAX_COUNT
173
     */
174
    public boolean setMaxCount(int n) {
175
        if (n >= 0) {
×
176
            loadMaxCount = n;
×
177
            return true;
×
178
        }
179
        else {
180
            return false;
×
181
        }
182
    }
183

184
    /**
185
     * set the min count of preload, default is 100.
186
     *
187
     * @param n count
188
     * @return ture if the count is greater than 0
189
     * @see #MIN_COUNT
190
     */
191
    public boolean setMinCount(int n) {
192
        if (n >= 0) {
×
193
            loadMinCount = n;
×
194
            return true;
×
195
        }
196
        else {
197
            return false;
×
198
        }
199
    }
200

201
    /**
202
     * Whether to preload a fixed count of ids.
203
     * fixed if ge FIX_COUNT, dynamic otherwise
204
     *
205
     * @param n count, Fixed count if ge FIX_COUNT
206
     * @return success or not
207
     * @see #FIX_COUNT
208
     */
209
    public boolean setFixCount(int n) {
210
        if (n < FIX_COUNT) return false;
1✔
211

212
        loadFixCount = n;
1✔
213
        return true;
1✔
214
    }
215

216
    /**
217
     * set Sequence Handler to edit the sequence before the LightId
218
     *
219
     * @see LightIdUtil#toId(int, long)
220
     * @see #GENERATOR
221
     */
222
    public void setGenerator(@NotNull Generator generator) {
223
        this.generator = generator;
×
224
    }
×
225

226
    /**
227
     * Preload all LightId's in the block, doing this once at startup is enough.
228
     *
229
     * @param block id's block
230
     */
231
    public void preload(int block) {
232
        List<Segment> segments = loader.preload(block);
×
233
        for (Segment seg : segments) {
×
234
            SegmentBuffer buff = load(seg.getBlock(), seg.getName());
×
235
            buff.fillSegment(seg);
×
236
        }
×
237
    }
×
238

239
    /**
240
     * clean the error, and reset the counter.
241
     *
242
     * @param name  id's name
243
     * @param block id's block
244
     */
245
    public void cleanError(@NotNull String name, int block) {
246
        load(block, name).handleError(null);
1✔
247
    }
1✔
248

249
    @Override
250
    public long next(@NotNull String name, int block) {
251
        return load(block, name).nextId(loadTimeout);
1✔
252
    }
253

254
    @Override
255
    public long next(@NotNull String name, int block, long timeout) {
256
        if (timeout <= 0) timeout = loadTimeout;
1✔
257
        return load(block, name).nextId(timeout);
1✔
258
    }
259

260
    // init or reload
261
    private SegmentBuffer load(int block, String name) {
262
        return cache.computeIfAbsent(name + "@" + block, k -> new SegmentBuffer(name, block));
1✔
263
    }
264

265
    /////////////////////////////////////////////////
266
    private class SegmentStatus {
267
        private final long headSeq;
268
        private final long kneeSeq;
269
        private final long footSeq;
270
        private final long startMs;
271
        private final AtomicLong sequence;
272

273
        private SegmentStatus() {
1✔
274
            this.headSeq = -1;
1✔
275
            this.kneeSeq = -1;
1✔
276
            this.footSeq = -1;
1✔
277
            this.startMs = System.currentTimeMillis();
1✔
278
            sequence = new AtomicLong(0);
1✔
279
        }
1✔
280

281
        private SegmentStatus(Segment seg) {
1✔
282
            headSeq = seg.getHead();
1✔
283
            footSeq = seg.getFoot();
1✔
284
            kneeSeq = footSeq - (footSeq - headSeq) * 2 / 10; // 20% remaining
1✔
285
            startMs = System.currentTimeMillis();
1✔
286
            sequence = new AtomicLong(seg.getHead());
1✔
287
        }
1✔
288

289
        public int count60s(int mul) {
290
            int fix = loadFixCount;
1✔
291
            if (fix > 0) return fix;
1✔
292

293
            long ms = (System.currentTimeMillis() - startMs);
1✔
294
            long count = footSeq - headSeq + 1;
1✔
295
            if (ms > 0) {
1✔
UNCOV
296
                count = count * 60_000 / ms; // Reserve 60 seconds.
×
297
            }
298

299
            if (mul > 1) {
1✔
300
                count = count * mul;
×
301
            }
302

303
            int max = loadMaxCount;
1✔
304
            int min = loadMinCount;
1✔
305
            if (count < 0 || count > max) { // overflow
1✔
UNCOV
306
                return max;
×
307
            }
308
            else if (count < min) {
1✔
309
                return min;
1✔
310
            }
311
            else {
312
                return (int) count;
×
313
            }
314
        }
315
    }
316

317
    private class SegmentBuffer {
318
        private final String name;
319
        private final int block;
320

321
        private final LinkedList<Segment> segmentPool = new LinkedList<>();
1✔
322
        private final AtomicReference<SegmentStatus> segmentSlot = new AtomicReference<>(new SegmentStatus());
1✔
323

324
        private final AtomicBoolean loaderIdle = new AtomicBoolean(true);
1✔
325
        private final AtomicBoolean switchIdle = new AtomicBoolean(true);
1✔
326
        private final AtomicInteger awaitCount = new AtomicInteger(0);
1✔
327

328
        // Error messages on load, less need for consistency.
329
        private final AtomicInteger errorCount = new AtomicInteger(0);
1✔
330
        private final AtomicReference<RuntimeException> errorNewer = new AtomicReference<>();
1✔
331
        private final AtomicLong errorEpoch = new AtomicLong(0);
1✔
332

333
        public SegmentBuffer(String name, int block) {
1✔
334
            this.name = name;
1✔
335
            this.block = block;
1✔
336
        }
1✔
337

338
        public long nextId(final long timeout) {
339
            checkError();
1✔
340

341
            // no need to sync
342
            final SegmentStatus slot = segmentSlot.get();
1✔
343
            long seq = slot.sequence.getAndIncrement();
1✔
344

345
            // Not init or enough, waiting to be reloaded.
346
            if (seq > slot.footSeq) {
1✔
347
                pollSegment(timeout);
1✔
348
                return nextId(timeout); // require again
1✔
349
            }
350

351
            // preload
352
            if (seq > slot.kneeSeq) {
1✔
353
                loadSegment(slot.count60s(0), true);
×
354
            }
355

356
            return generator.gen(name, block, seq);
1✔
357
        }
358

359

360
        // append to the end of the pool
361
        public void fillSegment(final Segment seg) {
362
            if (seg == null) {
1✔
363
                return;
×
364
            }
365

366
            String err = null;
1✔
367
            if (seg.getBlock() != block) {
1✔
368
                err = "difference block, name=" + name + ", block=" + block + ",seg.block=" + seg.getBlock();
×
369
            }
370
            else if (!name.equalsIgnoreCase(seg.getName())) {
1✔
371
                err = "difference name, name=" + name + ", block=" + block + ",seg.name=" + seg.getName();
×
372
            }
373
            else {
374
                // Guaranteed insertion order, non-separable read and write
375
                synchronized (segmentPool) {
1✔
376
                    if (!segmentPool.isEmpty() && seg.getHead() <= segmentPool.getLast().getFoot()) {
1✔
377
                        err = "seg.start must bigger than last.endin, name=" + name + ",block=" + block; // Can overwrite previous err
×
378
                    }
379
                    else {
380
                        segmentPool.addLast(seg);
1✔
381
                    }
382
                }
1✔
383
            }
384

385
            handleError(err == null ? null : new IllegalStateException(err));
1✔
386
        }
1✔
387

388
        // no need to lock
389
        public void handleError(RuntimeException e) {
390
            if (e == null) {
1✔
391
                errorCount.set(0);
1✔
392
                errorNewer.set(null);
1✔
393
                errorEpoch.set(0);
1✔
394
            }
395
            else {
396
                errorCount.incrementAndGet();
1✔
397
                errorNewer.set(e);
1✔
398
                errorEpoch.set(System.currentTimeMillis());
1✔
399
            }
400
        }
1✔
401

402
        // async load, only one is active at a time
403
        private void loadSegment(final int count, final boolean async) {
404
            if (loaderIdle.compareAndSet(true, false)) {
1✔
405
                if (async) {
1✔
406
                    executor.submit(() -> loadSegment(count));
×
407
                }
408
                else {
409
                    loadSegment(count);
1✔
410
                }
411
            }
412
        }
1✔
413

414
        private void loadSegment(final int count) {
415
            try {
416
                Segment seg = loader.require(name, block, count, false);
1✔
417
                handleError(null); // before fillSegment
1✔
418
                fillSegment(seg);
1✔
419
            }
420
            catch (RuntimeException e) {
1✔
421
                handleError(e);
1✔
422
            }
423
            finally {
424
                loaderIdle.set(true); // no need to sync
1✔
425
            }
426
        }
1✔
427

428
        // out of sequence, switching or loading + switching
429
        private void pollSegment(long timeout) {
430

431
            final long throwMs = System.currentTimeMillis() + timeout;
1✔
432

433
            // the only one switching thread, and others are in waiting.
434
            if (!switchIdle.compareAndSet(true, false)) {
1✔
435
                try {
436
                    // Wait for timeout or wake up on successful switchover
437
                    synchronized (switchIdle) {
×
438
                        if (switchIdle.get()) {
×
439
                            return; // no timeout check
×
440
                        }
441
                        else {
442
                            awaitCount.incrementAndGet();
×
443
                            switchIdle.wait(timeout);
×
444
                        }
445
                    }
×
446
                }
447
                catch (InterruptedException e) {
×
448
                    Thread.currentThread().interrupt();
×
449
                    throw new IllegalStateException("dont interrupt me", e);
×
450
                }
×
451
                long now = System.currentTimeMillis();
×
452
                if (now > throwMs) {
×
453
                    throw new TimeoutRuntimeException("waiting segment pollTimeout=" + (now - throwMs + timeout));
×
454
                }
455
                else {
456
                    return;
×
457
                }
458
            }
459

460
            // only one thread can ben here, upgraded (1) writing thread, (2) loading+writing thread
461
            try {
462
                while (true) {
463
                    checkError();
1✔
464

465
                    final SegmentStatus status;
466
                    synchronized (segmentPool) {
1✔
467
                        Segment seg = segmentPool.poll();
1✔
468
                        if (seg == null) { // empty
1✔
469
                            status = segmentSlot.get();
1✔
470
                        }
471
                        else {
472
                            segmentSlot.set(new SegmentStatus(seg));
1✔
473
                            status = null;
1✔
474
                        }
475
                    }
1✔
476

477
                    if (status == null) {
1✔
478
                        break; // switchover, no timeout check
1✔
479
                    }
480
                    else {
481
                        loadSegment(status.count60s(awaitCount.get()), false); // upgrade the load thread
1✔
482
                    }
483

484
                    long now = System.currentTimeMillis();
1✔
485
                    if (now > throwMs) {
1✔
486
                        throw new TimeoutRuntimeException("switching segment loadTimeout=" + (now - throwMs + timeout));
×
487
                    }
488
                }
1✔
489
            }
490
            finally {
491
                synchronized (switchIdle) {
1✔
492
                    switchIdle.set(true);
1✔
493
                    awaitCount.set(0);
1✔
494
                    switchIdle.notifyAll();
1✔
495
                }
1✔
496
            }
497
        }
1✔
498

499
        private void checkError() {
500
            long lf = loadErrAlive;
1✔
501
            long ep = errorEpoch.get();
1✔
502
            if (lf > 0 && ep > 0 && (System.currentTimeMillis() - ep) > lf) {
1✔
503
                errorCount.set(0);
×
504
                errorNewer.set(null);
×
505
                errorEpoch.set(0);
×
506
                return;
×
507
            }
508

509
            RuntimeException err = errorNewer.get();
1✔
510
            if (err == null) {
1✔
511
                return;
1✔
512
            }
513

514
            // not exist
515
            if (err instanceof NoSuchElementException) {
1✔
516
                throw err;
1✔
517
            }
518

519
            // out of count
520
            if (errorCount.get() > loadMaxError) {
1✔
521
                throw err;
1✔
522
            }
523
        }
1✔
524
    }
525
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc