• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 5834188079

pending completion
5834188079

Pull #1071

github

web-flow
Merge 68b42331c into 0ba6bbe11
Pull Request #1071: Update rust bitcoin (BDK 0.28)

563 of 563 new or added lines in 28 files covered. (100.0%)

14625 of 18342 relevant lines covered (79.74%)

9267.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/blockchain/compact_filters/store.rs
1
// Bitcoin Dev Kit
2
// Written in 2020 by Alekos Filini <alekos.filini@gmail.com>
3
//
4
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
5
//
6
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
7
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
9
// You may not use this file except in accordance with one or both of these
10
// licenses.
11

12
use std::convert::TryInto;
13
use std::fmt;
14
use std::io::{Read, Write};
15
use std::marker::PhantomData;
16
use std::sync::Arc;
17
use std::sync::RwLock;
18

19
use rand::distributions::Alphanumeric;
20
use rand::{thread_rng, Rng};
21

22
use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB};
23

24
use bitcoin::bip158::BlockFilter;
25
use bitcoin::block::Header;
26
use bitcoin::blockdata::constants::genesis_block;
27
use bitcoin::consensus::{deserialize, encode::VarInt, serialize, Decodable, Encodable};
28
use bitcoin::hash_types::{FilterHash, FilterHeader};
29
use bitcoin::hashes::Hash;
30
use bitcoin::pow::Work;
31
use bitcoin::Block;
32
use bitcoin::BlockHash;
33
use bitcoin::Network;
34
use bitcoin::ScriptBuf;
35

36
use super::CompactFiltersError;
37

38
/// Marker trait distinguishing the two flavors of [`ChainStore`].
pub trait StoreType: Default + fmt::Debug {}

/// The main store, holding the canonical chain of headers.
#[derive(Default, Debug)]
pub struct Full;
impl StoreType for Full {}

/// A detached store: headers accumulated in a temporary column family,
/// later merged into the [`Full`] store if they carry more work.
#[derive(Default, Debug)]
pub struct Snapshot;
impl StoreType for Snapshot {}
46

47
pub enum StoreEntry {
48
    BlockHeader(Option<usize>),
49
    Block(Option<usize>),
50
    BlockHeaderIndex(Option<BlockHash>),
51
    CFilterTable((u8, Option<usize>)),
52
}
53

54
impl StoreEntry {
55
    pub fn get_prefix(&self) -> Vec<u8> {
×
56
        match self {
×
57
            StoreEntry::BlockHeader(_) => b"z",
×
58
            StoreEntry::Block(_) => b"x",
×
59
            StoreEntry::BlockHeaderIndex(_) => b"i",
×
60
            StoreEntry::CFilterTable(_) => b"t",
×
61
        }
62
        .to_vec()
×
63
    }
×
64

65
    pub fn get_key(&self) -> Vec<u8> {
×
66
        let mut prefix = self.get_prefix();
×
67
        match self {
×
68
            StoreEntry::BlockHeader(Some(height)) => {
×
69
                prefix.extend_from_slice(&height.to_be_bytes())
×
70
            }
71
            StoreEntry::Block(Some(height)) => prefix.extend_from_slice(&height.to_be_bytes()),
×
72
            StoreEntry::BlockHeaderIndex(Some(hash)) => {
×
73
                prefix.extend_from_slice(&hash.to_raw_hash().as_ref())
×
74
            }
75
            StoreEntry::CFilterTable((filter_type, bundle_index)) => {
×
76
                prefix.push(*filter_type);
×
77
                if let Some(bundle_index) = bundle_index {
×
78
                    prefix.extend_from_slice(&bundle_index.to_be_bytes());
×
79
                }
×
80
            }
81
            _ => {}
×
82
        }
83

84
        prefix
×
85
    }
×
86
}
87

88
pub trait SerializeDb: Sized {
89
    fn serialize(&self) -> Vec<u8>;
90
    fn deserialize(data: &[u8]) -> Result<Self, CompactFiltersError>;
91
}
92

93
impl<T> SerializeDb for T
94
where
95
    T: Encodable + Decodable,
96
{
97
    fn serialize(&self) -> Vec<u8> {
×
98
        serialize(self)
×
99
    }
×
100

101
    fn deserialize(data: &[u8]) -> Result<Self, CompactFiltersError> {
×
102
        deserialize(data).map_err(|_| CompactFiltersError::DataCorruption)
×
103
    }
×
104
}
105

106
impl Encodable for BundleStatus {
107
    fn consensus_encode<W: Write + ?Sized>(&self, e: &mut W) -> Result<usize, std::io::Error> {
×
108
        let mut written = 0;
×
109

×
110
        match self {
×
111
            BundleStatus::Init => {
112
                written += 0x00u8.consensus_encode(e)?;
×
113
            }
114
            BundleStatus::CfHeaders { cf_headers } => {
×
115
                written += 0x01u8.consensus_encode(e)?;
×
116
                written += VarInt(cf_headers.len() as u64).consensus_encode(e)?;
×
117
                for header in cf_headers {
×
118
                    written += header.consensus_encode(e)?;
×
119
                }
120
            }
121
            BundleStatus::CFilters { cf_filters } => {
×
122
                written += 0x02u8.consensus_encode(e)?;
×
123
                written += VarInt(cf_filters.len() as u64).consensus_encode(e)?;
×
124
                for filter in cf_filters {
×
125
                    written += filter.consensus_encode(e)?;
×
126
                }
127
            }
128
            BundleStatus::Processed { cf_filters } => {
×
129
                written += 0x03u8.consensus_encode(e)?;
×
130
                written += VarInt(cf_filters.len() as u64).consensus_encode(e)?;
×
131
                for filter in cf_filters {
×
132
                    written += filter.consensus_encode(e)?;
×
133
                }
134
            }
135
            BundleStatus::Pruned => {
136
                written += 0x04u8.consensus_encode(e)?;
×
137
            }
138
            BundleStatus::Tip { cf_filters } => {
×
139
                written += 0x05u8.consensus_encode(e)?;
×
140
                written += VarInt(cf_filters.len() as u64).consensus_encode(e)?;
×
141
                for filter in cf_filters {
×
142
                    written += filter.consensus_encode(e)?;
×
143
                }
144
            }
145
        }
146

147
        Ok(written)
×
148
    }
×
149
}
150

151
impl Decodable for BundleStatus {
152
    fn consensus_decode<D: Read + ?Sized>(
×
153
        d: &mut D,
×
154
    ) -> Result<Self, bitcoin::consensus::encode::Error> {
×
155
        let byte_type = u8::consensus_decode(d)?;
×
156
        match byte_type {
×
157
            0x00 => Ok(BundleStatus::Init),
×
158
            0x01 => {
159
                let num = VarInt::consensus_decode(d)?;
×
160
                let num = num.0 as usize;
×
161

×
162
                let mut cf_headers = Vec::with_capacity(num);
×
163
                for _ in 0..num {
×
164
                    cf_headers.push(FilterHeader::consensus_decode(d)?);
×
165
                }
166

167
                Ok(BundleStatus::CfHeaders { cf_headers })
×
168
            }
169
            0x02 => {
170
                let num = VarInt::consensus_decode(d)?;
×
171
                let num = num.0 as usize;
×
172

×
173
                let mut cf_filters = Vec::with_capacity(num);
×
174
                for _ in 0..num {
×
175
                    cf_filters.push(Vec::<u8>::consensus_decode(d)?);
×
176
                }
177

178
                Ok(BundleStatus::CFilters { cf_filters })
×
179
            }
180
            0x03 => {
181
                let num = VarInt::consensus_decode(d)?;
×
182
                let num = num.0 as usize;
×
183

×
184
                let mut cf_filters = Vec::with_capacity(num);
×
185
                for _ in 0..num {
×
186
                    cf_filters.push(Vec::<u8>::consensus_decode(d)?);
×
187
                }
188

189
                Ok(BundleStatus::Processed { cf_filters })
×
190
            }
191
            0x04 => Ok(BundleStatus::Pruned),
×
192
            0x05 => {
193
                let num = VarInt::consensus_decode(d)?;
×
194
                let num = num.0 as usize;
×
195

×
196
                let mut cf_filters = Vec::with_capacity(num);
×
197
                for _ in 0..num {
×
198
                    cf_filters.push(Vec::<u8>::consensus_decode(d)?);
×
199
                }
200

201
                Ok(BundleStatus::Tip { cf_filters })
×
202
            }
203
            _ => Err(bitcoin::consensus::encode::Error::ParseFailed(
×
204
                "Invalid byte type",
×
205
            )),
×
206
        }
207
    }
×
208
}
209

210
pub struct ChainStore<T: StoreType> {
211
    store: Arc<RwLock<DB>>,
212
    cf_name: String,
213
    min_height: usize,
214
    network: Network,
215
    phantom: PhantomData<T>,
216
}
217

218
impl ChainStore<Full> {
219
    pub fn new(store: DB, network: Network) -> Result<Self, CompactFiltersError> {
×
220
        let genesis = genesis_block(network);
×
221

×
222
        let cf_name = "default".to_string();
×
223
        let cf_handle = store.cf_handle(&cf_name).unwrap();
×
224

×
225
        let genesis_key = StoreEntry::BlockHeader(Some(0)).get_key();
×
226

×
227
        if store.get_pinned_cf(cf_handle, &genesis_key)?.is_none() {
×
228
            let mut batch = WriteBatch::default();
×
229
            batch.put_cf(
×
230
                cf_handle,
×
231
                genesis_key,
×
232
                SerializeDb::serialize(&(genesis.header, genesis.header.work().to_be_bytes())),
×
233
            );
×
234
            batch.put_cf(
×
235
                cf_handle,
×
236
                StoreEntry::BlockHeaderIndex(Some(genesis.block_hash())).get_key(),
×
237
                0usize.to_be_bytes(),
×
238
            );
×
239
            store.write(batch)?;
×
240
        }
×
241

242
        Ok(ChainStore {
×
243
            store: Arc::new(RwLock::new(store)),
×
244
            cf_name,
×
245
            min_height: 0,
×
246
            network,
×
247
            phantom: PhantomData,
×
248
        })
×
249
    }
×
250

251
    pub fn get_locators(&self) -> Result<Vec<(BlockHash, usize)>, CompactFiltersError> {
×
252
        let mut step = 1;
×
253
        let mut index = self.get_height()?;
×
254
        let mut answer = Vec::new();
×
255

×
256
        let store_read = self.store.read().unwrap();
×
257
        let cf_handle = store_read.cf_handle(&self.cf_name).unwrap();
×
258

259
        loop {
×
260
            if answer.len() > 10 {
×
261
                step *= 2;
×
262
            }
×
263

264
            let (header, _): (Header, Vec<u8>) = SerializeDb::deserialize(
×
265
                &store_read
×
266
                    .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(index)).get_key())?
×
267
                    .unwrap(),
×
268
            )?;
×
269
            answer.push((header.block_hash(), index));
×
270

271
            if let Some(new_index) = index.checked_sub(step) {
×
272
                index = new_index;
×
273
            } else {
×
274
                break;
×
275
            }
×
276
        }
×
277

×
278
        Ok(answer)
×
279
    }
×
280

281
    pub fn start_snapshot(&self, from: usize) -> Result<ChainStore<Snapshot>, CompactFiltersError> {
×
282
        let new_cf_name: String = thread_rng()
×
283
            .sample_iter(&Alphanumeric)
×
284
            .map(|byte| byte as char)
×
285
            .take(16)
×
286
            .collect();
×
287
        let new_cf_name = format!("_headers:{}", new_cf_name);
×
288

×
289
        let mut write_store = self.store.write().unwrap();
×
290

×
291
        write_store.create_cf(&new_cf_name, &Default::default())?;
×
292

293
        let cf_handle = write_store.cf_handle(&self.cf_name).unwrap();
×
294
        let new_cf_handle = write_store.cf_handle(&new_cf_name).unwrap();
×
295

296
        let (header, work): (Header, [u8; 32]) = SerializeDb::deserialize(
×
297
            &write_store
×
298
                .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(from)).get_key())?
×
299
                .ok_or(CompactFiltersError::DataCorruption)?,
×
300
        )?;
×
301
        let work = Work::from_be_bytes(work);
×
302

×
303
        let mut batch = WriteBatch::default();
×
304
        batch.put_cf(
×
305
            new_cf_handle,
×
306
            StoreEntry::BlockHeaderIndex(Some(header.block_hash())).get_key(),
×
307
            from.to_be_bytes(),
×
308
        );
×
309
        batch.put_cf(
×
310
            new_cf_handle,
×
311
            StoreEntry::BlockHeader(Some(from)).get_key(),
×
312
            SerializeDb::serialize(&(header, work.to_be_bytes())),
×
313
        );
×
314
        write_store.write(batch)?;
×
315

316
        let store = Arc::clone(&self.store);
×
317
        Ok(ChainStore {
×
318
            store,
×
319
            cf_name: new_cf_name,
×
320
            min_height: from,
×
321
            network: self.network,
×
322
            phantom: PhantomData,
×
323
        })
×
324
    }
×
325

326
    pub fn recover_snapshot(&self, cf_name: &str) -> Result<(), CompactFiltersError> {
×
327
        let mut write_store = self.store.write().unwrap();
×
328
        let snapshot_cf_handle = write_store.cf_handle(cf_name).unwrap();
×
329

×
330
        let prefix = StoreEntry::BlockHeader(None).get_key();
×
331
        let mut iterator = write_store.prefix_iterator_cf(snapshot_cf_handle, prefix);
×
332

333
        let min_height = match iterator
×
334
            .next()
×
335
            .and_then(|(k, _)| k[1..].try_into().ok())
×
336
            .map(usize::from_be_bytes)
×
337
        {
338
            None => {
339
                std::mem::drop(iterator);
×
340
                write_store.drop_cf(cf_name).ok();
×
341

×
342
                return Ok(());
×
343
            }
344
            Some(x) => x,
×
345
        };
×
346
        std::mem::drop(iterator);
×
347
        std::mem::drop(write_store);
×
348

×
349
        let snapshot = ChainStore {
×
350
            store: Arc::clone(&self.store),
×
351
            cf_name: cf_name.into(),
×
352
            min_height,
×
353
            network: self.network,
×
354
            phantom: PhantomData,
×
355
        };
×
356
        if snapshot.work()? > self.work()? {
×
357
            self.apply_snapshot(snapshot)?;
×
358
        }
×
359

360
        Ok(())
×
361
    }
×
362

363
    pub fn apply_snapshot(
×
364
        &self,
×
365
        snaphost: ChainStore<Snapshot>,
×
366
    ) -> Result<(), CompactFiltersError> {
×
367
        let mut batch = WriteBatch::default();
×
368

×
369
        let read_store = self.store.read().unwrap();
×
370
        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
×
371
        let snapshot_cf_handle = read_store.cf_handle(&snaphost.cf_name).unwrap();
×
372

×
373
        let from_key = StoreEntry::BlockHeader(Some(snaphost.min_height)).get_key();
×
374
        let to_key = StoreEntry::BlockHeader(Some(usize::MAX)).get_key();
×
375

×
376
        let mut opts = ReadOptions::default();
×
377
        opts.set_iterate_upper_bound(to_key.clone());
×
378

×
379
        log::debug!("Removing items");
×
380
        batch.delete_range_cf(cf_handle, &from_key, &to_key);
×
381
        for (_, v) in read_store.iterator_cf_opt(
×
382
            cf_handle,
×
383
            opts,
×
384
            IteratorMode::From(&from_key, Direction::Forward),
×
385
        ) {
×
386
            let (header, _): (Header, [u8; 32]) = SerializeDb::deserialize(&v)?;
×
387

388
            batch.delete_cf(
×
389
                cf_handle,
×
390
                StoreEntry::BlockHeaderIndex(Some(header.block_hash())).get_key(),
×
391
            );
×
392
        }
393

394
        // Delete full blocks overridden by snapshot
395
        let from_key = StoreEntry::Block(Some(snaphost.min_height)).get_key();
×
396
        let to_key = StoreEntry::Block(Some(usize::MAX)).get_key();
×
397
        batch.delete_range(&from_key, &to_key);
×
398

×
399
        log::debug!("Copying over new items");
×
400
        for (k, v) in read_store.iterator_cf(snapshot_cf_handle, IteratorMode::Start) {
×
401
            batch.put_cf(cf_handle, k, v);
×
402
        }
×
403

404
        read_store.write(batch)?;
×
405
        std::mem::drop(read_store);
×
406

×
407
        self.store.write().unwrap().drop_cf(&snaphost.cf_name)?;
×
408

409
        Ok(())
×
410
    }
×
411

412
    pub fn get_height_for(
×
413
        &self,
×
414
        block_hash: &BlockHash,
×
415
    ) -> Result<Option<usize>, CompactFiltersError> {
×
416
        let read_store = self.store.read().unwrap();
×
417
        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
×
418

×
419
        let key = StoreEntry::BlockHeaderIndex(Some(*block_hash)).get_key();
×
420
        let data = read_store.get_pinned_cf(cf_handle, key)?;
×
421
        data.map(|data| {
×
422
            Ok::<_, CompactFiltersError>(usize::from_be_bytes(
×
423
                data.as_ref()
×
424
                    .try_into()
×
425
                    .map_err(|_| CompactFiltersError::DataCorruption)?,
×
426
            ))
427
        })
×
428
        .transpose()
×
429
    }
×
430

431
    pub fn get_block_hash(&self, height: usize) -> Result<Option<BlockHash>, CompactFiltersError> {
×
432
        let read_store = self.store.read().unwrap();
×
433
        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
×
434

×
435
        let key = StoreEntry::BlockHeader(Some(height)).get_key();
×
436
        let data = read_store.get_pinned_cf(cf_handle, key)?;
×
437
        data.map(|data| {
×
438
            let (header, _): (Header, [u8; 32]) =
×
439
                deserialize(&data).map_err(|_| CompactFiltersError::DataCorruption)?;
×
440
            Ok::<_, CompactFiltersError>(header.block_hash())
×
441
        })
×
442
        .transpose()
×
443
    }
×
444

445
    pub fn save_full_block(&self, block: &Block, height: usize) -> Result<(), CompactFiltersError> {
×
446
        let key = StoreEntry::Block(Some(height)).get_key();
×
447
        self.store
×
448
            .read()
×
449
            .unwrap()
×
450
            .put(key, SerializeDb::serialize(block))?;
×
451

452
        Ok(())
×
453
    }
×
454

455
    pub fn get_full_block(&self, height: usize) -> Result<Option<Block>, CompactFiltersError> {
×
456
        let read_store = self.store.read().unwrap();
×
457

×
458
        let key = StoreEntry::Block(Some(height)).get_key();
×
459
        let opt_block = read_store.get_pinned(key)?;
×
460

461
        opt_block
×
462
            .map(|data| deserialize(&data))
×
463
            .transpose()
×
464
            .map_err(|_| CompactFiltersError::DataCorruption)
×
465
    }
×
466

467
    pub fn delete_blocks_until(&self, height: usize) -> Result<(), CompactFiltersError> {
×
468
        let from_key = StoreEntry::Block(Some(0)).get_key();
×
469
        let to_key = StoreEntry::Block(Some(height)).get_key();
×
470

×
471
        let mut batch = WriteBatch::default();
×
472
        batch.delete_range(&from_key, &to_key);
×
473

×
474
        self.store.read().unwrap().write(batch)?;
×
475

476
        Ok(())
×
477
    }
×
478

479
    pub fn iter_full_blocks(&self) -> Result<Vec<(usize, Block)>, CompactFiltersError> {
×
480
        let read_store = self.store.read().unwrap();
×
481

×
482
        let prefix = StoreEntry::Block(None).get_key();
×
483

×
484
        let iterator = read_store.prefix_iterator(&prefix);
×
485
        // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
×
486
        // have the right prefix
×
487
        iterator
×
488
            .filter(|(k, _)| k.starts_with(&prefix))
×
489
            .map(|(k, v)| {
×
490
                let height: usize = usize::from_be_bytes(
×
491
                    k[1..]
×
492
                        .try_into()
×
493
                        .map_err(|_| CompactFiltersError::DataCorruption)?,
×
494
                );
495
                let block = SerializeDb::deserialize(&v)?;
×
496

497
                Ok((height, block))
×
498
            })
×
499
            .collect::<Result<_, _>>()
×
500
    }
×
501
}
502

503
impl<T: StoreType> ChainStore<T> {
504
    pub fn work(&self) -> Result<Work, CompactFiltersError> {
×
505
        let read_store = self.store.read().unwrap();
×
506
        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
×
507

×
508
        let prefix = StoreEntry::BlockHeader(None).get_key();
×
509
        let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
×
510

×
511
        Ok(iterator
×
512
            .last()
×
513
            .map(|(_, v)| -> Result<_, CompactFiltersError> {
×
514
                let (_, work): (Header, [u8; 32]) = SerializeDb::deserialize(&v)?;
×
515
                let work = Work::from_be_bytes(work);
×
516

×
517
                Ok(work)
×
518
            })
×
519
            .transpose()?
×
520
            .unwrap_or_else(|| Work::from_be_bytes([0; 32])))
×
521
    }
×
522

523
    pub fn get_height(&self) -> Result<usize, CompactFiltersError> {
×
524
        let read_store = self.store.read().unwrap();
×
525
        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
×
526

×
527
        let prefix = StoreEntry::BlockHeader(None).get_key();
×
528
        let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
×
529

×
530
        Ok(iterator
×
531
            .last()
×
532
            .map(|(k, _)| -> Result<_, CompactFiltersError> {
×
533
                let height = usize::from_be_bytes(
×
534
                    k[1..]
×
535
                        .try_into()
×
536
                        .map_err(|_| CompactFiltersError::DataCorruption)?,
×
537
                );
538

539
                Ok(height)
×
540
            })
×
541
            .transpose()?
×
542
            .unwrap_or_default())
×
543
    }
×
544

545
    pub fn get_tip_hash(&self) -> Result<Option<BlockHash>, CompactFiltersError> {
×
546
        let read_store = self.store.read().unwrap();
×
547
        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
×
548

×
549
        let prefix = StoreEntry::BlockHeader(None).get_key();
×
550
        let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
×
551

×
552
        iterator
×
553
            .last()
×
554
            .map(|(_, v)| -> Result<_, CompactFiltersError> {
×
555
                let (header, _): (Header, [u8; 32]) = SerializeDb::deserialize(&v)?;
×
556

557
                Ok(header.block_hash())
×
558
            })
×
559
            .transpose()
×
560
    }
×
561

562
    pub fn apply(
×
563
        &mut self,
×
564
        from: usize,
×
565
        headers: Vec<Header>,
×
566
    ) -> Result<BlockHash, CompactFiltersError> {
×
567
        let mut batch = WriteBatch::default();
×
568

×
569
        let read_store = self.store.read().unwrap();
×
570
        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
×
571

572
        let (mut last_hash, mut accumulated_work) = read_store
×
573
            .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(from)).get_key())?
×
574
            .map(|result| {
×
575
                let (header, work): (Header, [u8; 32]) = SerializeDb::deserialize(&result)?;
×
576
                let work = Work::from_be_bytes(work);
×
577
                Ok::<_, CompactFiltersError>((header.block_hash(), work))
×
578
            })
×
579
            .transpose()?
×
580
            .ok_or(CompactFiltersError::DataCorruption)?;
×
581

582
        for (index, header) in headers.into_iter().enumerate() {
×
583
            if header.prev_blockhash != last_hash {
×
584
                return Err(CompactFiltersError::InvalidHeaders);
×
585
            }
×
586

×
587
            last_hash = header.block_hash();
×
588
            accumulated_work = accumulated_work + header.work();
×
589

×
590
            let height = from + index + 1;
×
591
            batch.put_cf(
×
592
                cf_handle,
×
593
                StoreEntry::BlockHeaderIndex(Some(header.block_hash())).get_key(),
×
594
                (height).to_be_bytes(),
×
595
            );
×
596
            batch.put_cf(
×
597
                cf_handle,
×
598
                StoreEntry::BlockHeader(Some(height)).get_key(),
×
599
                SerializeDb::serialize(&(header, accumulated_work.to_be_bytes())),
×
600
            );
×
601
        }
602

603
        std::mem::drop(read_store);
×
604

×
605
        self.store.write().unwrap().write(batch)?;
×
606
        Ok(last_hash)
×
607
    }
×
608
}
609

610
impl<T: StoreType> fmt::Debug for ChainStore<T> {
    /// Debug output performs live store queries (height and tip hash).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let type_name = format!("ChainStore<{:?}>", T::default());
        let mut builder = f.debug_struct(&type_name);
        builder.field("cf_name", &self.cf_name);
        builder.field("min_height", &self.min_height);
        builder.field("network", &self.network);
        builder.field("headers_height", &self.get_height());
        builder.field("tip_hash", &self.get_tip_hash());
        builder.finish()
    }
}
621

622
pub enum BundleStatus {
623
    Init,
624
    CfHeaders { cf_headers: Vec<FilterHeader> },
625
    CFilters { cf_filters: Vec<Vec<u8>> },
626
    Processed { cf_filters: Vec<Vec<u8>> },
627
    Tip { cf_filters: Vec<Vec<u8>> },
628
    Pruned,
629
}
630

631
pub struct CfStore {
632
    store: Arc<RwLock<DB>>,
633
    filter_type: u8,
634
}
635

636
type BundleEntry = (BundleStatus, FilterHeader);
637

638
impl CfStore {
639
    pub fn new(
×
640
        headers_store: &ChainStore<Full>,
×
641
        filter_type: u8,
×
642
    ) -> Result<Self, CompactFiltersError> {
×
643
        let cf_store = CfStore {
×
644
            store: Arc::clone(&headers_store.store),
×
645
            filter_type,
×
646
        };
×
647

×
648
        let genesis = genesis_block(headers_store.network);
×
649

650
        let filter = BlockFilter::new_script_filter(&genesis, |utxo| {
×
651
            Err::<ScriptBuf, bitcoin::bip158::Error>(bitcoin::bip158::Error::UtxoMissing(*utxo))
×
652
        })?;
×
653
        let first_key = StoreEntry::CFilterTable((filter_type, Some(0))).get_key();
×
654

×
655
        // Add the genesis' filter
×
656
        {
×
657
            let read_store = cf_store.store.read().unwrap();
×
658
            if read_store.get_pinned(&first_key)?.is_none() {
×
659
                read_store.put(
×
660
                    &first_key,
×
661
                    SerializeDb::serialize(&(
×
662
                        BundleStatus::Init,
×
663
                        filter.filter_header(&FilterHeader::from_raw_hash(Hash::all_zeros())),
×
664
                    )),
×
665
                )?;
×
666
            }
×
667
        }
668

669
        Ok(cf_store)
×
670
    }
×
671

672
    pub fn get_filter_type(&self) -> u8 {
×
673
        self.filter_type
×
674
    }
×
675

676
    pub fn get_bundles(&self) -> Result<Vec<BundleEntry>, CompactFiltersError> {
×
677
        let read_store = self.store.read().unwrap();
×
678

×
679
        let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
×
680
        let iterator = read_store.prefix_iterator(&prefix);
×
681

×
682
        // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
×
683
        // have the right prefix
×
684
        iterator
×
685
            .filter(|(k, _)| k.starts_with(&prefix))
×
686
            .map(|(_, data)| BundleEntry::deserialize(&data))
×
687
            .collect::<Result<_, _>>()
×
688
    }
×
689

690
    pub fn get_checkpoints(&self) -> Result<Vec<FilterHeader>, CompactFiltersError> {
×
691
        let read_store = self.store.read().unwrap();
×
692

×
693
        let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
×
694
        let iterator = read_store.prefix_iterator(&prefix);
×
695

×
696
        // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
×
697
        // have the right prefix
×
698
        iterator
×
699
            .filter(|(k, _)| k.starts_with(&prefix))
×
700
            .skip(1)
×
701
            .map(|(_, data)| Ok::<_, CompactFiltersError>(BundleEntry::deserialize(&data)?.1))
×
702
            .collect::<Result<_, _>>()
×
703
    }
×
704

705
    pub fn replace_checkpoints(
×
706
        &self,
×
707
        checkpoints: Vec<FilterHeader>,
×
708
    ) -> Result<(), CompactFiltersError> {
×
709
        let current_checkpoints = self.get_checkpoints()?;
×
710

711
        let mut equal_bundles = 0;
×
712
        for (index, (our, their)) in current_checkpoints
×
713
            .iter()
×
714
            .zip(checkpoints.iter())
×
715
            .enumerate()
×
716
        {
717
            equal_bundles = index;
×
718

×
719
            if our != their {
×
720
                break;
×
721
            }
×
722
        }
723

724
        let read_store = self.store.read().unwrap();
×
725
        let mut batch = WriteBatch::default();
×
726

727
        for (index, filter_hash) in checkpoints.iter().enumerate().skip(equal_bundles) {
×
728
            let key = StoreEntry::CFilterTable((self.filter_type, Some(index + 1))).get_key(); // +1 to skip the genesis' filter
×
729

730
            if let Some((BundleStatus::Tip { .. }, _)) = read_store
×
731
                .get_pinned(&key)?
×
732
                .map(|data| BundleEntry::deserialize(&data))
×
733
                .transpose()?
×
734
            {
×
735
                println!("Keeping bundle #{} as Tip", index);
×
736
            } else {
×
737
                batch.put(
×
738
                    &key,
×
739
                    SerializeDb::serialize(&(BundleStatus::Init, *filter_hash)),
×
740
                );
×
741
            }
×
742
        }
743

744
        read_store.write(batch)?;
×
745

746
        Ok(())
×
747
    }
×
748

749
    pub fn advance_to_cf_headers(
×
750
        &self,
×
751
        bundle: usize,
×
752
        checkpoint: FilterHeader,
×
753
        filter_hashes: Vec<FilterHash>,
×
754
    ) -> Result<BundleStatus, CompactFiltersError> {
×
755
        let cf_headers: Vec<FilterHeader> = filter_hashes
×
756
            .into_iter()
×
757
            .scan(checkpoint, |prev_header, filter_hash| {
×
758
                let filter_header = filter_hash.filter_header(prev_header);
×
759
                *prev_header = filter_header;
×
760

×
761
                Some(filter_header)
×
762
            })
×
763
            .collect();
×
764

×
765
        let read_store = self.store.read().unwrap();
×
766

×
767
        let next_key = StoreEntry::CFilterTable((self.filter_type, Some(bundle + 1))).get_key(); // +1 to skip the genesis' filter
×
768
        if let Some((_, next_checkpoint)) = read_store
×
769
            .get_pinned(&next_key)?
×
770
            .map(|data| BundleEntry::deserialize(&data))
×
771
            .transpose()?
×
772
        {
773
            // check connection with the next bundle if present
774
            if cf_headers.iter().last() != Some(&next_checkpoint) {
×
775
                return Err(CompactFiltersError::InvalidFilterHeader);
×
776
            }
×
777
        }
×
778

779
        let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
×
780
        let value = (BundleStatus::CfHeaders { cf_headers }, checkpoint);
×
781

×
782
        read_store.put(key, SerializeDb::serialize(&value))?;
×
783

784
        Ok(value.0)
×
785
    }
×
786

787
    pub fn advance_to_cf_filters(
×
788
        &self,
×
789
        bundle: usize,
×
790
        checkpoint: FilterHeader,
×
791
        headers: Vec<FilterHeader>,
×
792
        filters: Vec<(usize, Vec<u8>)>,
×
793
    ) -> Result<BundleStatus, CompactFiltersError> {
×
794
        let cf_filters = filters
×
795
            .into_iter()
×
796
            .zip(headers.into_iter())
×
797
            .scan(checkpoint, |prev_header, ((_, filter_content), header)| {
×
798
                let filter = BlockFilter::new(&filter_content);
×
799
                if header != filter.filter_header(prev_header) {
×
800
                    return Some(Err(CompactFiltersError::InvalidFilter));
×
801
                }
×
802
                *prev_header = header;
×
803

×
804
                Some(Ok::<_, CompactFiltersError>(filter_content))
×
805
            })
×
806
            .collect::<Result<_, _>>()?;
×
807

808
        let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
×
809
        let value = (BundleStatus::CFilters { cf_filters }, checkpoint);
×
810

×
811
        let read_store = self.store.read().unwrap();
×
812
        read_store.put(key, SerializeDb::serialize(&value))?;
×
813

814
        Ok(value.0)
×
815
    }
×
816

817
    pub fn prune_filters(
×
818
        &self,
×
819
        bundle: usize,
×
820
        checkpoint: FilterHeader,
×
821
    ) -> Result<BundleStatus, CompactFiltersError> {
×
822
        let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
×
823
        let value = (BundleStatus::Pruned, checkpoint);
×
824

×
825
        let read_store = self.store.read().unwrap();
×
826
        read_store.put(key, SerializeDb::serialize(&value))?;
×
827

828
        Ok(value.0)
×
829
    }
×
830

831
    pub fn mark_as_tip(
×
832
        &self,
×
833
        bundle: usize,
×
834
        cf_filters: Vec<Vec<u8>>,
×
835
        checkpoint: FilterHeader,
×
836
    ) -> Result<BundleStatus, CompactFiltersError> {
×
837
        let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
×
838
        let value = (BundleStatus::Tip { cf_filters }, checkpoint);
×
839

×
840
        let read_store = self.store.read().unwrap();
×
841
        read_store.put(key, SerializeDb::serialize(&value))?;
×
842

843
        Ok(value.0)
×
844
    }
×
845
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc