• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bernardladenthin / BitcoinAddressFinder / #348

17 Jul 2025 03:25PM UTC coverage: 69.604% (-0.05%) from 69.653%
#348

push

bernardladenthin
Improve README.md

1406 of 2020 relevant lines covered (69.6%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.75
/src/main/java/net/ladenthin/bitcoinaddressfinder/ConsumerJava.java
1
// @formatter:off
2
/**
3
 * Copyright 2020 Bernard Ladenthin bernard.ladenthin@gmail.com
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *    http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18
// @formatter:on
19
package net.ladenthin.bitcoinaddressfinder;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import java.nio.ByteBuffer;
23
import java.time.Duration;
24
import java.util.ArrayList;
25
import java.util.List;
26
import java.util.concurrent.ExecutorService;
27
import java.util.concurrent.Executors;
28
import java.util.concurrent.Future;
29
import java.util.concurrent.LinkedBlockingQueue;
30
import java.util.concurrent.ScheduledExecutorService;
31
import java.util.concurrent.TimeUnit;
32
import java.util.concurrent.atomic.AtomicBoolean;
33
import java.util.concurrent.atomic.AtomicLong;
34
import java.util.regex.Matcher;
35
import java.util.regex.Pattern;
36
import net.ladenthin.bitcoinaddressfinder.configuration.CConsumerJava;
37
import net.ladenthin.bitcoinaddressfinder.persistence.Persistence;
38
import net.ladenthin.bitcoinaddressfinder.persistence.PersistenceUtils;
39
import net.ladenthin.bitcoinaddressfinder.persistence.lmdb.LMDBPersistence;
40
import org.apache.commons.codec.binary.Hex;
41
import org.bitcoinj.crypto.ECKey;
42
import org.bitcoinj.crypto.MnemonicException;
43
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
45

46
public class ConsumerJava implements Consumer {
47

48
    /**
49
     * We assume a queue might be empty after this amount of time.
50
     * If not, some keys in the queue are not checked before shutdown.
51
     */
52
    @VisibleForTesting
53
    static Duration AWAIT_DURATION_QUEUE_EMPTY = Duration.ofMinutes(1);
1✔
54
    
55
    public static final String MISS_PREFIX = "miss: Could not find the address: ";
56
    public static final String HIT_PREFIX = "hit: Found the address: ";
57
    public static final String VANITY_HIT_PREFIX = "vanity pattern match: ";
58
    public static final String HIT_SAFE_PREFIX = "hit: safe log: ";
59

60
    private Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
61

62
    private final KeyUtility keyUtility;
63
    protected final AtomicLong checkedKeys = new AtomicLong();
1✔
64
    protected final AtomicLong checkedKeysSumOfTimeToCheckContains = new AtomicLong();
1✔
65
    protected final AtomicLong emptyConsumer = new AtomicLong();
1✔
66
    protected final AtomicLong hits = new AtomicLong();
1✔
67
    protected long startTime = 0;
1✔
68

69
    protected final CConsumerJava consumerJava;
70
    @VisibleForTesting
1✔
71
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
1✔
72

73
    protected Persistence persistence;
74
    private final PersistenceUtils persistenceUtils;
75
    
76
    private final List<Future<Void>> consumers = new ArrayList<>();
1✔
77
    protected final LinkedBlockingQueue<PublicKeyBytes[]> keysQueue;
78
    private final ByteBufferUtility byteBufferUtility = new ByteBufferUtility(true);
1✔
79
    
80
    protected final AtomicLong vanityHits = new AtomicLong();
1✔
81
    private final Pattern vanityPattern;
82
    
83
    @VisibleForTesting
1✔
84
    final AtomicBoolean shouldRun = new AtomicBoolean(true);
85
    
86
    @VisibleForTesting
87
    final ExecutorService consumeKeysExecutorService;
88

89
    /**
     * Creates a consumer bound to the given configuration and helpers.
     * <p>
     * The key queue is bounded by {@code consumerJava.queueSize}, the worker pool is sized by
     * {@code consumerJava.threads}, and the vanity pattern is compiled only when
     * {@code consumerJava.enableVanity} is set (otherwise it stays {@code null}).
     *
     * @param consumerJava     the consumer configuration
     * @param keyUtility       helper used to render key details for log messages
     * @param persistenceUtils helper handed to the persistence layer when it is created
     */
    protected ConsumerJava(CConsumerJava consumerJava, KeyUtility keyUtility, PersistenceUtils persistenceUtils) {
        this.consumerJava = consumerJava;
        this.keysQueue = new LinkedBlockingQueue<>(consumerJava.queueSize);
        this.keyUtility = keyUtility;
        this.persistenceUtils = persistenceUtils;
        this.vanityPattern = consumerJava.enableVanity
                ? Pattern.compile(consumerJava.vanityPattern)
                : null;
        this.consumeKeysExecutorService = Executors.newFixedThreadPool(consumerJava.threads);
    }
1✔
101

102
    Logger getLogger() {
103
        return logger;
×
104
    }
105
    
106
    void setLogger(Logger logger) {
107
        this.logger = logger;
1✔
108
    }
1✔
109

110
    protected void initLMDB() {
111
        persistence = new LMDBPersistence(consumerJava.lmdbConfigurationReadOnly, persistenceUtils);
1✔
112
        persistence.init();
1✔
113
    }
1✔
114

115
    protected void startStatisticsTimer() {
116
        long period = consumerJava.printStatisticsEveryNSeconds;
1✔
117
        if (period <= 0) {
1✔
118
            throw new IllegalArgumentException("period must be greater than 0.");
1✔
119
        }
120

121
        startTime = System.currentTimeMillis();
1✔
122

123
        scheduledExecutorService.scheduleAtFixedRate(() -> {
1✔
124
            // get transient information
125
            long uptime = Math.max(System.currentTimeMillis() - startTime, 1);
1✔
126

127
            String message = new Statistics().createStatisticsMessage(uptime, checkedKeys.get(), checkedKeysSumOfTimeToCheckContains.get(), emptyConsumer.get(), keysQueue.size(), hits.get());
1✔
128

129
            // log the information
130
            logger.info(message);
1✔
131
        }, period, period, TimeUnit.SECONDS);
1✔
132
    }
1✔
133

134
    @Override
135
    public void startConsumer() {
136
        logger.debug("Starting {} consumer threads...", consumerJava.threads);
1✔
137
        for (int i = 0; i < consumerJava.threads; i++) {
1✔
138
            consumers.add(consumeKeysExecutorService.submit(
1✔
139
                    () -> {
140
                        consumeKeysRunner();
1✔
141
                        return null;
1✔
142
                    }));
143
        }
144
        logger.debug("Successfully started {} consumer threads.", consumers.size());
1✔
145
    }
1✔
146
    
147
    /**
148
     * This method runs in multiple threads.
149
     */
150
    private void consumeKeysRunner() {
151
        logger.info("start consumeKeysRunner");
1✔
152
        ByteBuffer threadLocalReuseableByteBuffer = ByteBuffer.allocateDirect(PublicKeyBytes.RIPEMD160_HASH_NUM_BYTES);
1✔
153
        
154
        while (shouldRun.get()) {
1✔
155
            if (keysQueue.size() >= consumerJava.queueSize) {
1✔
156
                logger.warn("Attention, queue is full. Please increase queue size.");
1✔
157
            }
158
            try {
159
                consumeKeys(threadLocalReuseableByteBuffer);
1✔
160
                // the consumeKeys method is looped inside, if the method returns it means the queue is empty
161
                emptyConsumer.incrementAndGet();
1✔
162
                Thread.sleep(consumerJava.delayEmptyConsumer);
1✔
163
            } catch (InterruptedException e) {
×
164
                // we need to catch the exception to not break the thread
165
                logger.error("Ignore InterruptedException during Thread.sleep.", e);
×
166
            } catch (Exception e) {
×
167
                // log every Exception because it's hard to debug and we do not break down the thread loop
168
                logger.error("Error in consumeKeysRunner()." , e);
×
169
                e.printStackTrace();
×
170
            }
1✔
171
        }
172
        
173
        if (threadLocalReuseableByteBuffer != null) {
1✔
174
            byteBufferUtility.freeByteBuffer(threadLocalReuseableByteBuffer);
1✔
175
        }
176
        logger.info("end consumeKeysRunner");
1✔
177
    }
1✔
178
    
179
    void consumeKeys(ByteBuffer threadLocalReuseableByteBuffer) throws MnemonicException.MnemonicLengthException {
180
        logger.trace("consumeKeys");
1✔
181
        PublicKeyBytes[] publicKeyBytesArray = keysQueue.poll();
1✔
182
        while (publicKeyBytesArray != null) {
1✔
183
            for (PublicKeyBytes publicKeyBytes : publicKeyBytesArray) {
1✔
184
                if (publicKeyBytes.isOutsidePrivateKeyRange()) {
1✔
185
                    continue;
1✔
186
                }
187
                
188
                byte[] hash160Uncompressed = publicKeyBytes.getUncompressedKeyHash();
1✔
189
                boolean containsAddressUncompressed = containsAddress(threadLocalReuseableByteBuffer, hash160Uncompressed);
1✔
190
                
191
                byte[] hash160Compressed = publicKeyBytes.getCompressedKeyHash();
1✔
192
                boolean containsAddressCompressed = containsAddress(threadLocalReuseableByteBuffer, hash160Compressed);
1✔
193
                
194
                if (consumerJava.runtimePublicKeyCalculationCheck) {
1✔
195
                    publicKeyBytes.runtimePublicKeyCalculationCheck(logger);
1✔
196
                }
197
                
198
                if (containsAddressUncompressed) {
1✔
199
                    // immediately log the secret
200
                    safeLog(publicKeyBytes, hash160Uncompressed, hash160Compressed);
1✔
201
                    hits.incrementAndGet();
1✔
202
                    ECKey ecKeyUncompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getUncompressed());
1✔
203
                    String hitMessageUncompressed = HIT_PREFIX + keyUtility.createKeyDetails(ecKeyUncompressed);
1✔
204
                    logger.info(hitMessageUncompressed);
1✔
205
                }
206

207
                if (containsAddressCompressed) {
1✔
208
                    // immediately log the secret
209
                    safeLog(publicKeyBytes, hash160Uncompressed, hash160Compressed);
1✔
210
                    hits.incrementAndGet();
1✔
211
                    ECKey ecKeyCompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getCompressed());
1✔
212
                    String hitMessageCompressed = HIT_PREFIX + keyUtility.createKeyDetails(ecKeyCompressed);
1✔
213
                    logger.info(hitMessageCompressed);
1✔
214
                }
215

216
                if (consumerJava.enableVanity) {
1✔
217
                    String uncompressedKeyHashAsBase58 = publicKeyBytes.getUncompressedKeyHashAsBase58(keyUtility);
1✔
218
                    Matcher uncompressedKeyHashAsBase58Matcher = vanityPattern.matcher(uncompressedKeyHashAsBase58);
1✔
219
                    if (uncompressedKeyHashAsBase58Matcher.matches()) {
1✔
220
                        // immediately log the secret
221
                        safeLog(publicKeyBytes, hash160Uncompressed, hash160Compressed);
1✔
222
                        vanityHits.incrementAndGet();
1✔
223
                        ECKey ecKeyUncompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getUncompressed());
1✔
224
                        String vanityHitMessageUncompressed = VANITY_HIT_PREFIX + keyUtility.createKeyDetails(ecKeyUncompressed);
1✔
225
                        logger.info(vanityHitMessageUncompressed);
1✔
226
                    }
227

228
                    String compressedKeyHashAsBase58 = publicKeyBytes.getCompressedKeyHashAsBase58(keyUtility);
1✔
229
                    Matcher compressedKeyHashAsBase58Matcher = vanityPattern.matcher(compressedKeyHashAsBase58);
1✔
230
                    if (compressedKeyHashAsBase58Matcher.matches()) {
1✔
231
                        // immediately log the secret
232
                        safeLog(publicKeyBytes, hash160Uncompressed, hash160Compressed);
1✔
233
                        vanityHits.incrementAndGet();
1✔
234
                        ECKey ecKeyCompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getCompressed());
1✔
235
                        String vanityHitMessageCompressed = VANITY_HIT_PREFIX + keyUtility.createKeyDetails(ecKeyCompressed);
1✔
236
                        logger.info(vanityHitMessageCompressed);
1✔
237
                    }
238
                }
239

240
                if (!containsAddressUncompressed && !containsAddressCompressed) {
1✔
241
                    if (logger.isTraceEnabled()) {
1✔
242
                        ECKey ecKeyUncompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getUncompressed());
1✔
243
                        String missMessageUncompressed = MISS_PREFIX + keyUtility.createKeyDetails(ecKeyUncompressed);
1✔
244
                        logger.trace(missMessageUncompressed);
1✔
245

246
                        ECKey ecKeyCompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getCompressed());
1✔
247
                        String missMessageCompressed = MISS_PREFIX + keyUtility.createKeyDetails(ecKeyCompressed);
1✔
248
                        logger.trace(missMessageCompressed);
1✔
249
                    }
250
                }
251
            }
252
            publicKeyBytesArray = keysQueue.poll();
1✔
253
        }
254
    }
1✔
255

256
    public boolean containsAddress(ByteBuffer threadLocalReuseableByteBuffer, byte[] hash160) {
257
        threadLocalReuseableByteBuffer.rewind();
1✔
258
        threadLocalReuseableByteBuffer.put(hash160);
1✔
259
        threadLocalReuseableByteBuffer.flip();
1✔
260
        return containsAddress(threadLocalReuseableByteBuffer);
1✔
261
    }
262
    
263
    /**
264
     * Logs key information in a safe and robust way to avoid losing critical data
265
     * in case of a runtime exception.
266
     * <p>
267
     * The primary goal of this method is to ensure that if a valid secret key (i.e., a hit)
268
     * is found, its corresponding BigInteger value is immediately logged. Since logging a
269
     * BigInteger is unlikely to fail, this is the first and most essential piece of information.
270
     * <p>
271
     * Logging additional details such as the uncompressed/compressed public keys and their
272
     * hash160 values may theoretically trigger runtime exceptions (e.g., due to malformed data
273
     * or encoding issues). To mitigate the risk of losing the crucial secret value in such rare
274
     * cases, it is logged first.
275
     * <p>
276
     * All logs are prefixed consistently with {@code HIT_SAFE_PREFIX} to make hits easily searchable.
277
     *
278
     * @param publicKeyBytes         the public key bytes wrapper
279
     * @param hash160Uncompressed    the hash160 of the uncompressed public key
280
     * @param hash160Compressed      the hash160 of the compressed public key
281
     */
282
    private void safeLog(PublicKeyBytes publicKeyBytes, byte[] hash160Uncompressed, byte[] hash160Compressed) {
283
        logger.info(HIT_SAFE_PREFIX +"publicKeyBytes.getSecretKey(): " + publicKeyBytes.getSecretKey());
1✔
284
        logger.info(HIT_SAFE_PREFIX +"publicKeyBytes.getUncompressed(): " + Hex.encodeHexString(publicKeyBytes.getUncompressed()));
1✔
285
        logger.info(HIT_SAFE_PREFIX +"publicKeyBytes.getCompressed(): " + Hex.encodeHexString(publicKeyBytes.getCompressed()));
1✔
286
        logger.info(HIT_SAFE_PREFIX +"hash160Uncompressed: " + Hex.encodeHexString(hash160Uncompressed));
1✔
287
        logger.info(HIT_SAFE_PREFIX +"hash160Compressed: " + Hex.encodeHexString(hash160Compressed));
1✔
288
    }
1✔
289

290
    private boolean containsAddress(ByteBuffer hash160AsByteBuffer) {
291
        long timeBefore = System.currentTimeMillis();
1✔
292
        if (logger.isTraceEnabled()) {
1✔
293
            logger.trace("Time before persistence.containsAddress: " + timeBefore);
1✔
294
        }
295
        boolean containsAddress = persistence.containsAddress(hash160AsByteBuffer);
1✔
296
        long timeAfter = System.currentTimeMillis();
1✔
297
        long timeDelta = timeAfter - timeBefore;
1✔
298
        checkedKeys.incrementAndGet();
1✔
299
        checkedKeysSumOfTimeToCheckContains.addAndGet(timeDelta);
1✔
300
        if (logger.isTraceEnabled()) {
1✔
301
            logger.trace("Time after persistence.containsAddress: " + timeAfter);
1✔
302
            logger.trace("Time delta: " + timeDelta);
1✔
303
        }
304
        return containsAddress;
1✔
305
    }
306

307
    @Override
308
    public void consumeKeys(PublicKeyBytes[] publicKeyBytes) throws InterruptedException {
309
        if(logger.isDebugEnabled()){
1✔
310
            logger.debug("keysQueue.put(publicKeyBytes) with length: " + publicKeyBytes.length);
1✔
311
        }
312
        
313
        keysQueue.put(publicKeyBytes);
1✔
314
        
315
        if(logger.isDebugEnabled()){
1✔
316
            logger.debug("keysQueue.size(): " + keysQueue.size());
1✔
317
        }
318
    }
1✔
319
    
320
    /**
321
    * Initiates a graceful shutdown of the consumer:
322
    * <ul>
323
    *   <li>Stops internal execution by setting the control flag</li>
324
    *   <li>Shuts down scheduled tasks and consumer thread pool</li>
325
    *   <li>Waits for all consumer threads to finish within a defined timeout</li>
326
    *   <li>Logs any unclean terminations</li>
327
    *   <li>Closes LMDB persistence and releases resources</li>
328
    * </ul>
329
    *
330
    * This method ensures a clean and deterministic shutdown without relying on thread interruption signals.
331
    */
332
    @Override
333
    public void interrupt() {
334
        logger.info("Interrupt initiated: stopping consumer execution...");
1✔
335
        shouldRun.set(false);
1✔
336
        scheduledExecutorService.shutdown();
1✔
337
        consumeKeysExecutorService.shutdown();
1✔
338
        logger.info("Waiting for termination of {} consumer threads (timeout: {} seconds)...", consumers.size(), AWAIT_DURATION_QUEUE_EMPTY.getSeconds());
1✔
339
        try {
340
            boolean terminated = consumeKeysExecutorService.awaitTermination(AWAIT_DURATION_QUEUE_EMPTY.getSeconds(), TimeUnit.SECONDS);
1✔
341
            if (!terminated) {
1✔
342
                logger.warn("Timeout reached. Some consumer threads may not have terminated cleanly.");
×
343
            }
344
        } catch (InterruptedException ex) {
×
345
            logger.error("Interrupted while awaiting consumer termination.", ex);
×
346
            throw new RuntimeException(ex);
×
347
        }
1✔
348
        persistence.close();
1✔
349
        logger.debug("Interrupt complete: resources released and persistence closed.");
1✔
350
    }
1✔
351
    
352
    @VisibleForTesting
353
    int keysQueueSize() {
354
        return keysQueue.size();
1✔
355
    }
356
    
357
    @Override
358
    public String toString() {
359
        return "ConsumerJava@" + Integer.toHexString(System.identityHashCode(this));
1✔
360
    }
361
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc