• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 5834188079

pending completion
5834188079

Pull #1071

github

web-flow
Merge 68b42331c into 0ba6bbe11
Pull Request #1071: Update rust bitcoin (BDK 0.28)

563 of 563 new or added lines in 28 files covered. (100.0%)

14625 of 18342 relevant lines covered (79.74%)

9267.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/blockchain/compact_filters/peer.rs
1
// Bitcoin Dev Kit
2
// Written in 2020 by Alekos Filini <alekos.filini@gmail.com>
3
//
4
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
5
//
6
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
7
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
9
// You may not use this file except in accordance with one or both of these
10
// licenses.
11

12
use std::collections::HashMap;
13
use std::io::BufReader;
14
use std::net::{TcpStream, ToSocketAddrs};
15
use std::sync::{Arc, Condvar, Mutex, RwLock};
16
use std::thread;
17
use std::time::{Duration, SystemTime, UNIX_EPOCH};
18

19
use socks::{Socks5Stream, ToTargetAddr};
20

21
use rand::{thread_rng, Rng};
22

23
use bitcoin::consensus::{Decodable, Encodable};
24
use bitcoin::hash_types::BlockHash;
25
use bitcoin::network::constants::ServiceFlags;
26
use bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
27
use bitcoin::network::message_blockdata::*;
28
use bitcoin::network::message_filter::*;
29
use bitcoin::network::message_network::VersionMessage;
30
use bitcoin::network::{Address, Magic};
31
use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
32

33
use super::CompactFiltersError;
34

35
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
36

37
pub(crate) const TIMEOUT_SECS: u64 = 30;
38

39
/// Container for unconfirmed, but valid Bitcoin transactions
40
///
41
/// It is normally shared between [`Peer`]s with the use of [`Arc`], so that transactions are not
42
/// duplicated in memory.
43
#[derive(Debug, Default)]
×
44
pub struct Mempool(RwLock<InnerMempool>);
45

46
#[derive(Debug, Default)]
×
47
struct InnerMempool {
48
    txs: HashMap<Txid, Transaction>,
49
    wtxids: HashMap<Wtxid, Txid>,
50
}
51

52
#[derive(Debug, Clone, PartialEq, Eq)]
×
53
enum TxIdentifier {
54
    Wtxid(Wtxid),
55
    Txid(Txid),
56
}
57

58
impl Mempool {
59
    /// Create a new empty mempool
60
    pub fn new() -> Self {
×
61
        Self::default()
×
62
    }
×
63

64
    /// Add a transaction to the mempool
65
    ///
66
    /// Note that this doesn't propagate the transaction to other
67
    /// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
68
    pub fn add_tx(&self, tx: Transaction) {
×
69
        let mut guard = self.0.write().unwrap();
×
70

×
71
        guard.wtxids.insert(tx.wtxid(), tx.txid());
×
72
        guard.txs.insert(tx.txid(), tx);
×
73
    }
×
74

75
    /// Look-up a transaction in the mempool given an [`Inventory`] request
76
    pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
×
77
        let identifer = match inventory {
×
78
            Inventory::Error
79
            | Inventory::Block(_)
80
            | Inventory::WitnessBlock(_)
81
            | Inventory::CompactBlock(_) => return None,
×
82
            Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
×
83
            Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
×
84
            Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
×
85
            Inventory::Unknown { inv_type, hash } => {
×
86
                log::warn!(
×
87
                    "Unknown inventory request type `{}`, hash `{:?}`",
×
88
                    inv_type,
89
                    hash
90
                );
91
                return None;
×
92
            }
93
        };
94

95
        let txid = match identifer {
×
96
            TxIdentifier::Txid(txid) => Some(txid),
×
97
            TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
×
98
        };
99

100
        txid.and_then(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
×
101
    }
×
102

103
    /// Return whether or not the mempool contains a transaction with a given txid
104
    pub fn has_tx(&self, txid: &Txid) -> bool {
×
105
        self.0.read().unwrap().txs.contains_key(txid)
×
106
    }
×
107

108
    /// Return the list of transactions contained in the mempool
109
    pub fn iter_txs(&self) -> Vec<Transaction> {
×
110
        self.0.read().unwrap().txs.values().cloned().collect()
×
111
    }
×
112
}
113

114
/// A Bitcoin peer
115
#[derive(Debug)]
×
116
#[allow(dead_code)]
117
pub struct Peer {
118
    writer: Arc<Mutex<TcpStream>>,
119
    responses: Arc<RwLock<ResponsesMap>>,
120

121
    reader_thread: thread::JoinHandle<()>,
122
    connected: Arc<RwLock<bool>>,
123

124
    mempool: Arc<Mempool>,
125

126
    version: VersionMessage,
127
    network: Network,
128
}
129

130
impl Peer {
131
    /// Connect to a peer over a plaintext TCP connection
132
    ///
133
    /// This function internally spawns a new thread that will monitor incoming messages from the
134
    /// peer, and optionally reply to some of them transparently, like [pings](bitcoin::network::message::NetworkMessage::Ping)
135
    pub fn connect<A: ToSocketAddrs>(
×
136
        address: A,
×
137
        mempool: Arc<Mempool>,
×
138
        network: Network,
×
139
    ) -> Result<Self, CompactFiltersError> {
×
140
        let stream = TcpStream::connect(address)?;
×
141

142
        Peer::from_stream(stream, mempool, network)
×
143
    }
×
144

145
    /// Connect to a peer through a SOCKS5 proxy, optionally by using some credentials, specified
146
    /// as a tuple of `(username, password)`
147
    ///
148
    /// This function internally spawns a new thread that will monitor incoming messages from the
149
    /// peer, and optionally reply to some of them transparently, like [pings](NetworkMessage::Ping)
150
    pub fn connect_proxy<T: ToTargetAddr, P: ToSocketAddrs>(
×
151
        target: T,
×
152
        proxy: P,
×
153
        credentials: Option<(&str, &str)>,
×
154
        mempool: Arc<Mempool>,
×
155
        network: Network,
×
156
    ) -> Result<Self, CompactFiltersError> {
×
157
        let socks_stream = if let Some((username, password)) = credentials {
×
158
            Socks5Stream::connect_with_password(proxy, target, username, password)?
×
159
        } else {
160
            Socks5Stream::connect(proxy, target)?
×
161
        };
162

163
        Peer::from_stream(socks_stream.into_inner(), mempool, network)
×
164
    }
×
165

166
    /// Create a [`Peer`] from an already connected TcpStream
167
    fn from_stream(
×
168
        stream: TcpStream,
×
169
        mempool: Arc<Mempool>,
×
170
        network: Network,
×
171
    ) -> Result<Self, CompactFiltersError> {
×
172
        let writer = Arc::new(Mutex::new(stream.try_clone()?));
×
173
        let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
×
174
        let connected = Arc::new(RwLock::new(true));
×
175

×
176
        let mut locked_writer = writer.lock().unwrap();
×
177

×
178
        let reader_thread_responses = Arc::clone(&responses);
×
179
        let reader_thread_writer = Arc::clone(&writer);
×
180
        let reader_thread_mempool = Arc::clone(&mempool);
×
181
        let reader_thread_connected = Arc::clone(&connected);
×
182
        let reader_thread = thread::spawn(move || {
×
183
            Self::reader_thread(
×
184
                network,
×
185
                stream,
×
186
                reader_thread_responses,
×
187
                reader_thread_writer,
×
188
                reader_thread_mempool,
×
189
                reader_thread_connected,
×
190
            )
×
191
        });
×
192

193
        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
×
194
        let nonce = thread_rng().gen();
×
195
        let receiver = Address::new(&locked_writer.peer_addr()?, ServiceFlags::NONE);
×
196
        let sender = Address {
×
197
            services: ServiceFlags::NONE,
×
198
            address: [0u16; 8],
×
199
            port: 0,
×
200
        };
×
201

×
202
        Self::_send(
×
203
            &mut locked_writer,
×
204
            network.magic(),
×
205
            NetworkMessage::Version(VersionMessage::new(
×
206
                ServiceFlags::WITNESS,
×
207
                timestamp,
×
208
                receiver,
×
209
                sender,
×
210
                nonce,
×
211
                "MagicalBitcoinWallet".into(),
×
212
                0,
×
213
            )),
×
214
        )?;
×
215
        let version = if let NetworkMessage::Version(version) =
×
216
            Self::_recv(&responses, "version", None).unwrap()
×
217
        {
218
            version
×
219
        } else {
220
            return Err(CompactFiltersError::InvalidResponse);
×
221
        };
222

223
        if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None).unwrap() {
×
224
            Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?;
×
225
        } else {
226
            return Err(CompactFiltersError::InvalidResponse);
×
227
        }
228

229
        std::mem::drop(locked_writer);
×
230

×
231
        Ok(Peer {
×
232
            writer,
×
233
            responses,
×
234
            reader_thread,
×
235
            connected,
×
236
            mempool,
×
237
            version,
×
238
            network,
×
239
        })
×
240
    }
×
241

242
    /// Send a Bitcoin network message
243
    fn _send(
×
244
        writer: &mut TcpStream,
×
245
        magic: Magic,
×
246
        payload: NetworkMessage,
×
247
    ) -> Result<(), CompactFiltersError> {
×
248
        log::trace!("==> {:?}", payload);
×
249

250
        let raw_message = RawNetworkMessage { magic, payload };
×
251

×
252
        raw_message
×
253
            .consensus_encode(writer)
×
254
            .map_err(|_| CompactFiltersError::DataCorruption)?;
×
255

256
        Ok(())
×
257
    }
×
258

259
    /// Wait for a specific incoming Bitcoin message, optionally with a timeout
260
    fn _recv(
×
261
        responses: &Arc<RwLock<ResponsesMap>>,
×
262
        wait_for: &'static str,
×
263
        timeout: Option<Duration>,
×
264
    ) -> Option<NetworkMessage> {
×
265
        let message_resp = {
×
266
            let mut lock = responses.write().unwrap();
×
267
            let message_resp = lock.entry(wait_for).or_default();
×
268
            Arc::clone(message_resp)
×
269
        };
×
270

×
271
        let (lock, cvar) = &*message_resp;
×
272

×
273
        let mut messages = lock.lock().unwrap();
×
274
        while messages.is_empty() {
×
275
            match timeout {
×
276
                None => messages = cvar.wait(messages).unwrap(),
×
277
                Some(t) => {
×
278
                    let result = cvar.wait_timeout(messages, t).unwrap();
×
279
                    if result.1.timed_out() {
×
280
                        return None;
×
281
                    }
×
282
                    messages = result.0;
×
283
                }
284
            }
285
        }
286

287
        messages.pop()
×
288
    }
×
289

290
    /// Return the [`VersionMessage`] sent by the peer
291
    pub fn get_version(&self) -> &VersionMessage {
×
292
        &self.version
×
293
    }
×
294

295
    /// Return the Bitcoin [`Network`] in use
296
    pub fn get_network(&self) -> Network {
×
297
        self.network
×
298
    }
×
299

300
    /// Return the mempool used by this peer
301
    pub fn get_mempool(&self) -> Arc<Mempool> {
×
302
        Arc::clone(&self.mempool)
×
303
    }
×
304

305
    /// Return whether or not the peer is still connected
306
    pub fn is_connected(&self) -> bool {
×
307
        *self.connected.read().unwrap()
×
308
    }
×
309

310
    /// Internal function called once the `reader_thread` is spawned
311
    fn reader_thread(
×
312
        network: Network,
×
313
        connection: TcpStream,
×
314
        reader_thread_responses: Arc<RwLock<ResponsesMap>>,
×
315
        reader_thread_writer: Arc<Mutex<TcpStream>>,
×
316
        reader_thread_mempool: Arc<Mempool>,
×
317
        reader_thread_connected: Arc<RwLock<bool>>,
×
318
    ) {
×
319
        macro_rules! check_disconnect {
×
320
            ($call:expr) => {
×
321
                match $call {
×
322
                    Ok(good) => good,
×
323
                    Err(e) => {
×
324
                        log::debug!("Error {:?}", e);
×
325
                        *reader_thread_connected.write().unwrap() = false;
×
326

×
327
                        break;
×
328
                    }
×
329
                }
×
330
            };
×
331
        }
×
332

×
333
        let mut reader = BufReader::new(connection);
×
334
        loop {
335
            let raw_message: RawNetworkMessage =
×
336
                check_disconnect!(Decodable::consensus_decode(&mut reader));
×
337

338
            let in_message = if raw_message.magic != network.magic() {
×
339
                continue;
×
340
            } else {
341
                raw_message.payload
×
342
            };
×
343

×
344
            log::trace!("<== {:?}", in_message);
×
345

346
            match in_message {
×
347
                NetworkMessage::Ping(nonce) => {
×
348
                    check_disconnect!(Self::_send(
×
349
                        &mut reader_thread_writer.lock().unwrap(),
×
350
                        network.magic(),
×
351
                        NetworkMessage::Pong(nonce),
×
352
                    ));
×
353

354
                    continue;
×
355
                }
356
                NetworkMessage::Alert(_) => continue,
×
357
                NetworkMessage::GetData(ref inv) => {
×
358
                    let (found, not_found): (Vec<_>, Vec<_>) = inv
×
359
                        .iter()
×
360
                        .map(|item| (*item, reader_thread_mempool.get_tx(item)))
×
361
                        .partition(|(_, d)| d.is_some());
×
362
                    for (_, found_tx) in found {
×
363
                        check_disconnect!(Self::_send(
×
364
                            &mut reader_thread_writer.lock().unwrap(),
×
365
                            network.magic(),
×
366
                            NetworkMessage::Tx(found_tx.unwrap()),
×
367
                        ));
×
368
                    }
369

370
                    if !not_found.is_empty() {
×
371
                        check_disconnect!(Self::_send(
×
372
                            &mut reader_thread_writer.lock().unwrap(),
×
373
                            network.magic(),
×
374
                            NetworkMessage::NotFound(
×
375
                                not_found.into_iter().map(|(i, _)| i).collect(),
×
376
                            ),
×
377
                        ));
×
378
                    }
×
379
                }
380
                _ => {}
×
381
            }
382

383
            let message_resp = {
×
384
                let mut lock = reader_thread_responses.write().unwrap();
×
385
                let message_resp = lock.entry(in_message.cmd()).or_default();
×
386
                Arc::clone(message_resp)
×
387
            };
×
388

×
389
            let (lock, cvar) = &*message_resp;
×
390
            let mut messages = lock.lock().unwrap();
×
391
            messages.push(in_message);
×
392
            cvar.notify_all();
×
393
        }
394
    }
×
395

396
    /// Send a raw Bitcoin message to the peer
397
    pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
×
398
        let mut writer = self.writer.lock().unwrap();
×
399
        Self::_send(&mut writer, self.network.magic(), payload)
×
400
    }
×
401

402
    /// Waits for a specific incoming Bitcoin message, optionally with a timeout
403
    pub fn recv(
×
404
        &self,
×
405
        wait_for: &'static str,
×
406
        timeout: Option<Duration>,
×
407
    ) -> Result<Option<NetworkMessage>, CompactFiltersError> {
×
408
        Ok(Self::_recv(&self.responses, wait_for, timeout))
×
409
    }
×
410
}
411

412
pub trait CompactFiltersPeer {
413
    fn get_cf_checkpt(
414
        &self,
415
        filter_type: u8,
416
        stop_hash: BlockHash,
417
    ) -> Result<CFCheckpt, CompactFiltersError>;
418
    fn get_cf_headers(
419
        &self,
420
        filter_type: u8,
421
        start_height: u32,
422
        stop_hash: BlockHash,
423
    ) -> Result<CFHeaders, CompactFiltersError>;
424
    fn get_cf_filters(
425
        &self,
426
        filter_type: u8,
427
        start_height: u32,
428
        stop_hash: BlockHash,
429
    ) -> Result<(), CompactFiltersError>;
430
    fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError>;
431
}
432

433
impl CompactFiltersPeer for Peer {
434
    fn get_cf_checkpt(
435
        &self,
436
        filter_type: u8,
437
        stop_hash: BlockHash,
438
    ) -> Result<CFCheckpt, CompactFiltersError> {
439
        self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt {
×
440
            filter_type,
×
441
            stop_hash,
×
442
        }))?;
×
443

444
        let response = self
×
445
            .recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
×
446
            .ok_or(CompactFiltersError::Timeout)?;
×
447
        let response = match response {
×
448
            NetworkMessage::CFCheckpt(response) => response,
×
449
            _ => return Err(CompactFiltersError::InvalidResponse),
×
450
        };
451

452
        if response.filter_type != filter_type {
×
453
            return Err(CompactFiltersError::InvalidResponse);
×
454
        }
×
455

×
456
        Ok(response)
×
457
    }
×
458

459
    fn get_cf_headers(
460
        &self,
461
        filter_type: u8,
462
        start_height: u32,
463
        stop_hash: BlockHash,
464
    ) -> Result<CFHeaders, CompactFiltersError> {
465
        self.send(NetworkMessage::GetCFHeaders(GetCFHeaders {
×
466
            filter_type,
×
467
            start_height,
×
468
            stop_hash,
×
469
        }))?;
×
470

471
        let response = self
×
472
            .recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
×
473
            .ok_or(CompactFiltersError::Timeout)?;
×
474
        let response = match response {
×
475
            NetworkMessage::CFHeaders(response) => response,
×
476
            _ => return Err(CompactFiltersError::InvalidResponse),
×
477
        };
478

479
        if response.filter_type != filter_type {
×
480
            return Err(CompactFiltersError::InvalidResponse);
×
481
        }
×
482

×
483
        Ok(response)
×
484
    }
×
485

486
    fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
×
487
        let response = self
×
488
            .recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
×
489
            .ok_or(CompactFiltersError::Timeout)?;
×
490
        let response = match response {
×
491
            NetworkMessage::CFilter(response) => response,
×
492
            _ => return Err(CompactFiltersError::InvalidResponse),
×
493
        };
494

495
        Ok(response)
×
496
    }
×
497

498
    fn get_cf_filters(
499
        &self,
500
        filter_type: u8,
501
        start_height: u32,
502
        stop_hash: BlockHash,
503
    ) -> Result<(), CompactFiltersError> {
504
        self.send(NetworkMessage::GetCFilters(GetCFilters {
×
505
            filter_type,
×
506
            start_height,
×
507
            stop_hash,
×
508
        }))?;
×
509

510
        Ok(())
×
511
    }
×
512
}
513

514
pub trait InvPeer {
515
    fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError>;
516
    fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>;
517
    fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>;
518
}
519

520
impl InvPeer for Peer {
521
    fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError> {
522
        self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(
×
523
            block_hash,
×
524
        )]))?;
×
525

526
        match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
×
527
            None => Ok(None),
×
528
            Some(NetworkMessage::Block(response)) => Ok(Some(response)),
×
529
            _ => Err(CompactFiltersError::InvalidResponse),
×
530
        }
531
    }
×
532

533
    fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
×
534
        if !self.version.services.has(ServiceFlags::BLOOM) {
×
535
            return Err(CompactFiltersError::PeerBloomDisabled);
×
536
        }
×
537

×
538
        self.send(NetworkMessage::MemPool)?;
×
539
        let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
×
540
            None => return Ok(()), // empty mempool
×
541
            Some(NetworkMessage::Inv(inv)) => inv,
×
542
            _ => return Err(CompactFiltersError::InvalidResponse),
×
543
        };
544

545
        let getdata = inv
×
546
            .iter()
×
547
            .cloned()
×
548
            .filter(
×
549
                |item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid)),
×
550
            )
×
551
            .collect::<Vec<_>>();
×
552
        let num_txs = getdata.len();
×
553
        self.send(NetworkMessage::GetData(getdata))?;
×
554

555
        for _ in 0..num_txs {
×
556
            let tx = self
×
557
                .recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
×
558
                .ok_or(CompactFiltersError::Timeout)?;
×
559
            let tx = match tx {
×
560
                NetworkMessage::Tx(tx) => tx,
×
561
                _ => return Err(CompactFiltersError::InvalidResponse),
×
562
            };
563

564
            self.mempool.add_tx(tx);
×
565
        }
566

567
        Ok(())
×
568
    }
×
569

570
    fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> {
×
571
        self.mempool.add_tx(tx.clone());
×
572
        self.send(NetworkMessage::Tx(tx))?;
×
573

574
        Ok(())
×
575
    }
×
576
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc