• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 982

29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%
982

push

circleci

adamretter
[feature] Improve README.md badges

28451 of 55847 branches covered (50.94%)

Branch coverage included in aggregate %.

77468 of 131924 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.96
/exist-core/src/main/java/org/exist/util/WeakLazyStripes.java
1
/*
2
 * Elemental
3
 * Copyright (C) 2024, Evolved Binary Ltd
4
 *
5
 * admin@evolvedbinary.com
6
 * https://www.evolvedbinary.com | https://www.elemental.xyz
7
 *
8
 * Use of this software is governed by the Business Source License 1.1
9
 * included in the LICENSE file and at www.mariadb.com/bsl11.
10
 *
11
 * Change Date: 2028-04-27
12
 *
13
 * On the date above, in accordance with the Business Source License, use
14
 * of this software will be governed by the Apache License, Version 2.0.
15
 *
16
 * Additional Use Grant: Production use of the Licensed Work for a permitted
17
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
18
 * A Competing Use means making the Software available to others in a commercial
19
 * product or service that: substitutes for the Software; substitutes for any
20
 * other product or service we offer using the Software that exists as of the
21
 * date we make the Software available; or offers the same or substantially
22
 * similar functionality as the Software.
23
 */
24
package org.exist.util;
25

26
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
27
import net.jcip.annotations.GuardedBy;
28
import net.jcip.annotations.ThreadSafe;
29

30
import javax.annotation.Nullable;
31
import java.lang.ref.Reference;
32
import java.lang.ref.ReferenceQueue;
33
import java.lang.ref.WeakReference;
34
import java.util.concurrent.atomic.AtomicBoolean;
35
import java.util.concurrent.atomic.AtomicInteger;
36
import java.util.concurrent.locks.StampedLock;
37
import java.util.function.Function;
38

39
/**
40
 * Inspired by Guava's com.google.common.util.concurrent.Striped#lazyWeakReadWriteLock(int)
41
 * implementation.
42
 * See <a href="https://google.github.io/guava/releases/21.0/api/docs/com/google/common/util/concurrent/Striped.html#lazyWeakReadWriteLock-int-">https://google.github.io/guava/releases/21.0/api/docs/com/google/common/util/concurrent/Striped.html#lazyWeakReadWriteLock-int-</a>.
43
 *
44
 * However this is much simpler, and there is no hashing; we
45
 * will always return the same object (stripe) for the same key.
46
 *
47
 * This class basically couples Weak References with a
48
 * thread safe HashMap and manages draining expired Weak
49
 * References from the HashMap.
50
 *
51
 * Weak References will be cleaned up from the internal map
52
 * after they have been cleared by the GC. Two cleanup policies
53
 * are provided: "Batch" and "Amortize". The policy is chosen
54
 * by the constructor parameter {@code amortizeCleanup}.
55
 *
56
 * Batch Cleanup
57
 *     With Batch Cleanup, expired Weak References will
58
 *     be collected up to the {@link #MAX_EXPIRED_REFERENCE_READ_COUNT}
59
 *     limit, at which point the calling thread which causes
60
 *     that ceiling to be detected will clean up all expired references.
61
 *
62
 * Amortize Cleanup
63
 *     With Amortize Cleanup, each calling thread will attempt
64
 *     to clean up at most {@link #DRAIN_MAX} expired weak
65
 *     references on each write operation, or after
66
 *     {@link #READ_DRAIN_THRESHOLD} reads since the last cleanup.
67
 *
68
 * With either cleanup policy, only a single calling thread
69
 * performs the cleanup at any time.
70
 *
71
 * @param <K> The type of the key for the stripe.
72
 * @param <S> The type of the stripe.
73
 *
74
 * @author <a href="mailto:adam@evolvedbinary.com">Adam Retter</a>
75
 */
76
@ThreadSafe
77
public class WeakLazyStripes<K, S> {
78
    private static final int INITIAL_CAPACITY = 1000;
79
    private static final float LOAD_FACTOR = 0.75f;
80

81
    /**
82
     * When {@link #amortizeCleanup} is false, this is the
83
     * number of reads allowed which return expired references
84
     * before calling {@link #drainClearedReferences()}.
85
     */
86
    private static final int MAX_EXPIRED_REFERENCE_READ_COUNT = 1000;
87

88
    /**
89
     * When {@link #amortizeCleanup} is true, this is the
90
     * number of reads which are performed between calls
91
     * to {@link #drainClearedReferences()}.
92
     */
93
    private static final int READ_DRAIN_THRESHOLD = 64;
94

95
    /**
96
     * When {@link #amortizeCleanup} is true, this is the
97
     * maximum number of entries to be drained
98
     * by {@link #drainClearedReferences()}.
99
     */
100
    private static final int DRAIN_MAX = 16;
101

102
    private final ReferenceQueue<S> referenceQueue;
103

104
    private final StampedLock stripesLock = new StampedLock();
1✔
105
    @GuardedBy("stripesLock") private final Object2ObjectOpenHashMap<K, WeakValueReference<K, S>> stripes;
106

107
    /**
108
     * The number of reads on {@link #stripes} which have returned
109
     * expired weak references.
110
     */
111
    private final AtomicInteger expiredReferenceReadCount = new AtomicInteger();
1✔
112

113
    /**
114
     * The number of reads on {@link #stripes} since
115
     * {@link #drainClearedReferences()} was last
116
     * completed.
117
     */
118
    private final AtomicInteger readCount = new AtomicInteger();
1✔
119

120
    private final Function<K, S> creator;
121
    private final boolean amortizeCleanup;
122

123
    /**
124
     * Guard so that only a single thread drains
125
     * references at once.
126
     */
127
    private final AtomicBoolean draining = new AtomicBoolean();
1✔
128

129
    /**
130
     * Constructs a WeakLazyStripes where the concurrencyLevel
131
     * is the lower of either ConcurrentHashMap#DEFAULT_CONCURRENCY_LEVEL
132
     * or {@code Runtime.getRuntime().availableProcessors() * 2}.
133
     *
134
     * @param creator A factory for creating new Stripes when needed
135
     */
136
    public WeakLazyStripes(final Function<K, S> creator) {
137
       this(Math.min(16, Runtime.getRuntime().availableProcessors() * 2), creator);  // 16 == ConcurrentHashMap#DEFAULT_CONCURRENCY_LEVEL
1✔
138
    }
1✔
139

140
    /**
141
     * Constructs a WeakLazyStripes.
142
     *
143
     * @param concurrencyLevel The concurrency level for the underlying stripes map
144
     * @param creator A factory for creating new Stripes when needed
145
     */
146
    public WeakLazyStripes(final int concurrencyLevel, final Function<K, S> creator) {
147
        this(concurrencyLevel, creator, true);
1✔
148
    }
1✔
149

150
    /**
151
     * Constructs a WeakLazyStripes.
152
     *
153
     * @param concurrencyLevel The concurrency level for the underlying stripes map
154
     * @param creator A factory for creating new Stripes when needed
155
     * @param amortizeCleanup true if the cleanup of weak references should be
156
     *     amortized across many calls (default), false if the cleanup should be batched up
157
     *     and apportioned to a particular caller at a threshold
158
     */
159
    public WeakLazyStripes(final int concurrencyLevel, final Function<K, S> creator, final boolean amortizeCleanup) {
1✔
160
        this.stripes = new Object2ObjectOpenHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR);
1✔
161
        this.referenceQueue = new ReferenceQueue<>();
1✔
162
        this.creator = creator;
1✔
163
        this.amortizeCleanup = amortizeCleanup;
1✔
164
    }
1✔
165

166
    /**
167
     * Get the stripe for the given key
168
     *
169
     * If the stripe does not exist, it will be created by
170
     * calling {@link Function#apply(Object)} on {@link #creator}
171
     *
172
     * @param key the key for the stripe
173
     * @return the stripe
174
     */
175
    public S get(final K key) {
176

177
        while (true) {
178

179
            final Holder<Boolean> written = new Holder<>(false);
1✔
180

181
            // 1) attempt lookup via optimistic read and immediate conversion to write lock
182
            WeakValueReference<K, S> stripeRef = getOptimistic(key, written);
1✔
183
            if (stripeRef == null) {
1✔
184

185
                // 2) attempt lookup via pessimistic read and immediate conversion to write lock
186
                stripeRef = getPessimistic(key, written);
1✔
187
                if (stripeRef == null) {
1!
188

189
                    // 3) attempt lookup via exclusive write lock
190
                    stripeRef = getExclusive(key, written);
×
191
                }
192
            }
193

194
            if (amortizeCleanup) {
1!
195
                if (written.value) {
1✔
196
                    // TODO (AR) if we find that we are too frequently draining and it is expensive
197
                    // then we could make the read and write drain paths both use the DRAIN_THRESHOLD
198
                    drainClearedReferences();
1✔
199
                } else if (readCount.get() >= READ_DRAIN_THRESHOLD) {
1✔
200
                    drainClearedReferences();
1✔
201
                }
202
            } else {
1✔
203
                // have we reached the threshold where we should clear
204
                // out any cleared WeakReferences from the stripes map
205
                final int count = expiredReferenceReadCount.get();
×
206
                if (count > MAX_EXPIRED_REFERENCE_READ_COUNT
×
207
                        && expiredReferenceReadCount.compareAndSet(count, 0)) {
×
208
                    drainClearedReferences();
×
209
                }
210
            }
211

212
            // check the weak reference before returning!
213
            final S stripe = stripeRef.get();
1✔
214
            if (stripe != null) {
1✔
215
                return stripe;
1✔
216
            }
217

218
            // weak reference has expired in the mean time, so loop...
219
        }
220
    }
221

222
    /**
223
     * Get the stripe via immediate conversion of an optimistic read lock to a write lock.
224
     *
225
     * @param key the stripe key
226
     * @param written (OUT) will be set to true if {@link #stripes} was updated
227
     *
228
     * @return null if we could not perform an optimistic read, or a new object needed to be
229
     *     created and we could not take the {@link #stripesLock} write lock immediately,
230
     *     otherwise the stripe.
231
     */
232
    private @Nullable WeakValueReference<K, S> getOptimistic(final K key, final Holder<Boolean> written) {
233
        // optimistic read
234
        final long stamp = stripesLock.tryOptimisticRead();
1✔
235
        WeakValueReference<K, S> stripeRef;
236
        try {
237
            stripeRef = stripes.get(key);
1✔
238
        } catch (final ArrayIndexOutOfBoundsException e) {
1✔
239
            // this can occur as we don't hold a lock, we just have a stamp for an optimistic read,
240
            // so `stripes` might be concurrently modified
241
            return null;
×
242
        }
243
        if (stripeRef == null || stripeRef.get() == null) {
1✔
244
            final long writeStamp = stripesLock.tryConvertToWriteLock(stamp);
1✔
245
            if (writeStamp != 0L) {
1✔
246
                final boolean wasGCd = stripeRef != null && stripeRef.get() == null;
1!
247
                try {
248
                    stripeRef = new WeakValueReference<>(key, creator.apply(key), referenceQueue);
1✔
249
                    stripes.put(key, stripeRef);
1✔
250
                } finally {
1✔
251
                    stripesLock.unlockWrite(writeStamp);
1✔
252
                }
253

254
                written.value = true;
1✔
255

256
                if (wasGCd && !amortizeCleanup) {
1!
257
                    expiredReferenceReadCount.incrementAndGet();
×
258
                }
259
            } else {
×
260
                // invalid conversion to write lock... small optimisation for the fall-through to #getPessimistic(K, Holder) in #get(K)
261
                stripeRef = null;
1✔
262
            }
263
        } else {
1✔
264
            if (stripesLock.validate(stamp)) {
1✔
265
                if (amortizeCleanup) {
1!
266
                    readCount.incrementAndGet();
1✔
267
                }
268
            } else {
1✔
269
                // invalid optimistic read
270
                stripeRef = null;
1✔
271
            }
272
        }
273

274
        return stripeRef;
1✔
275
    }
276

277
    /**
278
     * Get the stripe via immediate conversion of a read lock to a write lock.
279
     *
280
     * @param key the stripe key
281
     * @param written (OUT) will be set to true if {@link #stripes} was updated
282
     *
283
     * @return null if a new object needed to be created and we could not take the {@link #stripesLock}
284
     *     write lock immediately, otherwise the stripe.
285
     */
286
    private @Nullable WeakValueReference<K, S> getPessimistic(final K key, final Holder<Boolean> written) {
287
        WeakValueReference<K, S> stripeRef;
288
        long stamp = stripesLock.readLock();
1✔
289
        try {
290
            stripeRef = stripes.get(key);
1✔
291
            if (stripeRef == null || stripeRef.get() == null) {
1!
292
                final long writeStamp = stripesLock.tryConvertToWriteLock(stamp);
×
293
                if (writeStamp != 0L) {
×
294
                    final boolean wasGCd = stripeRef != null && stripeRef.get() == null;
×
295

296
                    stamp = writeStamp;  // NOTE: this causes the write lock to be released in the finally further down
×
297
                    stripeRef = new WeakValueReference<>(key, creator.apply(key), referenceQueue);
×
298
                    stripes.put(key, stripeRef);
×
299

300
                    written.value = true;
×
301

302
                    if (wasGCd && !amortizeCleanup) {
×
303
                        expiredReferenceReadCount.incrementAndGet();
×
304
                    }
305
                } else {
×
306
                    // invalid conversion to write lock... small optimisation for the fall-through to #getExclusive(K, Holder) in #get(K)
307
                    stripeRef = null;
×
308
                }
309

310
                return stripeRef;
×
311
            }
312
        } finally {
313
            stripesLock.unlock(stamp);
1✔
314
        }
315

316
        // else (we don't need the lock on this path)
317
        if (amortizeCleanup) {
1!
318
            readCount.incrementAndGet();
1✔
319
        }
320

321
        return stripeRef;
1✔
322
    }
323

324
    /**
325
     * Get the stripe whilst holding the write lock.
326
     *
327
     * @param key the stripe key
328
     * @param written (OUT) will be set to true if {@link #stripes} was updated
329
     *
330
     * @return the stripe
331
     */
332
    private WeakValueReference<K, S> getExclusive(final K key, final Holder<Boolean> written) {
333
        WeakValueReference<K, S> stripeRef;
334
        final long writeStamp = stripesLock.writeLock();
×
335
        try {
336
            stripeRef = stripes.get(key);
×
337
            if (stripeRef == null || stripeRef.get() == null) {
×
338
                final boolean wasGCd = stripeRef != null && stripeRef.get() == null;
×
339

340
                stripeRef = new WeakValueReference<>(key, creator.apply(key), referenceQueue);
×
341
                stripes.put(key, stripeRef);
×
342

343
                written.value = true;
×
344

345
                if (wasGCd && !amortizeCleanup) {
×
346
                    expiredReferenceReadCount.incrementAndGet();
×
347
                }
348

349
                return stripeRef;
×
350
            }
351
        } finally {
352
            stripesLock.unlockWrite(writeStamp);
×
353
        }
354

355
        // else (we don't need the write lock on this path)
356
        if (amortizeCleanup) {
×
357
            readCount.incrementAndGet();
×
358
        }
359
        return stripeRef;
×
360
    }
361

362
    /**
363
     * Removes cleared WeakReferences
364
     * from the stripes map.
365
     *
366
     * If {@link #amortizeCleanup} is false, then
367
     * all cleared WeakReferences will be removed,
368
     * otherwise up to {@link #DRAIN_MAX} are removed.
369
     */
370
    private void drainClearedReferences() {
371
        if (draining.compareAndSet(false, true)) {  // critical section
1✔
372
            Reference<? extends S> ref;
373
            int i = 0;
1✔
374
            while ((ref = referenceQueue.poll()) != null) {
1✔
375
                @SuppressWarnings("unchecked") final WeakValueReference<K, S> stripeRef = (WeakValueReference<K, S>) ref;
1✔
376

377
                final long writeStamp = stripesLock.writeLock();
1✔
378
                try {
379

380
                    // TODO(AR) it may be more performant to call #drainClearedReferences() at the beginning of #get(K) as oposed to the end, then we could avoid the extra check here which calls stripes#get(K)
381

382
                    /*
383
                        NOTE: we have to check that we have not added a new reference to replace an
384
                        expired reference in #get(K) before calling #drainClearedReferences()
385
                     */
386
                    final WeakValueReference<K, S> check = stripes.get(stripeRef.key);
1✔
387
                    if (check != null && check.get() == null) {
1!
388

389
                        stripes.remove(stripeRef.key);
1✔
390

391
                    }
392
                } finally {
1✔
393
                    stripesLock.unlockWrite(writeStamp);
1✔
394
                }
395

396
                if (amortizeCleanup && ++i == DRAIN_MAX) {
1!
397
                    break;
1✔
398
                }
399
            }
400
            if (amortizeCleanup) {
1!
401
                readCount.set(0);
1✔
402
            }
403
            draining.set(false);
1✔
404
        }
405
    }
1✔
406

407
    /**
408
     * Extends a WeakReference with a strong reference to a key.
409
     *
410
     * Used for cleaning up the {@link #stripes} from the {@link #referenceQueue}.
411
     */
412
    private static class WeakValueReference<K, V> extends WeakReference<V> {
413
        final K key;
414
        public WeakValueReference(final K key, final V referent, final ReferenceQueue<? super V> q) {
415
            super(referent, q);
1✔
416
            this.key = key;
1✔
417
        }
1✔
418
    }
419
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc