
tari-project / tari · build 16261878912 (push, via GitHub, committer web-flow)
14 Jul 2025 08:26AM UTC · coverage: 57.075% (-1.0%) from 58.046%
fix: wallet sync command (#7305)

Description
---
Fixes the wallet sync command.
Fixes the connected URL address so that it displays correctly.
Removes unused UTXO scanner states.


## Summary by CodeRabbit

* **New Features**
  * Improved wallet performance by streamlining peer discovery and synchronization, removing explicit connectivity wait times.
* **Refactor**
  * Simplified event reporting by removing detailed connection status updates and related error messages during scanning and recovery processes.
  * Updated address retrieval methods to use asynchronous calls for better responsiveness (see the sketch after this list).
* **Bug Fixes**
  * Reduced unnecessary delays and redundant connection checks, resulting in a smoother user experience during wallet operations.

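To make the "asynchronous address retrieval" item concrete, here is a minimal sketch of the pattern it describes; the types and names below are hypothetical, not the actual tari wallet API:

use std::sync::Arc;
use tokio::sync::RwLock;

struct WalletState {
    // Last known public address, updated elsewhere by the comms layer.
    public_address: Arc<RwLock<Option<String>>>,
}

impl WalletState {
    // Async retrieval: `.read().await` suspends instead of blocking the
    // thread, keeping the CLI responsive while sync runs on the same runtime.
    async fn get_address(&self) -> Option<String> {
        self.public_address.read().await.clone()
    }
}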

0 of 7 new or added lines in 3 files covered. (0.0%)

1506 existing lines in 39 files now uncovered.

68579 of 120155 relevant lines covered (57.08%)

530309.75 hits per line

Source File: /base_layer/core/src/base_node/sync/rpc/service.rs (12.29% of lines covered)
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    cmp,
    convert::{TryFrom, TryInto},
    sync::{Arc, Weak},
};

use log::*;
use tari_common_types::types::FixedHash;
use tari_comms::{
    peer_manager::NodeId,
    protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming},
    utils,
};
use tari_utilities::hex::Hex;
use tokio::{
    sync::{mpsc, Mutex},
    task,
};
use tracing::{instrument, span, Instrument, Level};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
    base_node::{
        comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind},
        sync::{
            header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
            rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
        },
        LocalNodeCommsInterface,
    },
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend},
    iterators::NonOverlappingIntegerPairIter,
    proto,
    proto::base_node::{
        FindChainSplitRequest,
        FindChainSplitResponse,
        SyncBlocksRequest,
        SyncHeadersRequest,
        SyncKernelsRequest,
        SyncUtxosRequest,
        SyncUtxosResponse,
    },
};

const LOG_TARGET: &str = "c::base_node::sync_rpc";

pub struct BaseNodeSyncRpcService<B> {
    db: AsyncBlockchainDb<B>,
    active_sessions: Mutex<Vec<Weak<NodeId>>>,
    base_node_service: LocalNodeCommsInterface,
}

impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {
    pub fn new(db: AsyncBlockchainDb<B>, base_node_service: LocalNodeCommsInterface) -> Self {
        Self {
            db,
            active_sessions: Mutex::new(Vec::new()),
            base_node_service,
        }
    }

    #[inline]
    fn db(&self) -> AsyncBlockchainDb<B> {
        self.db.clone()
    }

    pub async fn try_add_exclusive_session(&self, peer: NodeId) -> Result<Arc<NodeId>, RpcStatus> {
        let mut lock = self.active_sessions.lock().await;
        *lock = lock.drain(..).filter(|l| l.strong_count() > 0).collect();
        debug!(target: LOG_TARGET, "Number of active sync sessions: {}", lock.len());

        if lock.iter().any(|p| p.upgrade().filter(|p| **p == peer).is_some()) {
            return Err(RpcStatus::forbidden(
                "Existing sync session found for this client. Only a single session is permitted",
            ));
        }

        let token = Arc::new(peer);
        lock.push(Arc::downgrade(&token));
        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        metrics::active_sync_peers().set(lock.len() as i64);
        Ok(token)
    }
}

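try_add_exclusive_session above is the piece that enforces one sync session per peer: the registry holds only Weak<NodeId> handles, so a session stays registered exactly as long as the Arc token handed back to the sync task is alive, and dropped tokens are pruned on the next call. A minimal sketch of the same weak-token idea in isolation (simplified types, std locks in place of tokio's; not the service's actual API):

use std::sync::{Arc, Mutex, Weak};

struct SessionRegistry {
    active: Mutex<Vec<Weak<u64>>>, // u64 stands in for a peer id
}

impl SessionRegistry {
    fn try_register(&self, peer: u64) -> Option<Arc<u64>> {
        let mut active = self.active.lock().unwrap();
        // Prune sessions whose tokens have already been dropped.
        active.retain(|w| w.strong_count() > 0);
        // Reject a second concurrent session for the same peer.
        if active.iter().any(|w| w.upgrade().map_or(false, |p| *p == peer)) {
            return None;
        }
        let token = Arc::new(peer);
        active.push(Arc::downgrade(&token));
        // Dropping `token` (e.g. when the sync task finishes) ends the session.
        Some(token)
    }
}
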
#[tari_comms::async_trait]
impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcService<B> {
    #[instrument(level = "trace", name = "sync_rpc::sync_blocks", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_blocks(
        &self,
        request: Request<SyncBlocksRequest>,
    ) -> Result<Streaming<proto::base_node::BlockBodyResponse>, RpcStatus> {
        let peer_node_id = request.context().peer_node_id().clone();
        let message = request.into_message();
        let mut block_event_stream = self.base_node_service.get_block_event_stream();

        let db = self.db();
        let hash = message
            .start_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
        if db.fetch_block_by_hash(hash, true).await.is_err() {
            return Err(RpcStatus::not_found("Requested start block sync hash was not found"));
        }
        let start_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

        let metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;

        let start_height = start_header.height + 1;
        if start_height < metadata.pruned_height() {
            return Err(RpcStatus::bad_request(&format!(
                "Requested full block body at height {}, however this node has an effective pruned height of {}",
                start_height,
                metadata.pruned_height()
            )));
        }

        let hash = message
            .end_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
        if db.fetch_block_by_hash(hash, true).await.is_err() {
            return Err(RpcStatus::not_found("Requested end block sync hash was not found"));
        }

        let end_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Requested end block sync hash was not found"))?;

        let end_height = end_header.height;
        if start_height > end_height {
            return Err(RpcStatus::bad_request(&format!(
                "Start block #{} is higher than end block #{}",
                start_height, end_height
            )));
        }

        debug!(
            target: LOG_TARGET,
            "Initiating block sync with peer `{}` from height {} to {}", peer_node_id, start_height, end_height,
        );

        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
        // Number of blocks to load and push to the stream before loading the next batch
        const BATCH_SIZE: usize = 2;
        let (tx, rx) = mpsc::channel(BATCH_SIZE);

        let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker");
        let iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE)
            .map_err(|e| RpcStatus::bad_request(&e))?;
        task::spawn(
            async move {
                // Move token into this task
                let peer_node_id = session_token;
                for (start, end) in iter {
                    if tx.is_closed() {
                        break;
                    }

                    // Check for reorgs during sync
                    while let Ok(block_event) = block_event_stream.try_recv() {
                        if let BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { removed, .. }) | BlockSyncRewind(removed) =
                            &*block_event
                        {
                            if let Some(reorg_block) = removed
                                .iter()
                                // If the reorg happens before the end height of sync we let the peer know that the
                                // chain they are syncing with has changed
                                .find(|block| block.height() <= end_height)
                            {
                                warn!(
                                    target: LOG_TARGET,
                                    "Block reorg/rewind detected at height {} during sync, letting the sync peer {} know.",
                                    reorg_block.height(),
                                    peer_node_id
                                );
                                let _result = tx
                                    .send(Err(RpcStatus::conflict(&format!(
                                        "Reorg at height {} detected",
                                        reorg_block.height()
                                    ))))
                                    .await;
                                return;
                            }
                        }
                    }

                    debug!(
                        target: LOG_TARGET,
                        "Sending blocks #{} - #{} to '{}'", start, end, peer_node_id
                    );
                    let blocks = db
                        .fetch_blocks(start..=end, true)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                    if tx.is_closed() {
                        debug!(
                            target: LOG_TARGET,
                            "Block sync session for peer '{}' terminated early", peer_node_id
                        );
                        break;
                    }

                    match blocks {
                        Ok(blocks) if blocks.is_empty() => {
                            break;
                        },
                        Ok(blocks) => {
                            let blocks = blocks.into_iter().map(|hb| {
                                let block = hb.into_block();
                                proto::base_node::BlockBodyResponse::try_from(block).map_err(|e| {
                                    log::error!(target: LOG_TARGET, "Internal error: {}", e);
                                    RpcStatus::general_default()
                                })
                            });

                            // Ensure task stops if the peer prematurely stops their RPC session
                            if utils::mpsc::send_all(&tx, blocks).await.is_err() {
                                debug!(
                                    target: LOG_TARGET,
                                    "Block sync session for peer '{}' terminated early", peer_node_id
                                );
                                break;
                            }
                        },
                        Err(err) => {
                            let _result = tx.send(Err(err)).await;
                            break;
                        },
                    }
                }

                #[cfg(feature = "metrics")]
                metrics::active_sync_peers().dec();
                debug!(
                    target: LOG_TARGET,
                    "Block sync round complete for peer `{}`.", peer_node_id,
                );
            }
            .instrument(span),
        );

        Ok(Streaming::new(rx))
    }

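The worker above drains NonOverlappingIntegerPairIter in BATCH_SIZE steps. The sketch below shows the batching behaviour the loop appears to rely on, with inclusive, non-overlapping (start, end) pairs; the helper is illustrative, not the real iterator:

fn batches(start: u64, end_exclusive: u64, size: u64) -> Vec<(u64, u64)> {
    let mut out = Vec::new();
    let mut lo = start;
    while lo < end_exclusive {
        // Each pair covers at most `size` heights, inclusive on both ends.
        let hi = (lo + size - 1).min(end_exclusive - 1);
        out.push((lo, hi));
        lo = hi + 1;
    }
    out
}

// For start_height = 100, end_height = 105 and BATCH_SIZE = 2:
// batches(100, 106, 2) == [(100, 101), (102, 103), (104, 105)]
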
    #[instrument(level = "trace", name = "sync_rpc::sync_headers", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_headers(
        &self,
        request: Request<SyncHeadersRequest>,
    ) -> Result<Streaming<proto::core::BlockHeader>, RpcStatus> {
        let db = self.db();
        let peer_node_id = request.context().peer_node_id().clone();
        let message = request.into_message();
        let hash = message
            .start_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
        let start_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

        let mut count = message.count;
        if count == 0 {
            let tip_header = db.fetch_tip_header().await.rpc_status_internal_error(LOG_TARGET)?;
            count = tip_header.height().saturating_sub(start_header.height);
        }
        // If the start header is already the tip, return an empty stream
        if count == 0 {
            return Ok(Streaming::empty());
        }

        #[allow(clippy::cast_possible_truncation)]
        let chunk_size = cmp::min(100, count) as usize;
        debug!(
            target: LOG_TARGET,
            "Initiating header sync with peer `{}` from height {} to {} (chunk_size={})",
            peer_node_id,
            start_header.height,
            count,
            chunk_size
        );

        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
        let (tx, rx) = mpsc::channel(chunk_size);
        let span = span!(Level::TRACE, "sync_rpc::sync_headers::inner_worker");
        let iter = NonOverlappingIntegerPairIter::new(
            start_header.height.saturating_add(1),
            start_header.height.saturating_add(count).saturating_add(1),
            chunk_size,
        )
        .map_err(|e| RpcStatus::bad_request(&e))?;
        task::spawn(
            async move {
                // Move token into this task
                let peer_node_id = session_token;
                for (start, end) in iter {
                    if tx.is_closed() {
                        break;
                    }
                    debug!(target: LOG_TARGET, "Sending headers #{} - #{}", start, end);
                    let headers = db
                        .fetch_headers(start..=end)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                    if tx.is_closed() {
                        debug!(
                            target: LOG_TARGET,
                            "Header sync session for peer '{}' terminated early", peer_node_id
                        );
                        break;
                    }
                    match headers {
                        Ok(headers) if headers.is_empty() => {
                            break;
                        },
                        Ok(headers) => {
                            let headers = headers.into_iter().map(proto::core::BlockHeader::from).map(Ok);
                            // Ensure task stops if the peer prematurely stops their RPC session
                            if utils::mpsc::send_all(&tx, headers).await.is_err() {
                                break;
                            }
                        },
                        Err(err) => {
                            let _result = tx.send(Err(err)).await;
                            break;
                        },
                    }
                }

                #[cfg(feature = "metrics")]
                metrics::active_sync_peers().dec();
                debug!(
                    target: LOG_TARGET,
                    "Header sync round complete for peer `{}`.", peer_node_id,
                );
            }
            .instrument(span),
        );

        Ok(Streaming::new(rx))
    }

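Both streaming methods share one mechanic worth calling out: the response channel is bounded (chunk_size here, BATCH_SIZE in sync_blocks), so the producer task can only run ahead of a slow peer by one buffer's worth of messages. A self-contained sketch of that backpressure with tokio's bounded mpsc (illustrative sizes and payloads):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity 2: the producer can be at most 2 messages ahead.
    let (tx, mut rx) = mpsc::channel::<u64>(2);
    tokio::spawn(async move {
        for h in 0..10u64 {
            // Suspends here whenever the buffer is full; errors out
            // (and stops producing) once the receiver is dropped.
            if tx.send(h).await.is_err() {
                break;
            }
        }
    });
    while let Some(h) = rx.recv().await {
        println!("got header #{}", h);
    }
}
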
    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn get_header_by_height(
        &self,
        request: Request<u64>,
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
        let height = request.into_message();
        let header = self
            .db()
            .fetch_header(height)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {}", height)))?;

        Ok(Response::new(header.into()))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn find_chain_split(
        &self,
        request: Request<FindChainSplitRequest>,
    ) -> Result<Response<FindChainSplitResponse>, RpcStatus> {
        const MAX_ALLOWED_BLOCK_HASHES: usize = 1000;

        let peer = request.context().peer_node_id().clone();
        let message = request.into_message();
        if message.block_hashes.is_empty() {
            return Err(RpcStatus::bad_request(
                "Cannot find chain split because no hashes were sent",
            ));
        }
        if message.block_hashes.len() > MAX_ALLOWED_BLOCK_HASHES {
            return Err(RpcStatus::bad_request(&format!(
                "Cannot query more than {} block hashes",
                MAX_ALLOWED_BLOCK_HASHES,
            )));
        }
        if message.header_count > (HEADER_SYNC_INITIAL_MAX_HEADERS as u64) {
            return Err(RpcStatus::bad_request(&format!(
                "Cannot ask for more than {} headers",
                HEADER_SYNC_INITIAL_MAX_HEADERS,
            )));
        }

        let db = self.db();
        let hashes: Vec<FixedHash> = message
            .block_hashes
            .into_iter()
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
            .collect::<Result<_, _>>()
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
        let maybe_headers = db
            .find_headers_after_hash(hashes, message.header_count)
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        match maybe_headers {
            Some((idx, headers)) => {
                debug!(
                    target: LOG_TARGET,
                    "Sending forked index {} and {} header(s) to peer `{}`",
                    idx,
                    headers.len(),
                    peer
                );

                Ok(Response::new(FindChainSplitResponse {
                    fork_hash_index: idx as u64,
                    headers: headers.into_iter().map(Into::into).collect(),
                }))
            },
            None => {
                debug!(
                    target: LOG_TARGET,
                    "Unable to find link to main chain from peer `{}`", peer
                );
                Err(RpcStatus::not_found("No link found to main chain"))
            },
        }
    }

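find_chain_split, in brief: the client submits a batch of its recent block hashes and the node answers with the index of the first hash it can place on its main chain (fork_hash_index) plus up to header_count of the headers that follow it. A toy model of that lookup, with u64 standing in for block hashes and an illustrative cap on returned headers:

fn find_split(known_chain: &[u64], candidates: &[u64]) -> Option<(usize, Vec<u64>)> {
    for (idx, hash) in candidates.iter().enumerate() {
        if let Some(pos) = known_chain.iter().position(|h| h == hash) {
            // Headers after the split point, capped at 5 for illustration.
            let headers: Vec<u64> = known_chain[pos + 1..].iter().take(5).copied().collect();
            return Some((idx, headers));
        }
    }
    None // no link to the main chain
}
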
    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn get_chain_metadata(&self, _: Request<()>) -> Result<Response<proto::base_node::ChainMetadata>, RpcStatus> {
        let chain_metadata = self
            .db()
            .get_chain_metadata()
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        Ok(Response::new(chain_metadata.into()))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_kernels(
        &self,
        request: Request<SyncKernelsRequest>,
    ) -> Result<Streaming<proto::types::TransactionKernel>, RpcStatus> {
        let peer_node_id = request.context().peer_node_id().clone();
        let req = request.into_message();
        let (tx, rx) = mpsc::channel(100);
        let db = self.db();

        let start_header = db
            .fetch_header_containing_kernel_mmr(req.start)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .into_header();
        let hash = req
            .end_header_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
        let end_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Unknown end header"))?;

        let mut current_height = start_header.height;
        let end_height = end_header.height;
        let mut current_mmr_position = start_header.kernel_mmr_size;
        let mut current_header_hash = start_header.hash();

        if current_height > end_height {
            return Err(RpcStatus::bad_request("start header height is after end header"));
        }

        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
        task::spawn(async move {
            // Move session token into task
            let peer_node_id = session_token;
            while current_height <= end_height {
                if tx.is_closed() {
                    break;
                }
                let res = db
                    .fetch_kernels_in_block(current_header_hash)
                    .await
                    .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                if tx.is_closed() {
                    debug!(
                        target: LOG_TARGET,
                        "Kernel sync session for peer '{}' terminated early", peer_node_id
                    );
                    break;
                }

                match res {
                    Ok(kernels) if kernels.is_empty() => {
                        let _result = tx
                            .send(Err(RpcStatus::general(&format!(
                                "No kernels in block {}",
                                current_header_hash.to_hex()
                            ))))
                            .await;
                        break;
                    },
                    Ok(kernels) => {
                        debug!(
                            target: LOG_TARGET,
                            "Streaming kernels {} to {}",
                            current_mmr_position,
                            current_mmr_position + kernels.len() as u64
                        );
                        current_mmr_position += kernels.len() as u64;
                        let kernels = kernels.into_iter().map(proto::types::TransactionKernel::from).map(Ok);
                        // Ensure task stops if the peer prematurely stops their RPC session
                        if utils::mpsc::send_all(&tx, kernels).await.is_err() {
                            break;
                        }
                    },
                    Err(err) => {
                        let _result = tx.send(Err(err)).await;
                        break;
                    },
                }

                current_height += 1;

                if current_height <= end_height {
                    let res = db
                        .fetch_header(current_height)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));
                    match res {
                        Ok(Some(header)) => {
                            current_header_hash = header.hash();
                        },
                        Ok(None) => {
                            let _result = tx
                                .send(Err(RpcStatus::not_found(&format!(
                                    "Could not find header #{} while streaming kernels after position {}",
                                    current_height, current_mmr_position
                                ))))
                                .await;
                            break;
                        },
                        Err(err) => {
                            error!(target: LOG_TARGET, "DB error while streaming kernels: {}", err);
                            let _result = tx
                                .send(Err(RpcStatus::general("DB error while streaming kernels")))
                                .await;
                            break;
                        },
                    }
                }
            }

            #[cfg(feature = "metrics")]
            metrics::active_sync_peers().dec();
            debug!(
                target: LOG_TARGET,
                "Kernel sync round complete for peer `{}`.", peer_node_id,
            );
        });
        Ok(Streaming::new(rx))
    }

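One bookkeeping detail in sync_kernels: each header carries a cumulative kernel_mmr_size, so the stream's MMR position advances by the kernel count of each block it crosses. A tiny sketch of that relationship (the struct is a stand-in, not the real BlockHeader):

struct Header {
    height: u64,
    // Total kernels in the chain up to and including this block.
    kernel_mmr_size: u64,
}

// Kernels contributed by `curr`'s block alone.
fn kernels_in_block(prev: &Header, curr: &Header) -> u64 {
    curr.kernel_mmr_size - prev.kernel_mmr_size
}
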
    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
        let req = request.message();
        let peer_node_id = request.context().peer_node_id();
        debug!(
            target: LOG_TARGET,
            "Received sync_utxos-{} request from header {} to {}",
            peer_node_id,
            req.start_header_hash.to_hex(),
            req.end_header_hash.to_hex(),
        );

        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
        let (tx, rx) = mpsc::channel(200);
        let task = SyncUtxosTask::new(self.db(), session_token);
        task.run(request, tx).await?;

        Ok(Streaming::new(rx))
    }
}