• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bernardladenthin / BitcoinAddressFinder / #299

14 Apr 2025 12:55PM UTC coverage: 67.541% (-0.06%) from 67.599%
#299

push

web-flow
Merge pull request #54 from bernardladenthin/dependabot/maven/commons-io-commons-io-2.19.0

Bump commons-io:commons-io from 2.18.0 to 2.19.0

1159 of 1716 relevant lines covered (67.54%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.48
/src/main/java/net/ladenthin/bitcoinaddressfinder/ConsumerJava.java
1
// @formatter:off
2
/**
3
 * Copyright 2020 Bernard Ladenthin bernard.ladenthin@gmail.com
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *    http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18
// @formatter:on
19
package net.ladenthin.bitcoinaddressfinder;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import java.nio.ByteBuffer;
23
import java.time.Duration;
24
import java.time.temporal.ChronoUnit;
25
import java.util.ArrayList;
26
import java.util.Arrays;
27
import java.util.List;
28
import java.util.concurrent.ExecutorService;
29
import java.util.concurrent.Executors;
30
import java.util.concurrent.Future;
31
import java.util.concurrent.LinkedBlockingQueue;
32
import java.util.concurrent.ScheduledExecutorService;
33
import java.util.concurrent.TimeUnit;
34
import java.util.concurrent.atomic.AtomicBoolean;
35
import java.util.concurrent.atomic.AtomicLong;
36
import java.util.regex.Matcher;
37
import java.util.regex.Pattern;
38
import net.ladenthin.bitcoinaddressfinder.configuration.CConsumerJava;
39
import net.ladenthin.bitcoinaddressfinder.persistence.Persistence;
40
import net.ladenthin.bitcoinaddressfinder.persistence.PersistenceUtils;
41
import net.ladenthin.bitcoinaddressfinder.persistence.lmdb.LMDBPersistence;
42
import org.apache.commons.codec.binary.Hex;
43
import org.bitcoinj.crypto.ECKey;
44
import org.bitcoinj.crypto.MnemonicException;
45
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
47

48
public class ConsumerJava implements Consumer {
49

50
    /**
51
     * We assume a queue might be empty after this amount of time.
52
     * If not, some keys in the queue are not checked before shutdow.
53
     */
54
    @VisibleForTesting
55
    static Duration AWAIT_DURATION_QUEUE_EMPTY = Duration.ofMinutes(1);
1✔
56
    
57
    public static final String MISS_PREFIX = "miss: Could not find the address: ";
58
    public static final String HIT_PREFIX = "hit: Found the address: ";
59
    public static final String VANITY_HIT_PREFIX = "vanity pattern match: ";
60
    public static final String HIT_SAFE_PREFIX = "hit: safe log: ";
61

62
    private Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
63

64
    private final KeyUtility keyUtility;
65
    protected final AtomicLong checkedKeys = new AtomicLong();
1✔
66
    protected final AtomicLong checkedKeysSumOfTimeToCheckContains = new AtomicLong();
1✔
67
    protected final AtomicLong emptyConsumer = new AtomicLong();
1✔
68
    protected final AtomicLong hits = new AtomicLong();
1✔
69
    protected long startTime = 0;
1✔
70

71
    protected final CConsumerJava consumerJava;
72
    @VisibleForTesting
1✔
73
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
1✔
74

75
    protected Persistence persistence;
76
    private final PersistenceUtils persistenceUtils;
77
    
78
    private final List<Future<Void>> consumers = new ArrayList<>();
1✔
79
    protected final LinkedBlockingQueue<PublicKeyBytes[]> keysQueue;
80
    private final ByteBufferUtility byteBufferUtility = new ByteBufferUtility(true);
1✔
81
    
82
    protected final AtomicLong vanityHits = new AtomicLong();
1✔
83
    private final Pattern vanityPattern;
84
    
85
    @VisibleForTesting
1✔
86
    final AtomicBoolean shouldRun = new AtomicBoolean(true);
87
    
88
    @VisibleForTesting
89
    final ExecutorService consumeKeysExecutorService;
90

91
    protected ConsumerJava(CConsumerJava consumerJava, KeyUtility keyUtility, PersistenceUtils persistenceUtils) {
1✔
92
        this.consumerJava = consumerJava;
1✔
93
        this.keysQueue = new LinkedBlockingQueue<>(consumerJava.queueSize);
1✔
94
        this.keyUtility = keyUtility;
1✔
95
        this.persistenceUtils = persistenceUtils;
1✔
96
        if (consumerJava.enableVanity) {
1✔
97
            this.vanityPattern = Pattern.compile(consumerJava.vanityPattern);
1✔
98
        } else {
99
            vanityPattern = null;
1✔
100
        }
101
        consumeKeysExecutorService = Executors.newFixedThreadPool(consumerJava.threads);
1✔
102
    }
1✔
103

104
    Logger getLogger() {
105
        return logger;
×
106
    }
107
    
108
    void setLogger(Logger logger) {
109
        this.logger = logger;
1✔
110
    }
1✔
111

112
    protected void initLMDB() {
113
        persistence = new LMDBPersistence(consumerJava.lmdbConfigurationReadOnly, persistenceUtils);
1✔
114
        persistence.init();
1✔
115
    }
1✔
116

117
    protected void startStatisticsTimer() {
118
        long period = consumerJava.printStatisticsEveryNSeconds;
1✔
119
        if (period <= 0) {
1✔
120
            throw new IllegalArgumentException("period must be greater than 0.");
1✔
121
        }
122

123
        startTime = System.currentTimeMillis();
1✔
124

125
        scheduledExecutorService.scheduleAtFixedRate(() -> {
1✔
126
            // get transient information
127
            long uptime = Math.max(System.currentTimeMillis() - startTime, 1);
1✔
128

129
            String message = new Statistics().createStatisticsMessage(uptime, checkedKeys.get(), checkedKeysSumOfTimeToCheckContains.get(), emptyConsumer.get(), keysQueue.size(), hits.get());
1✔
130

131
            // log the information
132
            logger.info(message);
1✔
133
        }, period, period, TimeUnit.SECONDS);
1✔
134
    }
1✔
135

136
    @Override
137
    public void startConsumer() {
138
        logger.debug("Starting {} consumer threads...", consumerJava.threads);
1✔
139
        for (int i = 0; i < consumerJava.threads; i++) {
1✔
140
            consumers.add(consumeKeysExecutorService.submit(
1✔
141
                    () -> {
142
                        consumeKeysRunner();
1✔
143
                        return null;
1✔
144
                    }));
145
        }
146
        logger.debug("Successfully started {} consumer threads.", consumers.size());
1✔
147
    }
1✔
148
    
149
    /**
150
     * This method runs in multiple threads.
151
     */
152
    private void consumeKeysRunner() {
153
        logger.info("start consumeKeysRunner");
1✔
154
        ByteBuffer threadLocalReuseableByteBuffer = ByteBuffer.allocateDirect(PublicKeyBytes.HASH160_SIZE);
1✔
155
        
156
        while (shouldRun.get()) {
1✔
157
            if (keysQueue.size() >= consumerJava.queueSize) {
1✔
158
                logger.warn("Attention, queue is full. Please increase queue size.");
1✔
159
            }
160
            try {
161
                consumeKeys(threadLocalReuseableByteBuffer);
1✔
162
                // the consumeKeys method is looped inside, if the method returns it means the queue is empty
163
                emptyConsumer.incrementAndGet();
1✔
164
                Thread.sleep(consumerJava.delayEmptyConsumer);
1✔
165
            } catch (InterruptedException e) {
×
166
                // we need to catch the exception to not break the thread
167
                logger.error("Ignore InterruptedException during Thread.sleep.", e);
×
168
            } catch (Exception e) {
×
169
                // log every Exception because it's hard to debug and we do not break down the thread loop
170
                logger.error("Error in consumeKeysRunner()." , e);
×
171
                e.printStackTrace();
×
172
            }
1✔
173
        }
174
        
175
        if (threadLocalReuseableByteBuffer != null) {
1✔
176
            byteBufferUtility.freeByteBuffer(threadLocalReuseableByteBuffer);
1✔
177
        }
178
        logger.info("end consumeKeysRunner");
1✔
179
    }
1✔
180
    
181
    void consumeKeys(ByteBuffer threadLocalReuseableByteBuffer) throws MnemonicException.MnemonicLengthException {
182
        logger.trace("consumeKeys");
1✔
183
        PublicKeyBytes[] publicKeyBytesArray = keysQueue.poll();
1✔
184
        while (publicKeyBytesArray != null) {
1✔
185
            for (PublicKeyBytes publicKeyBytes : publicKeyBytesArray) {
1✔
186
                if (publicKeyBytes.isInvalid()) {
1✔
187
                    continue;
1✔
188
                }
189
                byte[] hash160Uncompressed = publicKeyBytes.getUncompressedKeyHash();
1✔
190

191
                threadLocalReuseableByteBuffer.rewind();
1✔
192
                threadLocalReuseableByteBuffer.put(hash160Uncompressed);
1✔
193
                threadLocalReuseableByteBuffer.flip();
1✔
194

195
                boolean containsAddressUncompressed = containsAddress(threadLocalReuseableByteBuffer);
1✔
196

197
                byte[] hash160Compressed = publicKeyBytes.getCompressedKeyHash();
1✔
198
                threadLocalReuseableByteBuffer.rewind();
1✔
199
                threadLocalReuseableByteBuffer.put(hash160Compressed);
1✔
200
                threadLocalReuseableByteBuffer.flip();
1✔
201

202
                boolean containsAddressCompressed = containsAddress(threadLocalReuseableByteBuffer);
1✔
203

204
                if (consumerJava.runtimePublicKeyCalculationCheck) {
1✔
205
                    
206
                    ECKey fromPrivateUncompressed = ECKey.fromPrivate(publicKeyBytes.getSecretKey(), false);
1✔
207
                    ECKey fromPrivateCompressed = ECKey.fromPrivate(publicKeyBytes.getSecretKey(), true);
1✔
208
                    
209
                    final byte[] pubKeyUncompressedFromEcKey = fromPrivateUncompressed.getPubKey();
1✔
210
                    final byte[] pubKeyCompressedFromEcKey = fromPrivateCompressed.getPubKey();
1✔
211
                    
212
                    final byte[] hash160UncompressedFromEcKey = fromPrivateUncompressed.getPubKeyHash();
1✔
213
                    final byte[] hash160CompressedFromEcKey = fromPrivateCompressed.getPubKeyHash();
1✔
214
                    
215
                    if (!Arrays.equals(hash160UncompressedFromEcKey, hash160Uncompressed)) {
1✔
216
                        logger.error("fromPrivateUncompressed.getPubKeyHash() != hash160Uncompressed");
1✔
217
                        logger.error("getSecretKey: " + publicKeyBytes.getSecretKey());
1✔
218
                        logger.error("pubKeyUncompressed: " + Hex.encodeHexString(publicKeyBytes.getUncompressed()));
1✔
219
                        logger.error("pubKeyUncompressedFromEcKey: " + Hex.encodeHexString(pubKeyUncompressedFromEcKey));
1✔
220
                        logger.error("hash160Uncompressed: " + Hex.encodeHexString(hash160Uncompressed));
1✔
221
                        logger.error("hash160UncompressedFromEcKey: " + Hex.encodeHexString(hash160UncompressedFromEcKey));
1✔
222
                    }
223
                    
224
                    if (!Arrays.equals(hash160CompressedFromEcKey, hash160Compressed)) {
1✔
225
                        logger.error("fromPrivateCompressed.getPubKeyHash() != hash160Compressed");
1✔
226
                        logger.error("getSecretKey: " + publicKeyBytes.getSecretKey());
1✔
227
                        logger.error("pubKeyCompressed: " + Hex.encodeHexString(publicKeyBytes.getCompressed()));
1✔
228
                        logger.error("pubKeyCompressedFromEcKey: " + Hex.encodeHexString(pubKeyCompressedFromEcKey));
1✔
229
                        logger.error("hash160Compressed: " + Hex.encodeHexString(hash160Compressed));
1✔
230
                        logger.error("hash160CompressedFromEcKey: " + Hex.encodeHexString(hash160CompressedFromEcKey));
1✔
231
                    }
232
                }
233

234
                if (containsAddressUncompressed) {
1✔
235
                    // immediately log the secret
236
                    safeLog(publicKeyBytes, hash160Uncompressed, hash160Compressed);
1✔
237
                    hits.incrementAndGet();
1✔
238
                    ECKey ecKeyUncompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getUncompressed());
1✔
239
                    String hitMessageUncompressed = HIT_PREFIX + keyUtility.createKeyDetails(ecKeyUncompressed);
1✔
240
                    logger.info(hitMessageUncompressed);
1✔
241
                }
242

243
                if (containsAddressCompressed) {
1✔
244
                    // immediately log the secret
245
                    safeLog(publicKeyBytes, hash160Uncompressed, hash160Compressed);
1✔
246
                    hits.incrementAndGet();
1✔
247
                    ECKey ecKeyCompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getCompressed());
1✔
248
                    String hitMessageCompressed = HIT_PREFIX + keyUtility.createKeyDetails(ecKeyCompressed);
1✔
249
                    logger.info(hitMessageCompressed);
1✔
250
                }
251

252
                if (consumerJava.enableVanity) {
1✔
253
                    String uncompressedKeyHashAsBase58 = publicKeyBytes.getUncompressedKeyHashAsBase58(keyUtility);
1✔
254
                    Matcher uncompressedKeyHashAsBase58Matcher = vanityPattern.matcher(uncompressedKeyHashAsBase58);
1✔
255
                    if (uncompressedKeyHashAsBase58Matcher.matches()) {
1✔
256
                        // immediately log the secret
257
                        safeLog(publicKeyBytes, hash160Uncompressed, hash160Compressed);
1✔
258
                        vanityHits.incrementAndGet();
1✔
259
                        ECKey ecKeyUncompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getUncompressed());
1✔
260
                        String vanityHitMessageUncompressed = VANITY_HIT_PREFIX + keyUtility.createKeyDetails(ecKeyUncompressed);
1✔
261
                        logger.info(vanityHitMessageUncompressed);
1✔
262
                    }
263

264
                    String compressedKeyHashAsBase58 = publicKeyBytes.getCompressedKeyHashAsBase58(keyUtility);
1✔
265
                    Matcher compressedKeyHashAsBase58Matcher = vanityPattern.matcher(compressedKeyHashAsBase58);
1✔
266
                    if (compressedKeyHashAsBase58Matcher.matches()) {
1✔
267
                        // immediately log the secret
268
                        safeLog(publicKeyBytes, hash160Uncompressed, hash160Compressed);
1✔
269
                        vanityHits.incrementAndGet();
1✔
270
                        ECKey ecKeyCompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getCompressed());
1✔
271
                        String vanityHitMessageCompressed = VANITY_HIT_PREFIX + keyUtility.createKeyDetails(ecKeyCompressed);
1✔
272
                        logger.info(vanityHitMessageCompressed);
1✔
273
                    }
274
                }
275

276
                if (!containsAddressUncompressed && !containsAddressCompressed) {
1✔
277
                    if (logger.isTraceEnabled()) {
1✔
278
                        ECKey ecKeyUncompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getUncompressed());
1✔
279
                        String missMessageUncompressed = MISS_PREFIX + keyUtility.createKeyDetails(ecKeyUncompressed);
1✔
280
                        logger.trace(missMessageUncompressed);
1✔
281

282
                        ECKey ecKeyCompressed = ECKey.fromPrivateAndPrecalculatedPublic(publicKeyBytes.getSecretKey().toByteArray(), publicKeyBytes.getCompressed());
1✔
283
                        String missMessageCompressed = MISS_PREFIX + keyUtility.createKeyDetails(ecKeyCompressed);
1✔
284
                        logger.trace(missMessageCompressed);
1✔
285
                    }
286
                }
287
            }
288
            publicKeyBytesArray = keysQueue.poll();
1✔
289
        }
290
    }
1✔
291
    
292
    /**
293
     * Try to log safe informations which may not thrown an exception.
294
     */
295
    private void safeLog(PublicKeyBytes publicKeyBytes, byte[] hash160Uncompressed, byte[] hash160Compressed) {
296
        logger.info(HIT_SAFE_PREFIX +"publicKeyBytes.getSecretKey(): " + publicKeyBytes.getSecretKey());
1✔
297
        logger.info(HIT_SAFE_PREFIX +"publicKeyBytes.getUncompressed(): " + Hex.encodeHexString(publicKeyBytes.getUncompressed()));
1✔
298
        logger.info(HIT_SAFE_PREFIX +"publicKeyBytes.getCompressed(): " + Hex.encodeHexString(publicKeyBytes.getCompressed()));
1✔
299
        logger.info(HIT_SAFE_PREFIX +"hash160Uncompressed: " + Hex.encodeHexString(hash160Uncompressed));
1✔
300
        logger.info(HIT_SAFE_PREFIX +"hash160Compressed: " + Hex.encodeHexString(hash160Compressed));
1✔
301
    }
1✔
302

303
    private boolean containsAddress(ByteBuffer hash160AsByteBuffer) {
304
        long timeBefore = System.currentTimeMillis();
1✔
305
        if (logger.isTraceEnabled()) {
1✔
306
            logger.trace("Time before persistence.containsAddress: " + timeBefore);
1✔
307
        }
308
        boolean containsAddress = persistence.containsAddress(hash160AsByteBuffer);
1✔
309
        long timeAfter = System.currentTimeMillis();
1✔
310
        long timeDelta = timeAfter - timeBefore;
1✔
311
        checkedKeys.incrementAndGet();
1✔
312
        checkedKeysSumOfTimeToCheckContains.addAndGet(timeDelta);
1✔
313
        if (logger.isTraceEnabled()) {
1✔
314
            logger.trace("Time after persistence.containsAddress: " + timeAfter);
1✔
315
            logger.trace("Time delta: " + timeDelta);
1✔
316
        }
317
        return containsAddress;
1✔
318
    }
319

320
    @Override
321
    public void consumeKeys(PublicKeyBytes[] publicKeyBytes) throws InterruptedException {
322
        if(logger.isDebugEnabled()){
1✔
323
            logger.debug("keysQueue.put(publicKeyBytes) with length: " + publicKeyBytes.length);
1✔
324
        }
325
        
326
        keysQueue.put(publicKeyBytes);
1✔
327
        
328
        if(logger.isDebugEnabled()){
1✔
329
            logger.debug("keysQueue.size(): " + keysQueue.size());
1✔
330
        }
331
    }
1✔
332
    
333
    @Override
334
    public void interrupt() {
335
        logger.debug("Interrupt initiated: stopping consumer execution...");
1✔
336
        shouldRun.set(false);
1✔
337
        scheduledExecutorService.shutdown();
1✔
338
        consumeKeysExecutorService.shutdown();
1✔
339
        logger.debug("Waiting for termination of {} consumer threads (timeout: {} seconds)...", consumers.size(), AWAIT_DURATION_QUEUE_EMPTY.getSeconds());
1✔
340
        try {
341
            boolean terminated = consumeKeysExecutorService.awaitTermination(AWAIT_DURATION_QUEUE_EMPTY.get(ChronoUnit.SECONDS), TimeUnit.SECONDS);
1✔
342
            if (!terminated) {
1✔
343
                logger.warn("Timeout reached. Some consumer threads may not have terminated cleanly.");
×
344
            }
345
        } catch (InterruptedException ex) {
×
346
            logger.error("Interrupted while awaiting consumer termination.", ex);
×
347
            throw new RuntimeException(ex);
×
348
        }
1✔
349
        persistence.close();
1✔
350
        logger.debug("Interrupt complete: resources released and persistence closed.");
1✔
351
    }
1✔
352
    
353
    @VisibleForTesting
354
    int keysQueueSize() {
355
        return keysQueue.size();
1✔
356
    }
357
    
358
    @Override
359
    public String toString() {
360
        return "ConsumerJava@" + Integer.toHexString(System.identityHashCode(this));
1✔
361
    }
362
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc