• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 932

28 Apr 2025 01:10AM UTC coverage: 56.402% (-0.01%) from 56.413%
932

push

circleci

adamretter
[bugfix] Correct release process instructions

28446 of 55846 branches covered (50.94%)

Branch coverage included in aggregate %.

77456 of 131918 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.82
/exist-core/src/main/java/org/exist/util/WeakLazyStripes.java
1
/*
2
 * eXist-db Open Source Native XML Database
3
 * Copyright (C) 2001 The eXist-db Authors
4
 *
5
 * info@exist-db.org
6
 * http://www.exist-db.org
7
 *
8
 * This library is free software; you can redistribute it and/or
9
 * modify it under the terms of the GNU Lesser General Public
10
 * License as published by the Free Software Foundation; either
11
 * version 2.1 of the License, or (at your option) any later version.
12
 *
13
 * This library is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16
 * Lesser General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU Lesser General Public
19
 * License along with this library; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21
 */
22

23
package org.exist.util;
24

25
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
26
import net.jcip.annotations.GuardedBy;
27
import net.jcip.annotations.ThreadSafe;
28

29
import javax.annotation.Nullable;
30
import java.lang.ref.Reference;
31
import java.lang.ref.ReferenceQueue;
32
import java.lang.ref.WeakReference;
33
import java.util.concurrent.atomic.AtomicBoolean;
34
import java.util.concurrent.atomic.AtomicInteger;
35
import java.util.concurrent.locks.StampedLock;
36
import java.util.function.Function;
37

38
/**
39
 * Inspired by Guava's com.google.common.util.concurrent.Striped#lazyWeakReadWriteLock(int)
40
 * implementation.
41
 * See <a href="https://google.github.io/guava/releases/21.0/api/docs/com/google/common/util/concurrent/Striped.html#lazyWeakReadWriteLock-int-">https://google.github.io/guava/releases/21.0/api/docs/com/google/common/util/concurrent/Striped.html#lazyWeakReadWriteLock-int-</a>.
42
 *
43
 * However this is much simpler, and there is no hashing; we
44
 * will always return the same object (stripe) for the same key.
45
 *
46
 * This class basically couples Weak References with a
47
 * thread safe HashMap and manages draining expired Weak
48
 * References from the HashMap.
49
 *
50
 * Weak References will be cleaned up from the internal map
51
 * after they have been cleared by the GC. Two cleanup policies
52
 * are provided: "Batch" and "Amortize". The policy is chosen
53
 * by the constructor parameter {@code amortizeCleanup}.
54
 *
55
 * Batch Cleanup
56
 *     With Batch Cleanup, expired Weak References will
57
 *     be collected up to the {@link #MAX_EXPIRED_REFERENCE_READ_COUNT}
58
 *     limit, at which point the calling thread which causes
59
 *     that ceiling to be detected will cleanup all expired references.
60
 *
61
 * Amortize Cleanup
62
 *     With Amortize Cleanup, each calling thread will attempt
63
 *     to cleanup up to {@link #DRAIN_MAX} expired weak
64
 *     references on each write operation, or after
65
 *     {@link #READ_DRAIN_THRESHOLD} since the last cleanup.
66
 *
67
 * With either cleanup policy, only a single calling thread
68
 * performs the cleanup at any time.
69
 *
70
 * @param <K> The type of the key for the stripe.
71
 * @param <S> The type of the stripe.
72
 *
73
 * @author <a href="mailto:adam@evolvedbinary.com">Adam Retter</a>
74
 */
75
@ThreadSafe
76
public class WeakLazyStripes<K, S> {
77
    private static final int INITIAL_CAPACITY = 1000;
78
    private static final float LOAD_FACTOR = 0.75f;
79

80
    /**
81
     * When {@link #amortizeCleanup} is false, this is the
82
     * number of reads allowed which return expired references
83
     * before calling {@link #drainClearedReferences()}.
84
     */
85
    private static final int MAX_EXPIRED_REFERENCE_READ_COUNT = 1000;
86

87
    /**
88
     * When {@link #amortizeCleanup} is true, this is the
89
     * number of reads which are performed between calls
90
     * to {@link #drainClearedReferences()}.
91
     */
92
    private static final int READ_DRAIN_THRESHOLD = 64;
93

94
    /**
95
     * When {@link #amortizeCleanup} is true, this is the
96
     * maximum number of entries to be drained
97
     * by {@link #drainClearedReferences()}.
98
     */
99
    private static final int DRAIN_MAX = 16;
100

101
    private final ReferenceQueue<S> referenceQueue;
102

103
    private final StampedLock stripesLock = new StampedLock();
1✔
104
    @GuardedBy("stripesLock") private final Object2ObjectOpenHashMap<K, WeakValueReference<K, S>> stripes;
105

106
    /**
107
     * The number of reads on {@link #stripes} which have returned
108
     * expired weak references.
109
     */
110
    private final AtomicInteger expiredReferenceReadCount = new AtomicInteger();
1✔
111

112
    /**
113
     * The number of reads on {@link #stripes} since
114
     * {@link #drainClearedReferences()} was last
115
     * completed.
116
     */
117
    private final AtomicInteger readCount = new AtomicInteger();
1✔
118

119
    private final Function<K, S> creator;
120
    private final boolean amortizeCleanup;
121

122
    /**
123
     * Guard so that only a single thread drains
124
     * references at once.
125
     */
126
    private final AtomicBoolean draining = new AtomicBoolean();
1✔
127

128
    /**
129
     * Constructs a WeakLazyStripes where the concurrencyLevel
130
     * is the lower of either ConcurrentHashMap#DEFAULT_CONCURRENCY_LEVEL
131
     * or {@code Runtime.getRuntime().availableProcessors() * 2}.
132
     *
133
     * @param creator A factory for creating new Stripes when needed
134
     */
135
    public WeakLazyStripes(final Function<K, S> creator) {
136
       this(Math.min(16, Runtime.getRuntime().availableProcessors() * 2), creator);  // 16 == ConcurrentHashMap#DEFAULT_CONCURRENCY_LEVEL
1✔
137
    }
1✔
138

139
    /**
140
     * Constructs a WeakLazyStripes.
141
     *
142
     * @param concurrencyLevel The concurrency level for the underlying stripes map
143
     * @param creator A factory for creating new Stripes when needed
144
     */
145
    public WeakLazyStripes(final int concurrencyLevel, final Function<K, S> creator) {
146
        this(concurrencyLevel, creator, true);
1✔
147
    }
1✔
148

149
    /**
150
     * Constructs a WeakLazyStripes.
151
     *
152
     * @param concurrencyLevel The concurrency level for the underlying stripes map
153
     * @param creator A factory for creating new Stripes when needed
154
     * @param amortizeCleanup true if the cleanup of weak references should be
155
     *     amortized across many calls (default), false if the cleanup should be batched up
156
     *     and apportioned to a particular caller at a threshold
157
     */
158
    public WeakLazyStripes(final int concurrencyLevel, final Function<K, S> creator, final boolean amortizeCleanup) {
1✔
159
        this.stripes = new Object2ObjectOpenHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR);
1✔
160
        this.referenceQueue = new ReferenceQueue<>();
1✔
161
        this.creator = creator;
1✔
162
        this.amortizeCleanup = amortizeCleanup;
1✔
163
    }
1✔
164

165
    /**
166
     * Get the stripe for the given key
167
     *
168
     * If the stripe does not exist, it will be created by
169
     * calling {@link Function#apply(Object)} on {@link #creator}
170
     *
171
     * @param key the key for the stripe
172
     * @return the stripe
173
     */
174
    public S get(final K key) {
175

176
        while (true) {
177

178
            final Holder<Boolean> written = new Holder<>(false);
1✔
179

180
            // 1) attempt lookup via optimistic read and immediate conversion to write lock
181
            WeakValueReference<K, S> stripeRef = getOptimistic(key, written);
1✔
182
            if (stripeRef == null) {
1✔
183

184
                // 2) attempt lookup via pessimistic read and immediate conversion to write lock
185
                stripeRef = getPessimistic(key, written);
1✔
186
                if (stripeRef == null) {
1!
187

188
                    // 3) attempt lookup via exclusive write lock
189
                    stripeRef = getExclusive(key, written);
×
190
                }
191
            }
192

193
            if (amortizeCleanup) {
1!
194
                if (written.value) {
1✔
195
                    // TODO (AR) if we find that we are too frequently draining and it is expensive
196
                    // then we could make the read and write drain paths both use the DRAIN_THRESHOLD
197
                    drainClearedReferences();
1✔
198
                } else if (readCount.get() >= READ_DRAIN_THRESHOLD) {
1✔
199
                    drainClearedReferences();
1✔
200
                }
201
            } else {
1✔
202
                // have we reached the threshold where we should clear
203
                // out any cleared WeakReferences from the stripes map
204
                final int count = expiredReferenceReadCount.get();
×
205
                if (count > MAX_EXPIRED_REFERENCE_READ_COUNT
×
206
                        && expiredReferenceReadCount.compareAndSet(count, 0)) {
×
207
                    drainClearedReferences();
×
208
                }
209
            }
210

211
            // check the weak reference before returning!
212
            final S stripe = stripeRef.get();
1✔
213
            if (stripe != null) {
1!
214
                return stripe;
1✔
215
            }
216

217
            // weak reference has expired in the mean time, so loop...
218
        }
219
    }
220

221
    /**
222
     * Get the stripe via immediate conversion of an optimistic read lock to a write lock.
223
     *
224
     * @param key the stripe key
225
     * @param written (OUT) will be set to true if {@link #stripes} was updated
226
     *
227
     * @return null if we could not perform an optimistic read, or a new object needed to be
228
     *     created and we could not take the {@link #stripesLock} write lock immediately,
229
     *     otherwise the stripe.
230
     */
231
    private @Nullable WeakValueReference<K, S> getOptimistic(final K key, final Holder<Boolean> written) {
232
        // optimistic read
233
        final long stamp = stripesLock.tryOptimisticRead();
1✔
234
        WeakValueReference<K, S> stripeRef;
235
        try {
236
            stripeRef = stripes.get(key);
1✔
237
        } catch (final ArrayIndexOutOfBoundsException e) {
1✔
238
            // this can occur as we don't hold a lock, we just have a stamp for an optimistic read,
239
            // so `stripes` might be concurrently modified
240
            return null;
×
241
        }
242
        if (stripeRef == null || stripeRef.get() == null) {
1✔
243
            final long writeStamp = stripesLock.tryConvertToWriteLock(stamp);
1✔
244
            if (writeStamp != 0L) {
1!
245
                final boolean wasGCd = stripeRef != null && stripeRef.get() == null;
1!
246
                try {
247
                    stripeRef = new WeakValueReference<>(key, creator.apply(key), referenceQueue);
1✔
248
                    stripes.put(key, stripeRef);
1✔
249
                } finally {
1✔
250
                    stripesLock.unlockWrite(writeStamp);
1✔
251
                }
252

253
                written.value = true;
1✔
254

255
                if (wasGCd && !amortizeCleanup) {
1!
256
                    expiredReferenceReadCount.incrementAndGet();
×
257
                }
258
            } else {
×
259
                // invalid conversion to write lock... small optimisation for the fall-through to #getPessimistic(K, Holder) in #get(K)
260
                stripeRef = null;
×
261
            }
262
        } else {
×
263
            if (stripesLock.validate(stamp)) {
1✔
264
                if (amortizeCleanup) {
1!
265
                    readCount.incrementAndGet();
1✔
266
                }
267
            } else {
1✔
268
                // invalid optimistic read
269
                stripeRef = null;
1✔
270
            }
271
        }
272

273
        return stripeRef;
1✔
274
    }
275

276
    /**
277
     * Get the stripe via immediate conversion of a read lock to a write lock.
278
     *
279
     * @param key the stripe key
280
     * @param written (OUT) will be set to true if {@link #stripes} was updated
281
     *
282
     * @return null if a new object needed to be created and we could not take the {@link #stripesLock}
283
     *     write lock immediately, otherwise the stripe.
284
     */
285
    private @Nullable WeakValueReference<K, S> getPessimistic(final K key, final Holder<Boolean> written) {
286
        WeakValueReference<K, S> stripeRef;
287
        long stamp = stripesLock.readLock();
1✔
288
        try {
289
            stripeRef = stripes.get(key);
1✔
290
            if (stripeRef == null || stripeRef.get() == null) {
1!
291
                final long writeStamp = stripesLock.tryConvertToWriteLock(stamp);
×
292
                if (writeStamp != 0L) {
×
293
                    final boolean wasGCd = stripeRef != null && stripeRef.get() == null;
×
294

295
                    stamp = writeStamp;  // NOTE: this causes the write lock to be released in the finally further down
×
296
                    stripeRef = new WeakValueReference<>(key, creator.apply(key), referenceQueue);
×
297
                    stripes.put(key, stripeRef);
×
298

299
                    written.value = true;
×
300

301
                    if (wasGCd && !amortizeCleanup) {
×
302
                        expiredReferenceReadCount.incrementAndGet();
×
303
                    }
304
                } else {
×
305
                    // invalid conversion to write lock... small optimisation for the fall-through to #getExclusive(K, Holder) in #get(K)
306
                    stripeRef = null;
×
307
                }
308

309
                return stripeRef;
×
310
            }
311
        } finally {
312
            stripesLock.unlock(stamp);
1✔
313
        }
314

315
        // else (we don't need the lock on this path)
316
        if (amortizeCleanup) {
1!
317
            readCount.incrementAndGet();
1✔
318
        }
319

320
        return stripeRef;
1✔
321
    }
322

323
    /**
324
     * Get the stripe whilst holding the write lock.
325
     *
326
     * @param key the stripe key
327
     * @param written (OUT) will be set to true if {@link #stripes} was updated
328
     *
329
     * @return the stripe
330
     */
331
    private WeakValueReference<K, S> getExclusive(final K key, final Holder<Boolean> written) {
332
        WeakValueReference<K, S> stripeRef;
333
        final long writeStamp = stripesLock.writeLock();
×
334
        try {
335
            stripeRef = stripes.get(key);
×
336
            if (stripeRef == null || stripeRef.get() == null) {
×
337
                final boolean wasGCd = stripeRef != null && stripeRef.get() == null;
×
338

339
                stripeRef = new WeakValueReference<>(key, creator.apply(key), referenceQueue);
×
340
                stripes.put(key, stripeRef);
×
341

342
                written.value = true;
×
343

344
                if (wasGCd && !amortizeCleanup) {
×
345
                    expiredReferenceReadCount.incrementAndGet();
×
346
                }
347

348
                return stripeRef;
×
349
            }
350
        } finally {
351
            stripesLock.unlockWrite(writeStamp);
×
352
        }
353

354
        // else (we don't need the write lock on this path)
355
        if (amortizeCleanup) {
×
356
            readCount.incrementAndGet();
×
357
        }
358
        return stripeRef;
×
359
    }
360

361
    /**
362
     * Removes cleared WeakReferences
363
     * from the stripes map.
364
     *
365
     * If {@link #amortizeCleanup} is false, then
366
     * all cleared WeakReferences will be removed,
367
     * otherwise up to {@link #DRAIN_MAX} are removed.
368
     */
369
    private void drainClearedReferences() {
370
        if (draining.compareAndSet(false, true)) {  // critical section
1✔
371
            Reference<? extends S> ref;
372
            int i = 0;
1✔
373
            while ((ref = referenceQueue.poll()) != null) {
1✔
374
                @SuppressWarnings("unchecked") final WeakValueReference<K, S> stripeRef = (WeakValueReference<K, S>) ref;
1✔
375

376
                final long writeStamp = stripesLock.writeLock();
1✔
377
                try {
378

379
                    // TODO(AR) it may be more performant to call #drainClearedReferences() at the beginning of #get(K) as oposed to the end, then we could avoid the extra check here which calls stripes#get(K)
380

381
                    /*
382
                        NOTE: we have to check that we have not added a new reference to replace an
383
                        expired reference in #get(K) before calling #drainClearedReferences()
384
                     */
385
                    final WeakValueReference<K, S> check = stripes.get(stripeRef.key);
1✔
386
                    if (check != null && check.get() == null) {
1!
387

388
                        stripes.remove(stripeRef.key);
1✔
389

390
                    }
391
                } finally {
1✔
392
                    stripesLock.unlockWrite(writeStamp);
1✔
393
                }
394

395
                if (amortizeCleanup && ++i == DRAIN_MAX) {
1!
396
                    break;
1✔
397
                }
398
            }
399
            if (amortizeCleanup) {
1!
400
                readCount.set(0);
1✔
401
            }
402
            draining.set(false);
1✔
403
        }
404
    }
1✔
405

406
    /**
407
     * Extends a WeakReference with a strong reference to a key.
408
     *
409
     * Used for cleaning up the {@link #stripes} from the {@link #referenceQueue}.
410
     */
411
    private static class WeakValueReference<K, V> extends WeakReference<V> {
412
        final K key;
413
        public WeakValueReference(final K key, final V referent, final ReferenceQueue<? super V> q) {
414
            super(referent, q);
1✔
415
            this.key = key;
1✔
416
        }
1✔
417
    }
418
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc