
tari-project / tari · build 15280118615 (push · github · web-flow)

27 May 2025 04:01PM UTC coverage: 73.59% (+0.4%) from 73.233%
feat: add base node HTTP wallet service (#7061)

Description
---
Added a new HTTP server for the base node that exposes some wallet-related
query functionality.

Current new endpoints (examples on the **esmeralda** network; a minimal client sketch follows the list):
 - http://127.0.0.1:9005/get_tip_info
 - http://127.0.0.1:9005/get_header_by_height?height=6994
 - http://127.0.0.1:9005/get_height_at_time?time=1747739959
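
For illustration, a minimal Rust client for one of these endpoints. This sketch is not part of the PR; it assumes the `reqwest` and `tokio` crates and simply prints the raw response body, since the response schema isn't documented here:
```rust
// Hypothetical client sketch, not part of the PR. Assumes the `reqwest` and
// `tokio` crates; prints the raw body because the exact response shape of the
// new endpoints is not shown in this description.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let base = "http://127.0.0.1:9005"; // esmeralda default port (see below)
    let body = reqwest::get(format!("{base}/get_header_by_height?height=6994"))
        .await?
        .text()
        .await?;
    println!("{body}");
    Ok(())
}
```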

Default ports for the HTTP service (by network):
```
MainNet: 9000,
StageNet: 9001,
NextNet: 9002,
LocalNet: 9003,
Igor: 9004,
Esmeralda: 9005,
```
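
A quick sketch of that lookup as a match, one default port per network. The `Network` enum here is a stand-in for illustration, not necessarily the project's own type:
```rust
// Stand-in enum for illustration; the real codebase has its own Network type.
enum Network {
    MainNet,
    StageNet,
    NextNet,
    LocalNet,
    Igor,
    Esmeralda,
}

// Default HTTP wallet-query-service port per network, as listed above.
fn default_http_port(network: Network) -> u16 {
    match network {
        Network::MainNet => 9000,
        Network::StageNet => 9001,
        Network::NextNet => 9002,
        Network::LocalNet => 9003,
        Network::Igor => 9004,
        Network::Esmeralda => 9005,
    }
}
```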

New configuration needs to be set in the base node:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000" # optional; if not set, requests for the external address return None and wallets can't contact the base node
```
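
As a rough sketch, this section could deserialize into something like the following; the field names mirror the TOML above, but the struct name and `serde` usage are assumptions, not the PR's actual types:
```rust
use serde::Deserialize;

// Hypothetical config struct mirroring the TOML section above; the actual
// type in the PR may differ.
#[derive(Deserialize)]
struct HttpWalletQueryServiceConfig {
    port: u16,
    // Optional: when absent, requests for the external address return None
    // and wallets cannot contact the base node over HTTP.
    external_address: Option<String>,
}
```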

Motivation and Context
---


How Has This Been Tested?
---
### Manually

#### Basic test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000"
```
This sets the port and the external address (which is sent to the wallet
client on request, so in the real world it must be public)
3. Set logging level of base node logs to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check the base node logs (for instance with `tail -f ...`) and confirm
that the HTTP endpoints are used

#### Use RPC fallback test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9001"
```
This sets the port and a deliberately mismatched external address (it points
at the wrong port), so the wallet's HTTP requests fail and the RPC fallback
is exercised
3. Set logging level of base node logs to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check logs of base node... (continued)

9 of 114 new or added lines in 4 files covered. (7.89%)

1592 existing lines in 62 files now uncovered.

82227 of 111736 relevant lines covered (73.59%)

272070.7 hits per line

Source File (0.0% of lines covered):
/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
```rust
//  Copyright 2022, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    cmp,
    convert::{TryFrom, TryInto},
    sync::Arc,
    time::{Duration, Instant},
};

use futures::StreamExt;
use log::*;
use tari_common_types::types::{CompressedCommitment, FixedHash, RangeProofService};
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient, PeerConnection};
use tari_crypto::commitment::HomomorphicCommitment;
use tari_utilities::{hex::Hex, ByteArray};
use tokio::task;

use super::error::HorizonSyncError;
use crate::{
    base_node::sync::{
        ban::PeerBanManager,
        hooks::Hooks,
        horizon_state_sync::{HorizonSyncInfo, HorizonSyncStatus},
        rpc,
        rpc::BaseNodeSyncRpcClient,
        BlockchainSyncConfig,
        SyncPeer,
    },
    blocks::{BlockHeader, ChainHeader, UpdateBlockAccumulatedData},
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree},
    common::{rolling_avg::RollingAverageTime, BanPeriod},
    consensus::ConsensusManager,
    proto::base_node::{sync_utxos_response::Txo, SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse},
    transactions::transaction_components::{
        transaction_output::batch_verify_range_proofs,
        TransactionKernel,
        TransactionOutput,
    },
    validation::{
        aggregate_body::validate_individual_output,
        helpers::validate_output_version,
        FinalHorizonStateValidation,
    },
    PrunedKernelMmr,
};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync";

const MAX_LATENCY_INCREASES: usize = 5;

pub struct HorizonStateSynchronization<'a, B> {
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    rules: ConsensusManager,
    sync_peers: &'a mut Vec<SyncPeer>,
    horizon_sync_height: u64,
    prover: Arc<RangeProofService>,
    num_kernels: u64,
    num_outputs: u64,
    hooks: Hooks,
    connectivity: ConnectivityRequester,
    final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
    max_latency: Duration,
    peer_ban_manager: PeerBanManager,
}

impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: BlockchainSyncConfig,
        db: AsyncBlockchainDb<B>,
        connectivity: ConnectivityRequester,
        rules: ConsensusManager,
        sync_peers: &'a mut Vec<SyncPeer>,
        horizon_sync_height: u64,
        prover: Arc<RangeProofService>,
        final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
    ) -> Self {
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
        Self {
            max_latency: config.initial_max_sync_latency,
            config,
            db,
            rules,
            connectivity,
            sync_peers,
            horizon_sync_height,
            prover,
            num_kernels: 0,
            num_outputs: 0,
            hooks: Hooks::default(),
            final_state_validator,
            peer_ban_manager,
        }
    }

    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }

    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(HorizonSyncInfo) + Send + Sync + 'static {
        self.hooks.add_on_progress_horizon_hook(hook);
    }

    pub async fn synchronize(&mut self) -> Result<(), HorizonSyncError> {
        if self.sync_peers.is_empty() {
            return Err(HorizonSyncError::NoSyncPeers);
        }

        debug!(
            target: LOG_TARGET,
            "Preparing database for horizon sync to height #{}", self.horizon_sync_height
        );
        let to_header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
            ChainStorageError::ValueNotFound {
                entity: "Header",
                field: "height",
                value: self.horizon_sync_height.to_string(),
            }
        })?;

        let mut latency_increases_counter = 0;
        loop {
            match self.sync(&to_header).await {
                Ok(()) => return Ok(()),
                Err(err @ HorizonSyncError::AllSyncPeersExceedLatency) => {
                    // If we don't have many sync peers to select from, return the listening state and see if we can get
                    // some more.
                    warn!(
                        target: LOG_TARGET,
                        "Slow sync peers detected: {}",
                        self.sync_peers
                            .iter()
                            .map(|p| format!("{} ({:.2?})", p.node_id(), p.latency().unwrap_or_default()))
                            .collect::<Vec<_>>()
                            .join(", ")
                    );
                    if self.sync_peers.len() < 2 {
                        return Err(err);
                    }
                    self.max_latency += self.config.max_latency_increase;
                    latency_increases_counter += 1;
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
                        return Err(err);
                    }
                },
                Err(err) => return Err(err),
            }
        }
    }

    async fn sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> {
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync horizon state ({} sync peers)",
            sync_peer_node_ids.len()
        );
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            match self.connect_and_attempt_sync(&node_id, to_header).await {
                Ok(_) => return Ok(()),
                // Try another peer
                Err(err) => {
                    let ban_reason = HorizonSyncError::get_ban_reason(&err);

                    if let Some(reason) = ban_reason {
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        warn!(target: LOG_TARGET, "{}", err);
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    if let HorizonSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        if self.sync_peers.is_empty() {
            Err(HorizonSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(HorizonSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(HorizonSyncError::FailedSyncAllPeers)
        }
    }

    async fn connect_and_attempt_sync(
        &mut self,
        node_id: &NodeId,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        // Connect
        let (mut client, sync_peer) = self.connect_sync_peer(node_id).await?;

        // Perform horizon sync
        debug!(target: LOG_TARGET, "Check if pruning is needed");
        self.prune_if_needed().await?;
        self.sync_kernels_and_outputs(sync_peer.clone(), &mut client, to_header)
            .await?;

        // Validate and finalize horizon sync
        self.finalize_horizon_sync(&sync_peer).await?;

        Ok(())
    }

    async fn connect_sync_peer(
        &mut self,
        node_id: &NodeId,
    ) -> Result<(BaseNodeSyncRpcClient, SyncPeer), HorizonSyncError> {
        let peer_index = self
            .get_sync_peer_index(node_id)
            .ok_or(HorizonSyncError::PeerNotFound)?;
        let sync_peer = &self.sync_peers[peer_index];
        self.hooks.call_on_starting_hook(sync_peer);

        let mut conn = self.dial_sync_peer(node_id).await?;
        debug!(
            target: LOG_TARGET,
            "Attempting to synchronize horizon state with `{}`", node_id
        );

        let config = RpcClient::builder()
            .with_deadline(self.config.rpc_deadline)
            .with_deadline_grace_period(Duration::from_secs(5));

        let mut client = conn
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
            .await?;

        let latency = client
            .get_last_request_latency()
            .expect("unreachable panic: last request latency must be set after connect");
        self.sync_peers[peer_index].set_latency(latency);
        if latency > self.max_latency {
            return Err(HorizonSyncError::MaxLatencyExceeded {
                peer: conn.peer_node_id().clone(),
                latency,
                max_latency: self.max_latency,
            });
        }
        debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);

        Ok((client, self.sync_peers[peer_index].clone()))
    }

    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
        let timer = Instant::now();
        debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
        info!(
            target: LOG_TARGET,
            "Successfully dialed sync peer {} in {:.2?}",
            node_id,
            timer.elapsed()
        );
        Ok(conn)
    }

    async fn sync_kernels_and_outputs(
        &mut self,
        sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        // Note: We do not need to rewind kernels if the sync fails due to it being validated when inserted into
        //       the database. Furthermore, these kernels will also be successfully removed when we need to rewind
        //       the blockchain for whatever reason.
        debug!(target: LOG_TARGET, "Synchronizing kernels");
        self.synchronize_kernels(sync_peer.clone(), client, to_header).await?;
        debug!(target: LOG_TARGET, "Synchronizing outputs");
        // let cloned_backup_smt = self.db.inner().smt_read_access()?.clone();
        match self.synchronize_outputs(sync_peer, client, to_header).await {
            Ok(_) => Ok(()),
            Err(err) => {
                // We need to clean up the outputs
                let _ = self.clean_up_failed_output_sync(to_header).await;
                // let mut smt = self.db.inner().smt_write_access()?;
                // *smt = cloned_backup_smt;
                Err(err)
            },
        }
    }

    /// We clean up a failed output sync attempt and ignore any errors that occur during the clean up process.
    async fn clean_up_failed_output_sync(&mut self, to_header: &BlockHeader) {
        let tip_header = if let Ok(header) = self.db.fetch_tip_header().await {
            header
        } else {
            return;
        };
        let db = self.db().clone();
        let mut txn = db.write_transaction();
        let mut current_header = to_header.clone();
        loop {
            if let Ok(outputs) = self.db.fetch_outputs_in_block(current_header.hash()).await {
                for (count, output) in (1..=outputs.len()).zip(outputs.iter()) {
                    // Note: We do not need to clean up the SMT as it was not saved in the database yet, however, we
                    // need to clean up the outputs
                    txn.prune_output_from_all_dbs(
                        output.hash(),
                        output.commitment.clone(),
                        output.features.output_type,
                    );
                    if let Err(e) = txn.commit().await {
                        warn!(
                        target: LOG_TARGET,
                        "Clean up failed sync - prune output from all dbs for header '{}': {}",
                        current_header.hash(), e
                        );
                    }
                    if count % 100 == 0 || count == outputs.len() {
                        if let Err(e) = txn.commit().await {
                            warn!(
                                target: LOG_TARGET,
                                "Clean up failed sync - commit prune outputs for header '{}': {}",
                                current_header.hash(), e
                            );
                        }
                    }
                }
            }
            if let Err(e) = txn.commit().await {
                warn!(
                    target: LOG_TARGET, "Clean up failed output sync - commit delete kernels for header '{}': {}",
                    current_header.hash(), e
                );
            }
            if let Ok(header) = db.fetch_header_by_block_hash(current_header.prev_hash).await {
                if let Some(previous_header) = header {
                    current_header = previous_header;
                } else {
                    warn!(target: LOG_TARGET, "Could not clean up failed output sync, previous_header link missing from db");
                    break;
                }
            } else {
                warn!(
                    target: LOG_TARGET,
                    "Could not clean up failed output sync, header '{}' not in db",
                    current_header.prev_hash.to_hex()
                );
                break;
            }
            if &current_header.hash() == tip_header.hash() {
                debug!(target: LOG_TARGET, "Finished cleaning up failed output sync");
                break;
            }
        }
    }

    async fn prune_if_needed(&mut self) -> Result<(), HorizonSyncError> {
        let local_metadata = self.db.get_chain_metadata().await?;
        let new_prune_height = cmp::min(local_metadata.best_block_height(), self.horizon_sync_height);
        if local_metadata.pruned_height() < new_prune_height {
            debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height);
            self.db.prune_to_height(new_prune_height).await?;
        }

        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    async fn synchronize_kernels(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        info!(target: LOG_TARGET, "Starting kernel sync from peer {}", sync_peer);
        let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;

        let remote_num_kernels = to_header.kernel_mmr_size;
        self.num_kernels = remote_num_kernels;

        if local_num_kernels >= remote_num_kernels {
            debug!(target: LOG_TARGET, "Local kernel set already synchronized");
            return Ok(());
        }

        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
            current: local_num_kernels,
            total: remote_num_kernels,
            sync_peer: sync_peer.clone(),
        });
        self.hooks.call_on_progress_horizon_hooks(info);

        debug!(
            target: LOG_TARGET,
            "Requesting kernels from {} to {} ({} remaining)",
            local_num_kernels,
            remote_num_kernels,
            remote_num_kernels.saturating_sub(local_num_kernels),
        );

        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating kernel sync with peer `{}` (latency = {}ms)",
            sync_peer.node_id(),
            latency.unwrap_or_default().as_millis()
        );

        let mut current_header = self.db().fetch_header_containing_kernel_mmr(local_num_kernels).await?;
        let req = SyncKernelsRequest {
            start: local_num_kernels,
            end_header_hash: to_header.hash().to_vec(),
        };
        let mut kernel_stream = client.sync_kernels(req).await?;

        debug!(
            target: LOG_TARGET,
            "Found header for kernels at mmr pos: {} height: {}",
            local_num_kernels,
            current_header.height()
        );
        let mut kernel_hashes = vec![];
        let db = self.db().clone();
        let mut txn = db.write_transaction();
        let mut mmr_position = local_num_kernels;
        let end = remote_num_kernels;
        let mut last_sync_timer = Instant::now();
        let mut avg_latency = RollingAverageTime::new(20);
        while let Some(kernel) = kernel_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
            kernel.verify_signature()?;

            kernel_hashes.push(kernel.hash());

            if mmr_position > end {
                return Err(HorizonSyncError::IncorrectResponse(
                    "Peer sent too many kernels".to_string(),
                ));
            }

            txn.insert_kernel_via_horizon_sync(kernel, *current_header.hash(), mmr_position);
            if mmr_position == current_header.header().kernel_mmr_size.saturating_sub(1) {
                let num_kernels = kernel_hashes.len();
                debug!(
                    target: LOG_TARGET,
                    "Header #{} ({} kernels, latency: {:.2?})",
                    current_header.height(),
                    num_kernels,
                    latency
                );
                // Validate root
                let block_data = db
                    .fetch_block_accumulated_data(current_header.header().prev_hash)
                    .await?;
                let kernel_pruned_set = block_data.dissolve();
                let mut kernel_mmr = PrunedKernelMmr::new(kernel_pruned_set);

                for hash in kernel_hashes.drain(..) {
                    kernel_mmr.push(hash.to_vec())?;
                }

                let mmr_root = kernel_mmr.get_merkle_root()?;
                if mmr_root.as_slice() != current_header.header().kernel_mr.as_slice() {
                    return Err(HorizonSyncError::InvalidMrRoot {
                        mr_tree: MmrTree::Kernel.to_string(),
                        at_height: current_header.height(),
                        expected_hex: current_header.header().kernel_mr.to_hex(),
                        actual_hex: mmr_root.to_hex(),
                    });
                }

                let kernel_hash_set = kernel_mmr.get_pruned_hash_set()?;
                debug!(
                    target: LOG_TARGET,
                    "Updating block data at height {}",
                    current_header.height()
                );
                txn.update_block_accumulated_data_via_horizon_sync(
                    *current_header.hash(),
                    UpdateBlockAccumulatedData {
                        kernel_hash_set: Some(kernel_hash_set),
                        ..Default::default()
                    },
                );

                txn.commit().await?;
                debug!(
                    target: LOG_TARGET,
                    "Committed {} kernel(s), ({}/{}) {} remaining",
                    num_kernels,
                    mmr_position + 1,
                    end,
                    end.saturating_sub(mmr_position + 1)
                );
                if mmr_position < end.saturating_sub(1) {
                    current_header = db.fetch_chain_header(current_header.height() + 1).await?;
                }
            }
            mmr_position += 1;

            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            if mmr_position % 100 == 0 || mmr_position == self.num_kernels {
                let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
                    current: mmr_position,
                    total: self.num_kernels,
                    sync_peer: sync_peer.clone(),
                });
                self.hooks.call_on_progress_horizon_hooks(info);
            }

            self.check_latency(sync_peer.node_id(), &avg_latency)?;

            last_sync_timer = Instant::now();
        }

        if mmr_position != end {
            return Err(HorizonSyncError::IncorrectResponse(
                "Sync node did not send all kernels requested".to_string(),
            ));
        }
        Ok(())
    }

    fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
        if let Some(avg_latency) = avg_latency.calculate_average_with_min_samples(5) {
            if avg_latency > self.max_latency {
                return Err(HorizonSyncError::MaxLatencyExceeded {
                    peer: peer.clone(),
                    latency: avg_latency,
                    max_latency: self.max_latency,
                });
            }
        }

        Ok(())
    }

    // Synchronize outputs from the sync peer.
    #[allow(clippy::too_many_lines)]
    async fn synchronize_outputs(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        info!(target: LOG_TARGET, "Starting output sync from peer {}", sync_peer);
        let db = self.db().clone();
        let tip_header = db.fetch_tip_header().await?;

        // Estimate the number of outputs to be downloaded; this cannot be known exactly until the sync is complete.
        let mut current_header = to_header.clone();
        self.num_outputs = 0;
        loop {
            current_header =
                if let Some(previous_header) = db.fetch_header_by_block_hash(current_header.prev_hash).await? {
                    self.num_outputs += current_header
                        .output_smt_size
                        .saturating_sub(previous_header.output_smt_size);
                    previous_header
                } else {
                    break;
                };
            if &current_header.hash() == tip_header.hash() {
                break;
            }
        }

        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
            current: 0,
            total: self.num_outputs,
            sync_peer: sync_peer.clone(),
        });
        self.hooks.call_on_progress_horizon_hooks(info);

        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating output sync with peer `{}`, requesting ~{} outputs, tip_header height `{}`, \
            last_chain_header height `{}` (latency = {}ms)",
            sync_peer.node_id(),
            self.num_outputs,
            tip_header.height(),
            db.fetch_last_chain_header().await?.height(),
            latency.unwrap_or_default().as_millis(),
        );

        let start_chain_header = db.fetch_chain_header(tip_header.height() + 1).await?;
        let req = SyncUtxosRequest {
            start_header_hash: start_chain_header.hash().to_vec(),
            end_header_hash: to_header.hash().to_vec(),
        };
        let mut output_stream = client.sync_utxos(req).await?;

        // let mut txn = db.write_transaction();
        let mut utxo_counter = 0u64;
        let mut stxo_counter = 0u64;
        // let mut output_smt = (*db.inner().smt_write_access()?).clone();
        let mut last_sync_timer = Instant::now();
        let mut avg_latency = RollingAverageTime::new(20);

        // let mut inputs_to_delete = Vec::new();
        while let Some(response) = output_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let res: SyncUtxosResponse = response?;

            let output_header_hash = FixedHash::try_from(res.mined_header)
                .map_err(|_| HorizonSyncError::IncorrectResponse("Peer sent no mined header".into()))?;
            let current_header = self
                .db()
                .fetch_header_by_block_hash(output_header_hash)
                .await?
                .ok_or_else(|| {
                    HorizonSyncError::IncorrectResponse("Peer sent mined header we do not know of".into())
                })?;

            let proto_output = res
                .txo
                .ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?;
            match proto_output {
                Txo::Output(output) => {
                    utxo_counter += 1;
                    // Increase the estimated number of outputs to be downloaded (for display purposes only).
                    if utxo_counter >= self.num_outputs {
                        self.num_outputs = utxo_counter + u64::from(current_header.hash() != to_header.hash());
                    }

                    let constants = self.rules.consensus_constants(current_header.height).clone();
                    let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
                    if !output.is_burned() {
                        debug!(
                            target: LOG_TARGET,
                            "UTXO `{}` received from sync peer ({} of {})",
                            output.hash(),
                            utxo_counter,
                            self.num_outputs,
                        );
                        validate_output_version(&constants, &output)?;
                        validate_individual_output(&output, &constants)?;

                        batch_verify_range_proofs(&self.prover, &[&output])?;
                        // let smt_key = NodeKey::try_from(output.commitment.as_bytes())?;
                        // let smt_node = ValueHash::try_from(output.smt_hash(current_header.height).as_slice())?;
                        // if let Err(e) = output_smt.insert(smt_key, smt_node) {
                        //     error!(
                        //         target: LOG_TARGET,
                        //         "Output commitment({}) already in SMT",
                        //         output.commitment.to_hex(),
                        //     );
                        //     return Err(e.into());
                        // }
                        todo!("Implement smt changes");
                        // txn.insert_output_via_horizon_sync(
                        //     output,
                        //     current_header.hash(),
                        //     current_header.height,
                        //     current_header.timestamp.as_u64(),
                        // );

                        // // We have checked the range proof, and we have checked that the linked to header exists.
                        // txn.commit().await?;
                    }
                },
                Txo::Commitment(commitment_bytes) => {
                    stxo_counter += 1;

                    let commitment = CompressedCommitment::from_canonical_bytes(commitment_bytes.as_slice())?;
                    match self
                        .db()
                        .fetch_unspent_output_hash_by_commitment(commitment.clone())
                        .await?
                    {
                        Some(output_hash) => {
                            debug!(
                                target: LOG_TARGET,
                                "STXO hash `{}` received from sync peer ({})",
                                output_hash,
                                stxo_counter,
                            );
                            // let smt_key = NodeKey::try_from(commitment_bytes.as_slice())?;
                            // match output_smt.delete(&smt_key)? {
                            //     DeleteResult::Deleted(_value_hash) => {},
                            //     DeleteResult::KeyNotFound => {
                            //         error!(
                            //             target: LOG_TARGET,
                            //             "Could not find input({}) in SMT",
                            //             commitment.to_hex(),
                            //         );
                            //         return Err(HorizonSyncError::ChainStorageError(
                            //             ChainStorageError::UnspendableInput,
                            //         ));
                            //     },
                            // };
                            todo!("Implement smt changes");
                            // This will only be committed once the SMT has been verified due to rewind difficulties if
                            // we need to abort the sync
                            // inputs_to_delete.push((output_hash, commitment));
                        },
                        None => {
                            return Err(HorizonSyncError::IncorrectResponse(
                                "Peer sent unknown commitment hash".into(),
                            ))
                        },
                    }
                },
            }

            if utxo_counter % 100 == 0 {
                let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
                    current: utxo_counter,
                    total: self.num_outputs,
                    sync_peer: sync_peer.clone(),
                });
                self.hooks.call_on_progress_horizon_hooks(info);
            }
            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            last_sync_timer = Instant::now();
        }
        // The SMT can only be verified after all outputs have been downloaded, due to the way we optimize fetching
        // outputs from the sync peer. As an example:
        // 1. Initial sync:
        //    - We request outputs from height 0 to 100 (the tranche)
        //    - The sync peer only returns outputs per block that would still be unspent at height 100 and all inputs
        //      per block. All outputs that were created and spent within the tranche are never returned.
        //    - For example, an output is created in block 50 and spent in block 70. It would be included in the SMT for
        //      headers from height 50 to 69, but due to the optimization, the sync peer would never know about it.
        // 2. Consecutive sync:
        //    - We request outputs from height 101 to 200 (the tranche)
        //    - The sync peer only returns outputs per block that would still be unspent at height 200, as well as all
        //      inputs per block, but in this case, only those inputs that are not an output of the current tranche of
        //      outputs. Similarly, all outputs created and spent within the tranche are never returned.
        //    - For example, an output is created in block 110 and spent in block 180. It would be included in the SMT
        //      for headers from height 110 to 179, but due to the optimization, the sync peer would never know about
        //      it.
        // 3. In both cases it would be impossible to verify the SMT per block, as we would not be able to update the
        //    SMT with the outputs that were created and spent within the tranche.
        todo!("Implement SMT check");
        // HorizonStateSynchronization::<B>::check_output_smt_root_hash(&mut output_smt, to_header)?;

        // // Commit in chunks to avoid locking the database for too long
        // let inputs_to_delete_len = inputs_to_delete.len();
        // for (count, (output_hash, commitment)) in (1..=inputs_to_delete_len).zip(inputs_to_delete.into_iter()) {
        //     txn.prune_output_from_all_dbs(output_hash, commitment, OutputType::default());
        //     if count % 100 == 0 || count == inputs_to_delete_len {
        //         txn.commit().await?;
        //     }
        // }
        // let mut writing_lock_output_smt = db.inner().smt_write_access()?;
        // *writing_lock_output_smt = output_smt;
        // debug!(
        //     target: LOG_TARGET,
        //     "Finished syncing TXOs: {} unspent and {} spent downloaded in {:.2?}",
        //     utxo_counter,
        //     stxo_counter,
        //     timer.elapsed()
        // );
        // Ok(())
    }

    // Helper function to check the output SMT root hash against the expected root hash.
    // fn check_output_smt_root_hash(output_smt: &LmdbTreeReader, header: &BlockHeader) -> Result<(), HorizonSyncError>
    // {     let tree = JellyfishMerkleTree::<_, SmtHasher>::new(output_smt);
    //     let root = tree.get_root_hash(header.height).map_err(|e| HorizonSyncError::SMTError(()))
    //     if root != header.output_mr {
    //         warn!(
    //             target: LOG_TARGET,
    //             "Target root(#{}) did not match expected (#{})",
    //                 header.output_mr.to_hex(),
    //                 root.to_hex(),
    //         );
    //         return Err(HorizonSyncError::InvalidMrRoot {
    //             mr_tree: "UTXO SMT".to_string(),
    //             at_height: header.height,
    //             expected_hex: header.output_mr.to_hex(),
    //             actual_hex: root.to_hex(),
    //         });
    //     }
    //     Ok(())
    // }

    // Finalize the horizon state synchronization by setting the chain metadata to the local tip and committing
    // the horizon state to the blockchain backend.
    async fn finalize_horizon_sync(&mut self, sync_peer: &SyncPeer) -> Result<(), HorizonSyncError> {
        debug!(target: LOG_TARGET, "Validating horizon state");

        self.hooks.call_on_progress_horizon_hooks(HorizonSyncInfo::new(
            vec![sync_peer.node_id().clone()],
            HorizonSyncStatus::Finalizing,
        ));

        let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
        let (calc_utxo_sum, calc_kernel_sum, calc_burned_sum) = self.calculate_commitment_sums(&header).await?;

        self.final_state_validator
            .validate(
                &*self.db().inner().db_read_access()?,
                header.height(),
                &calc_utxo_sum,
                &calc_kernel_sum,
                &calc_burned_sum,
            )
            .map_err(HorizonSyncError::FinalStateValidationFailed)?;

        let metadata = self.db().get_chain_metadata().await?;
        info!(
            target: LOG_TARGET,
            "Horizon state validation succeeded! Committing horizon state."
        );
        self.db()
            .write_transaction()
            .set_best_block(
                header.height(),
                *header.hash(),
                header.accumulated_data().total_accumulated_difficulty,
                *metadata.best_block_hash(),
                header.timestamp(),
            )
            .set_pruned_height(header.height())
            .set_horizon_data(calc_kernel_sum, calc_utxo_sum)
            .commit()
            .await?;

        Ok(())
    }

    /// (UTXO sum, Kernel sum, Burned sum)
    async fn calculate_commitment_sums(
        &mut self,
        header: &ChainHeader,
    ) -> Result<(CompressedCommitment, CompressedCommitment, CompressedCommitment), HorizonSyncError> {
        let mut utxo_sum = HomomorphicCommitment::default();
        let mut kernel_sum = HomomorphicCommitment::default();
        let mut burned_sum = HomomorphicCommitment::default();

        let mut prev_kernel_mmr = 0;

        let height = header.height();
        let db = self.db().inner().clone();
        let header_hash = *header.hash();
        task::spawn_blocking(move || {
            for h in 0..=height {
                let curr_header = db.fetch_chain_header(h)?;
                trace!(
                    target: LOG_TARGET,
                    "Fetching utxos from db: height:{}",
                    curr_header.height(),
                );
                let utxos = db.fetch_outputs_in_block_with_spend_state(*curr_header.hash(), Some(header_hash))?;
                debug!(
                    target: LOG_TARGET,
                    "{} output(s) loaded for height {}",
                    utxos.len(),
                    curr_header.height()
                );
                trace!(
                    target: LOG_TARGET,
                    "Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
                    curr_header.height(),
                    curr_header.header().kernel_mmr_size,
                    prev_kernel_mmr,
                    curr_header.header().kernel_mmr_size.saturating_sub(1)
                );

                trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
                for (u, spent) in utxos {
                    if !spent {
                        utxo_sum = &u.commitment.to_commitment()? + &utxo_sum;
                    }
                }

                let kernels = db.fetch_kernels_in_block(*curr_header.hash())?;
                trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
                for k in kernels {
                    kernel_sum = &k.excess.to_commitment()? + &kernel_sum;
                    if k.is_burned() {
                        burned_sum = &(k.get_burn_commitment()?.to_commitment()?) + &burned_sum;
                    }
                }
                prev_kernel_mmr = curr_header.header().kernel_mmr_size;

                if h % 1000 == 0 && height != 0 {
                    debug!(
                        target: LOG_TARGET,
                        "Final Validation: {:.2}% complete. Height: {} sync",
                        (h as f32 / height as f32) * 100.0,
                        h,
                    );
                }
            }

            Ok((
                CompressedCommitment::from_commitment(utxo_sum),
                CompressedCommitment::from_commitment(kernel_sum),
                CompressedCommitment::from_commitment(burned_sum),
            ))
        })
        .await?
    }

    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Helper function to get the index to the node_id inside of the vec of peers
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
    }

    #[inline]
    fn db(&self) -> &AsyncBlockchainDb<B> {
        &self.db
    }
}
```