• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 27141517531

08 Jun 2026 01:37PM UTC coverage: 61.298% (-0.01%) from 61.308%
27141517531

push

github

SWvheerden
chore: new release v5.4.0-pre.3

72248 of 117863 relevant lines covered (61.3%)

221811.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.46
/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::{TryFrom, TryInto},
25
    sync::Arc,
26
    time::{Duration, Instant},
27
};
28

29
use futures::StreamExt;
30
use log::*;
31
use tari_comms::{
32
    PeerConnection,
33
    RefKind,
34
    connectivity::ConnectivityRequester,
35
    peer_manager::NodeId,
36
    protocol::rpc::RpcClient,
37
};
38
use tari_node_components::blocks::{Block, ChainBlock};
39
use tari_transaction_components::{BanPeriod, aggregated_body::AggregateBody};
40
use tari_utilities::hex::Hex;
41

42
use super::error::BlockSyncError;
43
use crate::{
44
    base_node::{
45
        BlockchainSyncConfig,
46
        sync::{SyncPeer, ban::PeerBanManager, hooks::Hooks, rpc},
47
    },
48
    chain_storage::{BlockchainBackend, async_db::AsyncBlockchainDb},
49
    common::rolling_avg::RollingAverageTime,
50
    proto::base_node::SyncBlocksRequest,
51
    validation::{BlockBodyValidator, ValidationError},
52
};
53
/// Log target passed to the logging macros in this file.
const LOG_TARGET: &str = "c::bn::block_sync";

/// Number of times the allowed sync latency may be raised before block sync gives up with
/// `AllSyncPeersExceedLatency`.
const MAX_LATENCY_INCREASES: usize = 5;
56

57
pub struct BlockSynchronizer<'a, B> {
58
    config: BlockchainSyncConfig,
59
    db: AsyncBlockchainDb<B>,
60
    connectivity: ConnectivityRequester,
61
    sync_peers: &'a mut Vec<SyncPeer>,
62
    block_validator: Arc<dyn BlockBodyValidator<B>>,
63
    hooks: Hooks,
64
    peer_ban_manager: PeerBanManager,
65
}
66

67
impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
68
    pub fn new(
7✔
69
        config: BlockchainSyncConfig,
7✔
70
        db: AsyncBlockchainDb<B>,
7✔
71
        connectivity: ConnectivityRequester,
7✔
72
        sync_peers: &'a mut Vec<SyncPeer>,
7✔
73
        block_validator: Arc<dyn BlockBodyValidator<B>>,
7✔
74
    ) -> Self {
7✔
75
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
7✔
76
        Self {
7✔
77
            config,
7✔
78
            db,
7✔
79
            connectivity,
7✔
80
            sync_peers,
7✔
81
            block_validator,
7✔
82
            hooks: Default::default(),
7✔
83
            peer_ban_manager,
7✔
84
        }
7✔
85
    }
7✔
86

87
    pub fn on_starting<H>(&mut self, hook: H)
7✔
88
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
7✔
89
        self.hooks.add_on_starting_hook(hook);
7✔
90
    }
7✔
91

92
    pub fn on_progress<H>(&mut self, hook: H)
7✔
93
    where H: Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync + 'static {
7✔
94
        self.hooks.add_on_progress_block_hook(hook);
7✔
95
    }
7✔
96

97
    pub fn on_complete<H>(&mut self, hook: H)
7✔
98
    where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
7✔
99
        self.hooks.add_on_complete_hook(hook);
7✔
100
    }
7✔
101

102
    pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> {
7✔
103
        // Sync peers typically arrive here with a connection attached by header_sync (which
104
        // downgrades it to Weak on exit). Upgrade each in place to Strong for the duration of
105
        // block sync, or dial Strong if no connection is attached. The Strong handle pins the
106
        // peer against background reapers; we'll downgrade back to Weak on exit so reuse by
107
        // later stages remains free while leaving the peer eligible for reaping again.
108
        for peer in self.sync_peers.iter_mut() {
7✔
109
            if let Some(conn) = peer.connection() &&
7✔
110
                !conn.is_connected()
×
111
            {
×
112
                peer.clear_connection();
×
113
            }
7✔
114
            if peer.ensure_strong_connection() {
7✔
115
                continue;
×
116
            }
7✔
117
            match self
7✔
118
                .connectivity
7✔
119
                .dial_peer(peer.node_id().clone(), RefKind::Strong)
7✔
120
                .await
7✔
121
            {
122
                Ok(conn) => peer.set_connection(conn),
7✔
123
                Err(e) => debug!(
×
124
                    target: LOG_TARGET,
×
125
                    "Failed to dial sync peer {} as strong: {e}", peer.node_id()
×
126
                ),
127
            }
128
        }
129

130
        let result = self.synchronize_inner().await;
7✔
131

132
        // Release the Strong pin on every remaining sync peer; the connection stays attached
133
        // (as Weak) so horizon_state_sync can upgrade it again without redialling.
134
        for peer in self.sync_peers.iter_mut() {
7✔
135
            peer.downgrade_connection();
5✔
136
        }
5✔
137

138
        result
7✔
139
    }
7✔
140

141
    async fn synchronize_inner(&mut self) -> Result<(), BlockSyncError> {
7✔
142
        let mut max_latency = self.config.initial_max_sync_latency;
7✔
143
        let mut sync_round = 0;
7✔
144
        let mut latency_increases_counter = 0;
7✔
145
        loop {
146
            match self.attempt_block_sync(max_latency).await {
7✔
147
                Ok(_) => return Ok(()),
5✔
148
                Err(err @ BlockSyncError::AllSyncPeersExceedLatency) => {
×
149
                    warn!(target: LOG_TARGET, "{err}");
×
150
                    max_latency += self.config.max_latency_increase;
×
151
                    warn!(
×
152
                        target: LOG_TARGET,
×
153
                        "Retrying block sync with increased max latency {:.2?} with {} sync peers",
154
                        max_latency,
155
                        self.sync_peers.len()
×
156
                    );
157
                    latency_increases_counter += 1;
×
158
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
159
                        return Err(err);
×
160
                    }
×
161
                    // Prohibit using a few slow sync peers only, rather get new sync peers assigned
162
                    if self.sync_peers.len() < 2 {
×
163
                        return Err(err);
×
164
                    } else {
165
                        continue;
×
166
                    }
167
                },
168
                Err(err @ BlockSyncError::SyncRoundFailed) => {
×
169
                    sync_round += 1;
×
170
                    warn!(target: LOG_TARGET, "{err} ({sync_round})");
×
171
                    continue;
×
172
                },
173
                Err(err) => {
2✔
174
                    return Err(err);
2✔
175
                },
176
            }
177
        }
178
    }
7✔
179

180
    #[allow(clippy::too_many_lines)]
181
    async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> {
7✔
182
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
7✔
183
        info!(
7✔
184
            target: LOG_TARGET,
×
185
            "Attempting to sync blocks({} sync peers)",
186
            sync_peer_node_ids.len()
×
187
        );
188
        let mut latency_counter = 0usize;
7✔
189
        for node_id in sync_peer_node_ids {
7✔
190
            let peer_index = self.get_sync_peer_index(&node_id).ok_or(BlockSyncError::PeerNotFound)?;
7✔
191
            let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
7✔
192
            self.hooks.call_on_starting_hook(sync_peer);
7✔
193
            // Prefer the connection that travelled with this SyncPeer from header_sync (Strong).
194
            // We hand the attempt code a Weak clone so dropping it has no effect on the strong
195
            // counter — the SyncPeer's Strong handle still pins the connection.
196
            let stored_conn = sync_peer
7✔
197
                .connection()
7✔
198
                .filter(|c| c.is_connected())
7✔
199
                .map(|c| c.clone_weak());
7✔
200
            let mut conn = match stored_conn {
7✔
201
                Some(c) => c,
7✔
202
                None => match self.connect_to_sync_peer(node_id.clone()).await {
×
203
                    Ok(val) => val,
×
204
                    Err(e) => {
×
205
                        warn!(
×
206
                            target: LOG_TARGET,
×
207
                            "Failed to connect to sync peer `{node_id}`: {e}"
208
                        );
209
                        self.remove_sync_peer(&node_id);
×
210
                        continue;
×
211
                    },
212
                },
213
            };
214
            // Defensive: the connection may have been torn down by another subsystem between
215
            // dial_peer() returning and this point (e.g. DhtConnectivity pruning). Without this
216
            // check we would attempt an RPC negotiation on a dead channel.
217
            if !conn.is_connected() {
7✔
218
                warn!(
×
219
                    target: LOG_TARGET,
×
220
                    "Sync peer `{node_id}` was disconnected before RPC negotiation could begin"
221
                );
222
                self.remove_sync_peer(&node_id);
×
223
                continue;
×
224
            }
7✔
225
            let config = RpcClient::builder()
7✔
226
                .with_deadline(self.config.rpc_deadline)
7✔
227
                .with_deadline_grace_period(Duration::from_secs(5));
7✔
228
            // Bound RPC negotiation: without this the negotiation itself (as opposed to
229
            // individual RPC requests) has no timeout and can wedge the sync loop indefinitely.
230
            let mut client = match tokio::time::timeout(
7✔
231
                self.config.rpc_deadline,
7✔
232
                conn.connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config),
7✔
233
            )
234
            .await
7✔
235
            {
236
                Ok(Ok(val)) => val,
7✔
237
                Ok(Err(e)) => {
×
238
                    warn!(
×
239
                        target: LOG_TARGET,
×
240
                        "Failed to obtain RPC connection from sync peer `{node_id}`: {e}"
241
                    );
242
                    self.remove_sync_peer(&node_id);
×
243
                    continue;
×
244
                },
245
                Err(_) => {
246
                    warn!(
×
247
                        target: LOG_TARGET,
×
248
                        "Timed out establishing RPC connection with sync peer `{node_id}` after {:.2?}",
249
                        self.config.rpc_deadline,
250
                    );
251
                    self.remove_sync_peer(&node_id);
×
252
                    continue;
×
253
                },
254
            };
255
            let latency = client
7✔
256
                .get_last_request_latency()
7✔
257
                .expect("unreachable panic: last request latency must be set after connect");
7✔
258
            self.sync_peers
7✔
259
                .get_mut(peer_index)
7✔
260
                .expect("Already checked")
7✔
261
                .set_latency(latency);
7✔
262
            let sync_peer = self.sync_peers.get(peer_index).expect("Already checked").clone();
7✔
263
            info!(
7✔
264
                target: LOG_TARGET,
×
265
                "Attempting to synchronize blocks with `{node_id}` latency: {latency:.2?}"
266
            );
267
            match self.synchronize_blocks(sync_peer, client, max_latency).await {
7✔
268
                Ok(_) => return Ok(()),
5✔
269
                Err(err) => {
2✔
270
                    warn!(target: LOG_TARGET, "{err}");
2✔
271
                    let ban_reason = BlockSyncError::get_ban_reason(&err);
2✔
272
                    if let Some(reason) = ban_reason {
2✔
273
                        let duration = match reason.ban_duration {
2✔
274
                            BanPeriod::Short => self.config.short_ban_period,
2✔
275
                            BanPeriod::Long => self.config.ban_period,
×
276
                        };
277
                        self.peer_ban_manager
2✔
278
                            .ban_peer_if_required(&node_id, reason.reason, duration)
2✔
279
                            .await;
2✔
280
                    }
×
281
                    if let BlockSyncError::MaxLatencyExceeded { .. } = err {
2✔
282
                        latency_counter += 1;
×
283
                    } else {
2✔
284
                        self.remove_sync_peer(&node_id);
2✔
285
                    }
2✔
286
                },
287
            }
288
        }
289

290
        if self.sync_peers.is_empty() {
2✔
291
            Err(BlockSyncError::NoMoreSyncPeers("Block sync failed".to_string()))
2✔
292
        } else if latency_counter >= self.sync_peers.len() {
×
293
            Err(BlockSyncError::AllSyncPeersExceedLatency)
×
294
        } else {
295
            Err(BlockSyncError::SyncRoundFailed)
×
296
        }
297
    }
7✔
298

299
    async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
×
300
        // `synchronize()` already holds a Strong reference to every sync peer for the duration
301
        // of this sync, so the per-attempt dial here can be Weak — the strong handle in the
302
        // outer guard list keeps the connection pinned.
303
        let connection = self.connectivity.dial_peer(peer, RefKind::Weak).await?;
×
304
        Ok(connection)
×
305
    }
×
306

307
    #[allow(clippy::too_many_lines)]
308
    async fn synchronize_blocks(
7✔
309
        &mut self,
7✔
310
        mut sync_peer: SyncPeer,
7✔
311
        mut client: rpc::BaseNodeSyncRpcClient,
7✔
312
        max_latency: Duration,
7✔
313
    ) -> Result<(), BlockSyncError> {
7✔
314
        info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer.node_id());
7✔
315

316
        let tip_header = self.db.fetch_last_header().await?;
7✔
317
        let local_metadata = self.db.get_chain_metadata().await?;
7✔
318

319
        if tip_header.height <= local_metadata.best_block_height() {
7✔
320
            debug!(
1✔
321
                target: LOG_TARGET,
×
322
                "Blocks already synchronized to height {}.", tip_header.height
323
            );
324
            return Ok(());
1✔
325
        }
6✔
326

327
        let tip_hash = tip_header.hash();
6✔
328
        let tip_height = tip_header.height;
6✔
329
        let best_height = local_metadata.best_block_height();
6✔
330
        let chain_header = self.db.fetch_chain_header(best_height).await?;
6✔
331

332
        let best_full_block_hash = chain_header.accumulated_data().hash;
6✔
333
        debug!(
6✔
334
            target: LOG_TARGET,
×
335
            "Starting block sync from peer `{}`. Current best block is #{} `{}`. Syncing to #{} ({}).",
336
            sync_peer,
337
            best_height,
338
            best_full_block_hash.to_hex(),
×
339
            tip_height,
340
            tip_hash.to_hex()
×
341
        );
342
        let request = SyncBlocksRequest {
6✔
343
            start_hash: best_full_block_hash.to_vec(),
6✔
344
            // To the tip!
6✔
345
            end_hash: tip_hash.to_vec(),
6✔
346
        };
6✔
347

348
        let mut block_stream = client.sync_blocks(request).await?;
6✔
349
        let mut prev_hash = best_full_block_hash;
6✔
350
        let mut current_block = None;
6✔
351
        let mut last_sync_timer = Instant::now();
6✔
352
        let mut avg_latency = RollingAverageTime::new(20);
6✔
353
        while let Some(block_result) = block_stream.next().await {
23✔
354
            let latency = last_sync_timer.elapsed();
19✔
355
            avg_latency.add_sample(latency);
19✔
356
            let block_body_response = block_result?;
19✔
357

358
            let header = self
17✔
359
                .db
17✔
360
                .fetch_chain_header_by_block_hash(block_body_response.hash.clone().try_into()?)
17✔
361
                .await?
17✔
362
                .ok_or_else(|| {
17✔
363
                    BlockSyncError::UnknownHeaderHash(format!(
×
364
                        "Peer sent hash ({}) for block header we do not have",
×
365
                        block_body_response.hash.to_hex()
×
366
                    ))
×
367
                })?;
×
368

369
            let current_height = header.height();
17✔
370
            let header_hash = *header.hash();
17✔
371
            let timestamp = header.timestamp();
17✔
372

373
            if header.header().prev_hash != prev_hash {
17✔
374
                return Err(BlockSyncError::BlockWithoutParent {
×
375
                    expected: prev_hash.to_hex(),
×
376
                    got: header.header().prev_hash.to_hex(),
×
377
                });
×
378
            }
17✔
379

380
            prev_hash = header_hash;
17✔
381

382
            let body = block_body_response
17✔
383
                .body
17✔
384
                .map(AggregateBody::try_from)
17✔
385
                .ok_or_else(|| BlockSyncError::InvalidBlockBody("Peer sent empty block".to_string()))?
17✔
386
                .map_err(BlockSyncError::InvalidBlockBody)?;
17✔
387

388
            debug!(
17✔
389
                target: LOG_TARGET,
×
390
                "Validating block body #{} (PoW = {}, {}, latency: {:.2?})",
391
                current_height,
392
                header.header().pow_algo(),
×
393
                body.to_counts_string(),
×
394
                latency
395
            );
396

397
            let timer = Instant::now();
17✔
398
            let (header, header_accum_data) = header.into_parts();
17✔
399
            let block = Block::new(header, body);
17✔
400

401
            // Validate the block inside a tokio task
402
            let task_block = block.clone();
17✔
403
            let db = self.db.inner().clone();
17✔
404
            let validator = self.block_validator.clone();
17✔
405
            let res = {
17✔
406
                let txn = db.db_read_access()?;
17✔
407
                validator.validate_body(&*txn, &task_block)
17✔
408
            };
409

410
            let block = match res {
17✔
411
                Ok(block) => block,
17✔
412
                Err(err @ ValidationError::BadBlockFound { .. }) | Err(err @ ValidationError::FatalStorageError(_)) => {
×
413
                    return Err(err.into());
×
414
                },
415
                Err(err) => {
×
416
                    // Add to bad blocks
417
                    if let Err(err) = self
×
418
                        .db
×
419
                        .write_transaction()
×
420
                        .delete_orphan(header_hash)
×
421
                        .insert_bad_block(header_hash, current_height, err.to_string())
×
422
                        .commit()
×
423
                        .await
×
424
                    {
425
                        error!(target: LOG_TARGET, "Failed to insert bad block: {err}");
×
426
                    }
×
427
                    return Err(err.into());
×
428
                },
429
            };
430

431
            let block = ChainBlock::try_construct(Arc::new(block), header_accum_data)
17✔
432
                .map(Arc::new)
17✔
433
                .ok_or(BlockSyncError::FailedToConstructChainBlock)?;
17✔
434

435
            debug!(
17✔
436
                target: LOG_TARGET,
×
437
                "Validated in {:.0?}. Storing block body #{} (PoW = {}, {})",
438
                timer.elapsed(),
×
439
                block.header().height,
×
440
                block.header().pow_algo(),
×
441
                block.block().body.to_counts_string(),
×
442
            );
443
            trace!(
17✔
444
                target: LOG_TARGET,
×
445
                "{block}"
446
            );
447

448
            self.db
17✔
449
                .write_transaction()
17✔
450
                .delete_orphan(header_hash)
17✔
451
                .insert_tip_block_body(block.clone())
17✔
452
                .set_best_block(
17✔
453
                    block.height(),
17✔
454
                    header_hash,
17✔
455
                    block.accumulated_data().total_accumulated_difficulty,
17✔
456
                    block.header().prev_hash,
17✔
457
                    timestamp,
17✔
458
                )
17✔
459
                .commit()
17✔
460
                .await?;
17✔
461

462
            // Average time between receiving blocks from the peer - used to detect a slow sync peer
463
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
17✔
464
            if let Some(latency) = last_avg_latency {
17✔
465
                sync_peer.set_latency(latency);
7✔
466
            }
10✔
467
            // Includes time to add block to database, used to show blocks/s on status line
468
            sync_peer.add_sample(last_sync_timer.elapsed());
17✔
469
            self.hooks
17✔
470
                .call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);
17✔
471

472
            if let Some(avg_latency) = last_avg_latency &&
17✔
473
                avg_latency > max_latency
7✔
474
            {
475
                return Err(BlockSyncError::MaxLatencyExceeded {
×
476
                    peer: sync_peer.node_id().clone(),
×
477
                    latency: avg_latency,
×
478
                    max_latency,
×
479
                });
×
480
            }
17✔
481

482
            current_block = Some(block);
17✔
483
            last_sync_timer = Instant::now();
17✔
484
        }
485
        debug!(
4✔
486
            "Sync peer claim at start  - height: {}, accumulated difficulty: {}",
487
            sync_peer.claimed_chain_metadata().best_block_height(),
×
488
            sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
489
        );
490
        debug!(
4✔
491
            "Our best header at start  - height: {}, accumulated difficulty: {}",
492
            best_height,
493
            chain_header.accumulated_data().total_accumulated_difficulty,
×
494
        );
495
        let metadata_after_sync = self.db.get_chain_metadata().await?;
4✔
496
        debug!(
4✔
497
            "Our best block after sync - height: {}, accumulated difficulty: {}",
498
            metadata_after_sync.best_block_height(),
×
499
            metadata_after_sync.accumulated_difficulty(),
×
500
        );
501

502
        if metadata_after_sync.accumulated_difficulty() < sync_peer.claimed_chain_metadata().accumulated_difficulty() {
4✔
503
            return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!(
×
504
                "Their claim - height: {}, accumulated difficulty: {}. Our status after block sync - height: {}, \
×
505
                 accumulated difficulty: {}",
×
506
                sync_peer.claimed_chain_metadata().best_block_height(),
×
507
                sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
508
                metadata_after_sync.best_block_height(),
×
509
                metadata_after_sync.accumulated_difficulty(),
×
510
            )));
×
511
        }
4✔
512

513
        if let Some(block) = current_block {
4✔
514
            self.hooks.call_on_complete_hooks(block, best_height);
4✔
515
        }
4✔
516

517
        debug!(target: LOG_TARGET, "Completed block sync with peer `{sync_peer}`");
4✔
518

519
        Ok(())
4✔
520
    }
7✔
521

522
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
523
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
2✔
524
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
2✔
525
            self.sync_peers.remove(pos);
2✔
526
        }
2✔
527
    }
2✔
528

529
    // Helper function to get the index to the node_id inside of the vec of peers
530
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
7✔
531
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
7✔
532
    }
7✔
533
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc