• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16272458029

14 Jul 2025 04:32PM UTC coverage: 57.167% (-0.9%) from 58.047%
16272458029

push

github

web-flow
feat: modify soft disconnect criteria (#7307)

Description
---
We can be more efficient with soft disconnects when we compare against
expected RPC sessions and substream counts. This PR adds finer
discernment when doing soft peer disconnects.

Motivation and Context
---
The health check opens 2 substreams and 0 RPC sessions - that should
result in a disconnect if those are the only opened resources.

How Has This Been Tested?
---
System-level testing
```rust
2025-07-11 13:52:26.703289300 [comms::connection_manager::peer_connection] TRACE Hard disconnect - requester: 'Health check', peer: `d7c289e9e3c8377705ce599a96`, RPC clients: 0, substreams 2
2025-07-11 13:52:26.705658100 [comms::connection_manager::peer_connection] TRACE Soft disconnect - requester: 'Health check', peer: `0984896e74022c442c1034852c`, RPC clients: 1, substreams 3, NOT disconnecting
2025-07-11 13:52:26.705735900 [comms::connection_manager::peer_connection] TRACE Hard disconnect - requester: 'Health check', peer: `d025bc9e4bd423a9b304c491b8`, RPC clients: 0, substreams 2
2025-07-11 13:52:26.707647400 [comms::connection_manager::peer_connection] TRACE Hard disconnect - requester: 'Health check', peer: `51af08aff11f7129b4681d9950`, RPC clients: 0, substreams 2
```

What process can a PR reviewer use to test or verify this change?
---
Code review

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
##... (continued)

31 of 46 new or added lines in 6 files covered. (67.39%)

1102 existing lines in 27 files now uncovered.

68701 of 120177 relevant lines covered (57.17%)

226749.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.29
/base_layer/core/src/base_node/sync/rpc/service.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    cmp,
25
    convert::{TryFrom, TryInto},
26
    sync::{Arc, Weak},
27
};
28

29
use log::*;
30
use tari_common_types::types::FixedHash;
31
use tari_comms::{
32
    peer_manager::NodeId,
33
    protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming},
34
    utils,
35
};
36
use tari_utilities::hex::Hex;
37
use tokio::{
38
    sync::{mpsc, Mutex},
39
    task,
40
};
41
use tracing::{instrument, span, Instrument, Level};
42

43
#[cfg(feature = "metrics")]
44
use crate::base_node::metrics;
45
use crate::{
46
    base_node::{
47
        comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind},
48
        sync::{
49
            header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
50
            rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
51
        },
52
        LocalNodeCommsInterface,
53
    },
54
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend},
55
    iterators::NonOverlappingIntegerPairIter,
56
    proto,
57
    proto::base_node::{
58
        FindChainSplitRequest,
59
        FindChainSplitResponse,
60
        SyncBlocksRequest,
61
        SyncHeadersRequest,
62
        SyncKernelsRequest,
63
        SyncUtxosRequest,
64
        SyncUtxosResponse,
65
    },
66
};
67

68
const LOG_TARGET: &str = "c::base_node::sync_rpc";
69

70
pub struct BaseNodeSyncRpcService<B> {
71
    db: AsyncBlockchainDb<B>,
72
    active_sessions: Mutex<Vec<Weak<NodeId>>>,
73
    base_node_service: LocalNodeCommsInterface,
74
}
75

76
impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {
77
    pub fn new(db: AsyncBlockchainDb<B>, base_node_service: LocalNodeCommsInterface) -> Self {
5✔
78
        Self {
5✔
79
            db,
5✔
80
            active_sessions: Mutex::new(Vec::new()),
5✔
81
            base_node_service,
5✔
82
        }
5✔
83
    }
5✔
84

85
    #[inline]
86
    fn db(&self) -> AsyncBlockchainDb<B> {
2✔
87
        self.db.clone()
2✔
88
    }
2✔
89

90
    pub async fn try_add_exclusive_session(&self, peer: NodeId) -> Result<Arc<NodeId>, RpcStatus> {
1✔
91
        let mut lock = self.active_sessions.lock().await;
1✔
92
        *lock = lock.drain(..).filter(|l| l.strong_count() > 0).collect();
1✔
93
        debug!(target: LOG_TARGET, "Number of active sync sessions: {}", lock.len());
1✔
94

95
        if lock.iter().any(|p| p.upgrade().filter(|p| **p == peer).is_some()) {
1✔
96
            return Err(RpcStatus::forbidden(
×
97
                "Existing sync session found for this client. Only a single session is permitted",
×
98
            ));
×
99
        }
1✔
100

1✔
101
        let token = Arc::new(peer);
1✔
102
        lock.push(Arc::downgrade(&token));
1✔
103
        #[allow(clippy::cast_possible_wrap)]
1✔
104
        #[cfg(feature = "metrics")]
1✔
105
        metrics::active_sync_peers().set(lock.len() as i64);
1✔
106
        Ok(token)
1✔
107
    }
1✔
108
}
109

110
#[tari_comms::async_trait]
111
impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcService<B> {
112
    #[instrument(level = "trace", name = "sync_rpc::sync_blocks", skip(self), err)]
113
    #[allow(clippy::blocks_in_conditions)]
114
    async fn sync_blocks(
115
        &self,
116
        request: Request<SyncBlocksRequest>,
117
    ) -> Result<Streaming<proto::base_node::BlockBodyResponse>, RpcStatus> {
1✔
118
        let peer_node_id = request.context().peer_node_id().clone();
1✔
119
        let message = request.into_message();
1✔
120
        let mut block_event_stream = self.base_node_service.get_block_event_stream();
1✔
121

1✔
122
        let db = self.db();
1✔
123
        let hash = message
1✔
124
            .start_hash
1✔
125
            .try_into()
1✔
126
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
1✔
127
        if db.fetch_block_by_hash(hash, true).await.is_err() {
1✔
128
            return Err(RpcStatus::not_found("Requested start block sync hash was not found"));
×
129
        }
1✔
130
        let start_header = db
1✔
131
            .fetch_header_by_block_hash(hash)
1✔
132
            .await
1✔
133
            .rpc_status_internal_error(LOG_TARGET)?
1✔
134
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;
1✔
135

UNCOV
136
        let metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;
×
137

UNCOV
138
        let start_height = start_header.height + 1;
×
UNCOV
139
        if start_height < metadata.pruned_height() {
×
140
            return Err(RpcStatus::bad_request(&format!(
×
141
                "Requested full block body at height {}, however this node has an effective pruned height of {}",
×
142
                start_height,
×
143
                metadata.pruned_height()
×
144
            )));
×
UNCOV
145
        }
×
146

UNCOV
147
        let hash = message
×
UNCOV
148
            .end_hash
×
UNCOV
149
            .try_into()
×
UNCOV
150
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
×
UNCOV
151
        if db.fetch_block_by_hash(hash, true).await.is_err() {
×
152
            return Err(RpcStatus::not_found("Requested end block sync hash was not found"));
×
UNCOV
153
        }
×
154

UNCOV
155
        let end_header = db
×
UNCOV
156
            .fetch_header_by_block_hash(hash)
×
UNCOV
157
            .await
×
UNCOV
158
            .rpc_status_internal_error(LOG_TARGET)?
×
UNCOV
159
            .ok_or_else(|| RpcStatus::not_found("Requested end block sync hash was not found"))?;
×
160

UNCOV
161
        let end_height = end_header.height;
×
UNCOV
162
        if start_height > end_height {
×
UNCOV
163
            return Err(RpcStatus::bad_request(&format!(
×
UNCOV
164
                "Start block #{} is higher than end block #{}",
×
UNCOV
165
                start_height, end_height
×
UNCOV
166
            )));
×
UNCOV
167
        }
×
UNCOV
168

×
UNCOV
169
        debug!(
×
170
            target: LOG_TARGET,
×
171
            "Initiating block sync with peer `{}` from height {} to {}", peer_node_id, start_height, end_height,
×
172
        );
173

UNCOV
174
        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
×
175
        // Number of blocks to load and push to the stream before loading the next batch
176
        const BATCH_SIZE: usize = 2;
UNCOV
177
        let (tx, rx) = mpsc::channel(BATCH_SIZE);
×
178

UNCOV
179
        let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker");
×
UNCOV
180
        let iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE)
×
UNCOV
181
            .map_err(|e| RpcStatus::bad_request(&e))?;
×
UNCOV
182
        task::spawn(
×
UNCOV
183
            async move {
×
UNCOV
184
                // Move token into this task
×
UNCOV
185
                let peer_node_id = session_token;
×
UNCOV
186
                for (start, end) in iter {
×
UNCOV
187
                    if tx.is_closed() {
×
188
                        break;
×
UNCOV
189
                    }
×
190

191
                    // Check for reorgs during sync
UNCOV
192
                    while let Ok(block_event) = block_event_stream.try_recv() {
×
193
                        if let BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { removed, .. }) |  BlockSyncRewind(removed)  =
×
194
                            &*block_event
×
195
                        {
196
                            //add BlockSyncRewind(Vec<Arc<ChainBlock>>),
197
                            if let Some(reorg_block) = removed
×
198
                                .iter()
×
199
                                // If the reorg happens before the end height of sync we let the peer know that the chain they are syncing with has changed
×
200
                                .find(|block| block.height() <= end_height)
×
201
                            {
202
                                warn!(
×
203
                                    target: LOG_TARGET,
×
204
                                    "Block reorg/rewind detected at height {} during sync, letting the sync peer {} know.",
×
205
                                    reorg_block.height(),
×
206
                                    peer_node_id
207
                                );
208
                                let _result = tx.send(Err(RpcStatus::conflict(&format!(
×
209
                                    "Reorg at height {} detected",
×
210
                                    reorg_block.height()
×
211
                                ))));
×
212
                                return;
×
213
                            }
×
214
                        }
×
215
                    }
216

UNCOV
217
                    debug!(
×
218
                        target: LOG_TARGET,
×
219
                        "Sending blocks #{} - #{} to '{}'", start, end, peer_node_id
×
220
                    );
UNCOV
221
                    let blocks = db
×
UNCOV
222
                        .fetch_blocks(start..=end, true)
×
UNCOV
223
                        .await
×
UNCOV
224
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));
×
UNCOV
225

×
UNCOV
226
                    if tx.is_closed() {
×
227
                        debug!(
×
228
                            target: LOG_TARGET,
×
229
                            "Block sync session for peer '{}' terminated early", peer_node_id
×
230
                        );
231
                        break;
×
UNCOV
232
                    }
×
233

UNCOV
234
                    match blocks {
×
UNCOV
235
                        Ok(blocks) if blocks.is_empty() => {
×
236
                            break;
×
237
                        },
UNCOV
238
                        Ok(blocks) => {
×
UNCOV
239
                            let blocks = blocks.into_iter().map(|hb| {
×
UNCOV
240
                                let block = hb.into_block();
×
UNCOV
241
                                proto::base_node::BlockBodyResponse::try_from(block).map_err(|e| {
×
242
                                    log::error!(target: LOG_TARGET, "Internal error: {}", e);
×
243
                                    RpcStatus::general_default()
×
UNCOV
244
                                })
×
UNCOV
245
                            });
×
UNCOV
246

×
UNCOV
247
                            // Ensure task stops if the peer prematurely stops their RPC session
×
UNCOV
248
                            if utils::mpsc::send_all(&tx, blocks).await.is_err() {
×
249
                                debug!(
×
250
                                    target: LOG_TARGET,
×
251
                                    "Block sync session for peer '{}' terminated early", peer_node_id
×
252
                                );
253
                                break;
×
UNCOV
254
                            }
×
255
                        },
256
                        Err(err) => {
×
257
                            let _result = tx.send(Err(err)).await;
×
258
                            break;
×
259
                        },
260
                    }
261
                }
262

263
                #[cfg(feature = "metrics")]
UNCOV
264
                metrics::active_sync_peers().dec();
×
UNCOV
265
                debug!(
×
266
                    target: LOG_TARGET,
×
267
                    "Block sync round complete for peer `{}`.", peer_node_id,
×
268
                );
UNCOV
269
            }
×
UNCOV
270
            .instrument(span),
×
UNCOV
271
        );
×
UNCOV
272

×
UNCOV
273
        Ok(Streaming::new(rx))
×
274
    }
2✔
275

276
    #[instrument(level = "trace", name = "sync_rpc::sync_headers", skip(self), err)]
277
    #[allow(clippy::blocks_in_conditions)]
278
    async fn sync_headers(
279
        &self,
280
        request: Request<SyncHeadersRequest>,
281
    ) -> Result<Streaming<proto::core::BlockHeader>, RpcStatus> {
×
282
        let db = self.db();
×
283
        let peer_node_id = request.context().peer_node_id().clone();
×
284
        let message = request.into_message();
×
285
        let hash = message
×
286
            .start_hash
×
287
            .try_into()
×
288
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
×
289
        let start_header = db
×
290
            .fetch_header_by_block_hash(hash)
×
291
            .await
×
292
            .rpc_status_internal_error(LOG_TARGET)?
×
293
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;
×
294

295
        let mut count = message.count;
×
296
        if count == 0 {
×
297
            let tip_header = db.fetch_tip_header().await.rpc_status_internal_error(LOG_TARGET)?;
×
298
            count = tip_header.height().saturating_sub(start_header.height);
×
299
        }
×
300
        // if its the start(tip_header == tip), return empty
301
        if count == 0 {
×
302
            return Ok(Streaming::empty());
×
303
        }
×
304

×
305
        #[allow(clippy::cast_possible_truncation)]
×
306
        let chunk_size = cmp::min(100, count) as usize;
×
307
        debug!(
×
308
            target: LOG_TARGET,
×
309
            "Initiating header sync with peer `{}` from height {} to {} (chunk_size={})",
×
310
            peer_node_id,
311
            start_header.height,
312
            count,
313
            chunk_size
314
        );
315

316
        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
×
317
        let (tx, rx) = mpsc::channel(chunk_size);
×
318
        let span = span!(Level::TRACE, "sync_rpc::sync_headers::inner_worker");
×
319
        let iter = NonOverlappingIntegerPairIter::new(
×
320
            start_header.height.saturating_add(1),
×
321
            start_header.height.saturating_add(count).saturating_add(1),
×
322
            chunk_size,
×
323
        )
×
324
        .map_err(|e| RpcStatus::bad_request(&e))?;
×
325
        task::spawn(
×
326
            async move {
×
327
                // Move token into this task
×
328
                let peer_node_id = session_token;
×
329
                for (start, end) in iter {
×
330
                    if tx.is_closed() {
×
331
                        break;
×
332
                    }
×
333
                    debug!(target: LOG_TARGET, "Sending headers #{} - #{}", start, end);
×
334
                    let headers = db
×
335
                        .fetch_headers(start..=end)
×
336
                        .await
×
337
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));
×
338

×
339
                    if tx.is_closed() {
×
340
                        debug!(
×
341
                            target: LOG_TARGET,
×
342
                            "Header sync session for peer '{}' terminated early", peer_node_id
×
343
                        );
344
                        break;
×
345
                    }
×
346
                    match headers {
×
347
                        Ok(headers) if headers.is_empty() => {
×
348
                            break;
×
349
                        },
350
                        Ok(headers) => {
×
351
                            let headers = headers.into_iter().map(proto::core::BlockHeader::from).map(Ok);
×
352
                            // Ensure task stops if the peer prematurely stops their RPC session
×
353
                            if utils::mpsc::send_all(&tx, headers).await.is_err() {
×
354
                                break;
×
355
                            }
×
356
                        },
357
                        Err(err) => {
×
358
                            let _result = tx.send(Err(err)).await;
×
359
                            break;
×
360
                        },
361
                    }
362
                }
363

364
                #[cfg(feature = "metrics")]
365
                metrics::active_sync_peers().dec();
×
366
                debug!(
×
367
                    target: LOG_TARGET,
×
368
                    "Header sync round complete for peer `{}`.", peer_node_id,
×
369
                );
370
            }
×
371
            .instrument(span),
×
372
        );
×
373

×
374
        Ok(Streaming::new(rx))
×
375
    }
×
376

377
    #[instrument(level = "trace", skip(self), err)]
378
    #[allow(clippy::blocks_in_conditions)]
379
    async fn get_header_by_height(
380
        &self,
381
        request: Request<u64>,
382
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
383
        let height = request.into_message();
×
384
        let header = self
×
385
            .db()
×
386
            .fetch_header(height)
×
387
            .await
×
388
            .rpc_status_internal_error(LOG_TARGET)?
×
389
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {}", height)))?;
×
390

391
        Ok(Response::new(header.into()))
×
392
    }
×
393

394
    #[instrument(level = "trace", skip(self), err)]
395
    #[allow(clippy::blocks_in_conditions)]
396
    async fn find_chain_split(
397
        &self,
398
        request: Request<FindChainSplitRequest>,
399
    ) -> Result<Response<FindChainSplitResponse>, RpcStatus> {
×
400
        const MAX_ALLOWED_BLOCK_HASHES: usize = 1000;
401

402
        let peer = request.context().peer_node_id().clone();
×
403
        let message = request.into_message();
×
404
        if message.block_hashes.is_empty() {
×
405
            return Err(RpcStatus::bad_request(
×
406
                "Cannot find chain split because no hashes were sent",
×
407
            ));
×
408
        }
×
409
        if message.block_hashes.len() > MAX_ALLOWED_BLOCK_HASHES {
×
410
            return Err(RpcStatus::bad_request(&format!(
×
411
                "Cannot query more than {} block hashes",
×
412
                MAX_ALLOWED_BLOCK_HASHES,
×
413
            )));
×
414
        }
×
415
        if message.header_count > (HEADER_SYNC_INITIAL_MAX_HEADERS as u64) {
×
416
            return Err(RpcStatus::bad_request(&format!(
×
417
                "Cannot ask for more than {} headers",
×
418
                HEADER_SYNC_INITIAL_MAX_HEADERS,
×
419
            )));
×
420
        }
×
421

×
422
        let db = self.db();
×
423
        let hashes: Vec<FixedHash> = message
×
424
            .block_hashes
×
425
            .into_iter()
×
426
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
×
427
            .collect::<Result<_, _>>()
×
428
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
429
        let maybe_headers = db
×
430
            .find_headers_after_hash(hashes, message.header_count)
×
431
            .await
×
432
            .rpc_status_internal_error(LOG_TARGET)?;
×
433
        match maybe_headers {
×
434
            Some((idx, headers)) => {
×
435
                debug!(
×
436
                    target: LOG_TARGET,
×
437
                    "Sending forked index {} and {} header(s) to peer `{}`",
×
438
                    idx,
×
439
                    headers.len(),
×
440
                    peer
441
                );
442

443
                Ok(Response::new(FindChainSplitResponse {
×
444
                    fork_hash_index: idx as u64,
×
445
                    headers: headers.into_iter().map(Into::into).collect(),
×
446
                }))
×
447
            },
448
            None => {
449
                debug!(
×
450
                    target: LOG_TARGET,
×
451
                    "Unable to find link to main chain from peer `{}`", peer
×
452
                );
453
                Err(RpcStatus::not_found("No link found to main chain"))
×
454
            },
455
        }
456
    }
×
457

458
    #[instrument(level = "trace", skip(self), err)]
459
    #[allow(clippy::blocks_in_conditions)]
460
    async fn get_chain_metadata(&self, _: Request<()>) -> Result<Response<proto::base_node::ChainMetadata>, RpcStatus> {
×
461
        let chain_metadata = self
×
462
            .db()
×
463
            .get_chain_metadata()
×
464
            .await
×
465
            .rpc_status_internal_error(LOG_TARGET)?;
×
466
        Ok(Response::new(chain_metadata.into()))
×
467
    }
×
468

469
    #[instrument(level = "trace", skip(self), err)]
470
    #[allow(clippy::blocks_in_conditions)]
471
    async fn sync_kernels(
472
        &self,
473
        request: Request<SyncKernelsRequest>,
474
    ) -> Result<Streaming<proto::types::TransactionKernel>, RpcStatus> {
×
475
        let peer_node_id = request.context().peer_node_id().clone();
×
476
        let req = request.into_message();
×
477
        let (tx, rx) = mpsc::channel(100);
×
478
        let db = self.db();
×
479

480
        let start_header = db
×
481
            .fetch_header_containing_kernel_mmr(req.start)
×
482
            .await
×
483
            .rpc_status_internal_error(LOG_TARGET)?
×
484
            .into_header();
×
485
        let hash = req
×
486
            .end_header_hash
×
487
            .try_into()
×
488
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
×
489
        let end_header = db
×
490
            .fetch_header_by_block_hash(hash)
×
491
            .await
×
492
            .rpc_status_internal_error(LOG_TARGET)?
×
493
            .ok_or_else(|| RpcStatus::not_found("Unknown end header"))?;
×
494

495
        let mut current_height = start_header.height;
×
496
        let end_height = end_header.height;
×
497
        let mut current_mmr_position = start_header.kernel_mmr_size;
×
498
        let mut current_header_hash = start_header.hash();
×
499

×
500
        if current_height > end_height {
×
501
            return Err(RpcStatus::bad_request("start header height is after end header"));
×
502
        }
×
503

504
        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
×
505
        task::spawn(async move {
×
506
            // Move session token into task
×
507
            let peer_node_id = session_token;
×
508
            while current_height <= end_height {
×
509
                if tx.is_closed() {
×
510
                    break;
×
511
                }
×
512
                let res = db
×
513
                    .fetch_kernels_in_block(current_header_hash)
×
514
                    .await
×
515
                    .map_err(RpcStatus::log_internal_error(LOG_TARGET));
×
516

×
517
                if tx.is_closed() {
×
518
                    debug!(
×
519
                        target: LOG_TARGET,
×
520
                        "Kernel sync session for peer '{}' terminated early", peer_node_id
×
521
                    );
522
                    break;
×
523
                }
×
524

525
                match res {
×
526
                    Ok(kernels) if kernels.is_empty() => {
×
527
                        let _result = tx
×
528
                            .send(Err(RpcStatus::general(&format!(
×
529
                                "No kernels in block {}",
×
530
                                current_header_hash.to_hex()
×
531
                            ))))
×
532
                            .await;
×
533
                        break;
×
534
                    },
535
                    Ok(kernels) => {
×
536
                        debug!(
×
537
                            target: LOG_TARGET,
×
538
                            "Streaming kernels {} to {}",
×
539
                            current_mmr_position,
×
540
                            current_mmr_position + kernels.len() as u64
×
541
                        );
542
                        current_mmr_position += kernels.len() as u64;
×
543
                        let kernels = kernels.into_iter().map(proto::types::TransactionKernel::from).map(Ok);
×
544
                        // Ensure task stops if the peer prematurely stops their RPC session
×
545
                        if utils::mpsc::send_all(&tx, kernels).await.is_err() {
×
546
                            break;
×
547
                        }
×
548
                    },
549
                    Err(err) => {
×
550
                        let _result = tx.send(Err(err)).await;
×
551
                        break;
×
552
                    },
553
                }
554

555
                current_height += 1;
×
556

×
557
                if current_height <= end_height {
×
558
                    let res = db
×
559
                        .fetch_header(current_height)
×
560
                        .await
×
561
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));
×
562
                    match res {
×
563
                        Ok(Some(header)) => {
×
564
                            current_header_hash = header.hash();
×
565
                        },
×
566
                        Ok(None) => {
567
                            let _result = tx
×
568
                                .send(Err(RpcStatus::not_found(&format!(
×
569
                                    "Could not find header #{} while streaming UTXOs after position {}",
×
570
                                    current_height, current_mmr_position
×
571
                                ))))
×
572
                                .await;
×
573
                            break;
×
574
                        },
575
                        Err(err) => {
×
576
                            error!(target: LOG_TARGET, "DB error while streaming kernels: {}", err);
×
577
                            let _result = tx
×
578
                                .send(Err(RpcStatus::general("DB error while streaming kernels")))
×
579
                                .await;
×
580
                            break;
×
581
                        },
582
                    }
583
                }
×
584
            }
585

586
            #[cfg(feature = "metrics")]
587
            metrics::active_sync_peers().dec();
×
588
            debug!(
×
589
                target: LOG_TARGET,
×
590
                "Kernel sync round complete for peer `{}`.", peer_node_id,
×
591
            );
592
        });
×
593
        Ok(Streaming::new(rx))
×
594
    }
×
595

596
    #[instrument(level = "trace", skip(self), err)]
597
    #[allow(clippy::blocks_in_conditions)]
598
    async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
1✔
599
        let req = request.message();
1✔
600
        let peer_node_id = request.context().peer_node_id();
1✔
601
        debug!(
1✔
602
            target: LOG_TARGET,
×
603
            "Received sync_utxos-{} request from header {} to {}",
×
604
            peer_node_id,
×
605
            req.start_header_hash.to_hex(),
×
606
            req.end_header_hash.to_hex(),
×
607
        );
608

609
        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
1✔
610
        let (tx, rx) = mpsc::channel(200);
1✔
611
        let task = SyncUtxosTask::new(self.db(), session_token);
1✔
612
        task.run(request, tx).await?;
1✔
613

614
        Ok(Streaming::new(rx))
×
615
    }
2✔
616
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc