tari-project / tari / build 19333426770 (push via GitHub, web-flow)

13 Nov 2025 01:37PM UTC coverage: 50.497% (-1.0%) from 51.536%

feat: add noreadahead to lmdb as config option (#7581)

Description
---
Added the `open::NORDAHEAD` flag to the LMDB builder as a config option.

_"**Quote:** Most operating systems perform readahead on read requests
by default. This option turns it off if the OS supports it. Turning it
off may help random read performance when the DB is larger than RAM and
system RAM is full. The option is not implemented on Windows."_

As per LMDB documentation, this might help with the seed nodes' RAM
usage when needed, or for other Linux users.
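As a rough sketch of how such a flag is applied through the `lmdb_zero` environment builder (the `open_env` helper and `noreadahead` parameter below are illustrative stand-ins for the new config option, not this PR's exact wiring):

```rust
use lmdb_zero as lmdb;
use lmdb_zero::open;

/// Illustrative helper: open an LMDB environment, optionally disabling OS readahead.
/// The `noreadahead` parameter stands in for the new config option.
fn open_env(path: &str, noreadahead: bool) -> Result<lmdb::Environment, lmdb::Error> {
    let mut flags = open::Flags::empty();
    if noreadahead {
        // MDB_NORDAHEAD: ask the OS to skip readahead on this mapping. This can
        // help random-read performance once the DB outgrows RAM; LMDB documents
        // it as unimplemented on Windows.
        flags |= open::NORDAHEAD;
    }
    let builder = lmdb::EnvBuilder::new()?;
    // `open` is unsafe because the caller must guarantee the path remains a
    // valid LMDB environment directory for the lifetime of the handle.
    unsafe { builder.open(path, flags, 0o600) }
}
```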

Motivation and Context
---
See #7578 for background.

How Has This Been Tested?
---
Not tested.

What process can a PR reviewer use to test or verify this change?
---
Code review.


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

## Summary by CodeRabbit

* **New Features**
  * Added a configuration option to control OS readahead behavior for database storage; it can be disabled for performance tuning (defaults preserve prior behavior).
* **Chores**
  * Storage initialization now respects the new readahead setting.
* **Tests**
  * Internal tests updated to reflect the new configuration parameter.
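To make "defaults preserve prior behavior" concrete, a defaulted boolean config field is enough; a minimal sketch (the `LmdbConfig` name and `noreadahead` field are assumptions for illustration, not necessarily this PR's identifiers):

```rust
use serde::Deserialize;

#[derive(Debug, Clone, Deserialize)]
pub struct LmdbConfig {
    /// When true, open the LMDB environment with the NORDAHEAD flag.
    /// `#[serde(default)]` leaves this false unless explicitly set, so
    /// existing configs keep their previous readahead behaviour.
    #[serde(default)]
    pub noreadahead: bool,
}
```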

21 of 22 new or added lines in 2 files covered. (95.45%)

1256 existing lines in 34 files now uncovered.

57979 of 114817 relevant lines covered (50.5%)

325976.36 hits per line

Source File: /base_layer/core/src/base_node/sync/rpc/service.rs (12.27% covered)

//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    cmp,
    convert::{TryFrom, TryInto},
    sync::{Arc, Weak},
};

use log::*;
use tari_common_types::types::FixedHash;
use tari_comms::{
    peer_manager::NodeId,
    protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming},
    utils,
};
use tari_utilities::hex::Hex;
use tokio::{
    sync::{mpsc, Mutex},
    task,
};
use tracing::{instrument, span, Instrument, Level};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
    base_node::{
        comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind},
        sync::{
            header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
            rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
        },
        LocalNodeCommsInterface,
    },
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend},
    iterators::NonOverlappingIntegerPairIter,
    proto,
    proto::base_node::{
        FindChainSplitRequest,
        FindChainSplitResponse,
        SyncBlocksRequest,
        SyncHeadersRequest,
        SyncKernelsRequest,
        SyncUtxosRequest,
        SyncUtxosResponse,
    },
};

const LOG_TARGET: &str = "c::base_node::sync_rpc";

pub struct BaseNodeSyncRpcService<B> {
    db: AsyncBlockchainDb<B>,
    active_sessions: Mutex<Vec<Weak<NodeId>>>,
    base_node_service: LocalNodeCommsInterface,
}

impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {
    pub fn new(db: AsyncBlockchainDb<B>, base_node_service: LocalNodeCommsInterface) -> Self {
        Self {
            db,
            active_sessions: Mutex::new(Vec::new()),
            base_node_service,
        }
    }

    #[inline]
    fn db(&self) -> AsyncBlockchainDb<B> {
        self.db.clone()
    }

    pub async fn try_add_exclusive_session(&self, peer: NodeId) -> Result<Arc<NodeId>, RpcStatus> {
        let mut lock = self.active_sessions.lock().await;
        *lock = lock.drain(..).filter(|l| l.strong_count() > 0).collect();
        debug!(target: LOG_TARGET, "Number of active sync sessions: {}", lock.len());

        if lock.iter().any(|p| p.upgrade().filter(|p| **p == peer).is_some()) {
            return Err(RpcStatus::forbidden(
                "Existing sync session found for this client. Only a single session is permitted",
            ));
        }

        let token = Arc::new(peer);
        lock.push(Arc::downgrade(&token));
        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        metrics::active_sync_peers().set(lock.len() as i64);
        Ok(token)
    }
}

#[tari_comms::async_trait]
impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcService<B> {
    #[instrument(level = "trace", name = "sync_rpc::sync_blocks", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_blocks(
        &self,
        request: Request<SyncBlocksRequest>,
    ) -> Result<Streaming<proto::base_node::BlockBodyResponse>, RpcStatus> {
        let peer_node_id = request.context().peer_node_id().clone();
        let message = request.into_message();
        let mut block_event_stream = self.base_node_service.get_block_event_stream();

        let db = self.db();
        let hash = message
            .start_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
        if db.fetch_block_by_hash(hash, true).await.is_err() {
            return Err(RpcStatus::not_found("Requested start block sync hash was not found"));
        }
        let start_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

        let metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;

        let start_height = start_header.height + 1;
        if start_height < metadata.pruned_height() {
            return Err(RpcStatus::bad_request(&format!(
                "Requested full block body at height {}, however this node has an effective pruned height of {}",
                start_height,
                metadata.pruned_height()
            )));
        }

        let hash = message
            .end_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
        if db.fetch_block_by_hash(hash, true).await.is_err() {
            return Err(RpcStatus::not_found("Requested end block sync hash was not found"));
        }

        let end_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Requested end block sync hash was not found"))?;

        let end_height = end_header.height;
        if start_height > end_height {
            return Err(RpcStatus::bad_request(&format!(
                "Start block #{start_height} is higher than end block #{end_height}"
            )));
        }

        debug!(
            target: LOG_TARGET,
            "Initiating block sync with peer `{peer_node_id}` from height {start_height} to {end_height}"
        );

        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
        // Number of blocks to load and push to the stream before loading the next batch
        const BATCH_SIZE: usize = 2;
        let (tx, rx) = mpsc::channel(BATCH_SIZE);

        let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker");
        let iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE)
            .map_err(|e| RpcStatus::bad_request(&e))?;
        task::spawn(
            async move {
                // Move token into this task
                let peer_node_id = session_token;
                for (start, end) in iter {
                    if tx.is_closed() {
                        break;
                    }

                    // Check for reorgs during sync
                    while let Ok(block_event) = block_event_stream.try_recv() {
                        if let BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { removed, .. }) | BlockSyncRewind(removed) =
                            &*block_event
                        {
                            //add BlockSyncRewind(Vec<Arc<ChainBlock>>),
                            if let Some(reorg_block) = removed
                                .iter()
                                // If the reorg happens before the end height of sync we let the peer know that the chain they are syncing with has changed
                                .find(|block| block.height() <= end_height)
                            {
                                warn!(
                                    target: LOG_TARGET,
                                    "Block reorg/rewind detected at height {} during sync, letting the sync peer {} know.",
                                    reorg_block.height(),
                                    peer_node_id
                                );
                                let _result = tx.send(Err(RpcStatus::conflict(&format!(
                                    "Reorg at height {} detected",
                                    reorg_block.height()
                                ))));
                                return;
                            }
                        }
                    }

                    debug!(
                        target: LOG_TARGET,
                        "Sending blocks #{start} - #{end} to '{peer_node_id}'"
                    );
                    let blocks = db
                        .fetch_blocks(start..=end, true)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                    if tx.is_closed() {
                        debug!(
                            target: LOG_TARGET,
                            "Block sync session for peer '{peer_node_id}' terminated early"
                        );
                        break;
                    }

                    match blocks {
                        Ok(blocks) if blocks.is_empty() => {
                            break;
                        },
                        Ok(blocks) => {
                            let blocks = blocks.into_iter().map(|hb| {
                                let block = hb.into_block();
                                proto::base_node::BlockBodyResponse::try_from(block).map_err(|e| {
                                    log::error!(target: LOG_TARGET, "Internal error: {e}");
                                    RpcStatus::general_default()
                                })
                            });

                            // Ensure task stops if the peer prematurely stops their RPC session
                            if utils::mpsc::send_all(&tx, blocks).await.is_err() {
                                debug!(
                                    target: LOG_TARGET,
                                    "Block sync session for peer '{peer_node_id}' terminated early"
                                );
                                break;
                            }
                        },
                        Err(err) => {
                            let _result = tx.send(Err(err)).await;
                            break;
                        },
                    }
                }

                #[cfg(feature = "metrics")]
                metrics::active_sync_peers().dec();
                debug!(
                    target: LOG_TARGET,
                    "Block sync round complete for peer `{peer_node_id}`."
                );
            }
            .instrument(span),
        );

        Ok(Streaming::new(rx))
    }

    #[instrument(level = "trace", name = "sync_rpc::sync_headers", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_headers(
        &self,
        request: Request<SyncHeadersRequest>,
    ) -> Result<Streaming<proto::core::BlockHeader>, RpcStatus> {
        let db = self.db();
        let peer_node_id = request.context().peer_node_id().clone();
        let message = request.into_message();
        let hash = message
            .start_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
        let start_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

        let mut count = message.count;
        if count == 0 {
            let tip_header = db.fetch_tip_header().await.rpc_status_internal_error(LOG_TARGET)?;
            count = tip_header.height().saturating_sub(start_header.height);
        }
        // if its the start(tip_header == tip), return empty
        if count == 0 {
            return Ok(Streaming::empty());
        }

        #[allow(clippy::cast_possible_truncation)]
        let chunk_size = cmp::min(100, count) as usize;
        debug!(
            target: LOG_TARGET,
            "Initiating header sync with peer `{}` from height {} to {} (chunk_size={})",
            peer_node_id,
            start_header.height,
            count,
            chunk_size
        );

        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
        let (tx, rx) = mpsc::channel(chunk_size);
        let span = span!(Level::TRACE, "sync_rpc::sync_headers::inner_worker");
        let iter = NonOverlappingIntegerPairIter::new(
            start_header.height.saturating_add(1),
            start_header.height.saturating_add(count).saturating_add(1),
            chunk_size,
        )
        .map_err(|e| RpcStatus::bad_request(&e))?;
        task::spawn(
            async move {
                // Move token into this task
                let peer_node_id = session_token;
                for (start, end) in iter {
                    if tx.is_closed() {
                        break;
                    }
                    debug!(target: LOG_TARGET, "Sending headers #{start} - #{end}");
                    let headers = db
                        .fetch_headers(start..=end)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                    if tx.is_closed() {
                        debug!(
                            target: LOG_TARGET,
                            "Header sync session for peer '{peer_node_id}' terminated early"
                        );
                        break;
                    }
                    match headers {
                        Ok(headers) if headers.is_empty() => {
                            break;
                        },
                        Ok(headers) => {
                            let headers = headers.into_iter().map(proto::core::BlockHeader::from).map(Ok);
                            // Ensure task stops if the peer prematurely stops their RPC session
                            if utils::mpsc::send_all(&tx, headers).await.is_err() {
                                break;
                            }
                        },
                        Err(err) => {
                            let _result = tx.send(Err(err)).await;
                            break;
                        },
                    }
                }

                #[cfg(feature = "metrics")]
                metrics::active_sync_peers().dec();
                debug!(
                    target: LOG_TARGET,
                    "Header sync round complete for peer `{peer_node_id}`."
                );
            }
            .instrument(span),
        );

        Ok(Streaming::new(rx))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn get_header_by_height(
        &self,
        request: Request<u64>,
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
        let height = request.into_message();
        let header = self
            .db()
            .fetch_header(height)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {height}")))?;

        Ok(Response::new(header.into()))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn find_chain_split(
        &self,
        request: Request<FindChainSplitRequest>,
    ) -> Result<Response<FindChainSplitResponse>, RpcStatus> {
        const MAX_ALLOWED_BLOCK_HASHES: usize = 1000;

        let peer = request.context().peer_node_id().clone();
        let message = request.into_message();
        if message.block_hashes.is_empty() {
            return Err(RpcStatus::bad_request(
                "Cannot find chain split because no hashes were sent",
            ));
        }
        if message.block_hashes.len() > MAX_ALLOWED_BLOCK_HASHES {
            return Err(RpcStatus::bad_request(&format!(
                "Cannot query more than {MAX_ALLOWED_BLOCK_HASHES} block hashes"
            )));
        }
        if message.header_count > (HEADER_SYNC_INITIAL_MAX_HEADERS as u64) {
            return Err(RpcStatus::bad_request(&format!(
                "Cannot ask for more than {HEADER_SYNC_INITIAL_MAX_HEADERS} headers"
            )));
        }

        let db = self.db();
        let hashes: Vec<FixedHash> = message
            .block_hashes
            .into_iter()
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
            .collect::<Result<_, _>>()
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
        let maybe_headers = db
            .find_headers_after_hash(hashes, message.header_count)
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        match maybe_headers {
            Some((idx, headers)) => {
                debug!(
                    target: LOG_TARGET,
                    "Sending forked index {} and {} header(s) to peer `{}`",
                    idx,
                    headers.len(),
                    peer
                );

                Ok(Response::new(FindChainSplitResponse {
                    fork_hash_index: idx as u64,
                    headers: headers.into_iter().map(Into::into).collect(),
                }))
            },
            None => {
                debug!(
                    target: LOG_TARGET,
                    "Unable to find link to main chain from peer `{peer}`"
                );
                Err(RpcStatus::not_found("No link found to main chain"))
            },
        }
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn get_chain_metadata(&self, _: Request<()>) -> Result<Response<proto::base_node::ChainMetadata>, RpcStatus> {
        let chain_metadata = self
            .db()
            .get_chain_metadata()
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        Ok(Response::new(chain_metadata.into()))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_kernels(
        &self,
        request: Request<SyncKernelsRequest>,
    ) -> Result<Streaming<proto::types::TransactionKernel>, RpcStatus> {
        let peer_node_id = request.context().peer_node_id().clone();
        let req = request.into_message();
        let (tx, rx) = mpsc::channel(100);
        let db = self.db();

        let start_header = db
            .fetch_header_containing_kernel_mmr(req.start)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .into_header();
        let hash = req
            .end_header_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
        let end_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Unknown end header"))?;

        let mut current_height = start_header.height;
        let end_height = end_header.height;
        let mut current_mmr_position = start_header.kernel_mmr_size;
        let mut current_header_hash = start_header.hash();

        if current_height > end_height {
            return Err(RpcStatus::bad_request("start header height is after end header"));
        }

        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
        task::spawn(async move {
            // Move session token into task
            let peer_node_id = session_token;
            while current_height <= end_height {
                if tx.is_closed() {
                    break;
                }
                let res = db
                    .fetch_kernels_in_block(current_header_hash)
                    .await
                    .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                if tx.is_closed() {
                    debug!(
                        target: LOG_TARGET,
                        "Kernel sync session for peer '{peer_node_id}' terminated early",
                    );
                    break;
                }

                match res {
                    Ok(kernels) if kernels.is_empty() => {
                        let _result = tx
                            .send(Err(RpcStatus::general(&format!(
                                "No kernels in block {}",
                                current_header_hash.to_hex()
                            ))))
                            .await;
                        break;
                    },
                    Ok(kernels) => {
                        debug!(
                            target: LOG_TARGET,
                            "Streaming kernels {} to {}",
                            current_mmr_position,
                            current_mmr_position + kernels.len() as u64
                        );
                        current_mmr_position += kernels.len() as u64;
                        let kernels = kernels.into_iter().map(proto::types::TransactionKernel::from).map(Ok);
                        // Ensure task stops if the peer prematurely stops their RPC session
                        if utils::mpsc::send_all(&tx, kernels).await.is_err() {
                            break;
                        }
                    },
                    Err(err) => {
                        let _result = tx.send(Err(err)).await;
                        break;
                    },
                }

                current_height += 1;

                if current_height <= end_height {
                    let res = db
                        .fetch_header(current_height)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));
                    match res {
                        Ok(Some(header)) => {
                            current_header_hash = header.hash();
                        },
                        Ok(None) => {
                            let _result = tx
                                .send(Err(RpcStatus::not_found(&format!(
                                    "Could not find header #{current_height} while streaming UTXOs after position \
                                     {current_mmr_position}"
                                ))))
                                .await;
                            break;
                        },
                        Err(err) => {
                            error!(target: LOG_TARGET, "DB error while streaming kernels: {err}");
                            let _result = tx
                                .send(Err(RpcStatus::general("DB error while streaming kernels")))
                                .await;
                            break;
                        },
                    }
                }
            }

            #[cfg(feature = "metrics")]
            metrics::active_sync_peers().dec();
            debug!(
                target: LOG_TARGET,
                "Kernel sync round complete for peer `{peer_node_id}`.",
            );
        });
        Ok(Streaming::new(rx))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
        let req = request.message();
        let peer_node_id = request.context().peer_node_id();
        debug!(
            target: LOG_TARGET,
            "Received sync_utxos-{} request from header {} to {}",
            peer_node_id,
            req.start_header_hash.to_hex(),
            req.end_header_hash.to_hex(),
        );

        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
        let (tx, rx) = mpsc::channel(200);
        let task = SyncUtxosTask::new(self.db(), session_token);
        task.run(request, tx).await?;

        Ok(Streaming::new(rx))
    }
}