
tari-project / tari, build 15280118615 (push via github, web-flow)

27 May 2025 04:01PM UTC coverage: 73.59% (+0.4%) from 73.233%

feat: add base node HTTP wallet service (#7061)

Description
---
Added a new HTTP server for the base node that exposes some wallet-related
query functionality.

Current new endpoints (examples on the **esmeralda** network; a minimal query sketch follows the list):
 - http://127.0.0.1:9005/get_tip_info
 - http://127.0.0.1:9005/get_header_by_height?height=6994
 - http://127.0.0.1:9005/get_height_at_time?time=1747739959
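As a rough illustration of calling these endpoints, here is a minimal sketch. It is not part of the PR: the use of the `reqwest` crate (with its `blocking` feature) is an assumption, and the response body format is not documented in this description, so the sketch only prints the raw body.

```rust
// Minimal sketch: query the new wallet HTTP endpoints on the Esmeralda default port.
// Assumes the `reqwest` crate with the `blocking` feature enabled; the JSON shape of
// the responses is not shown in this PR description, so we just print the raw text.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base = "http://127.0.0.1:9005";

    // Chain tip information
    let tip = reqwest::blocking::get(format!("{base}/get_tip_info"))?.text()?;
    println!("get_tip_info: {tip}");

    // Header at a specific height
    let header = reqwest::blocking::get(format!("{base}/get_header_by_height?height=6994"))?.text()?;
    println!("get_header_by_height: {header}");

    // Height at a given UNIX timestamp
    let height = reqwest::blocking::get(format!("{base}/get_height_at_time?time=1747739959"))?.text()?;
    println!("get_height_at_time: {height}");

    Ok(())
}
```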

Default ports for the HTTP service (by network):
```
MainNet: 9000,
StageNet: 9001,
NextNet: 9002,
LocalNet: 9003,
Igor: 9004,
Esmeralda: 9005,
```

The new configuration needs to be set in the base node:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000" # optional; if not set, a request for the external address returns None, so wallets cannot contact the base node over HTTP
```
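To make the effect of the optional `external_address` concrete, here is a hypothetical sketch. The type and field names below are illustrative only and are not the PR's actual code.

```rust
// Hypothetical illustration of the behaviour described above; not the PR's actual types.
struct HttpWalletQueryServiceConfig {
    port: u16,
    // Optional address advertised to wallets.
    external_address: Option<String>,
}

fn advertised_address(cfg: &HttpWalletQueryServiceConfig) -> Option<&str> {
    // If `external_address` was never configured, a request for it yields None,
    // so wallets get no HTTP address on which to contact this base node.
    cfg.external_address.as_deref()
}
```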

Motivation and Context
---


How Has This Been Tested?
---
### Manually

#### Basic test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000"
```
This sets the port and the external address (which is sent to the wallet
client on request, so in the real world it must be publicly reachable).
3. Set logging level of base node logs to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check the base node logs (for instance with `tail -f ...`) and verify that the HTTP endpoints are being used

#### Use RPC fallback test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9001"
```
This sets the port, but the advertised external address points at a different
port (9001) than the one the service listens on, so wallet HTTP requests should
fail and the wallet should fall back to RPC.
3. Set logging level of base node logs to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check logs of base nod... (continued)

9 of 114 new or added lines in 4 files covered. (7.89%)

1592 existing lines in 62 files now uncovered.

82227 of 111736 relevant lines covered (73.59%)

272070.7 hits per line

Source File

/base_layer/core/src/base_node/sync/rpc/service.rs (34.94% covered)
```rust
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    cmp,
    convert::{TryFrom, TryInto},
    sync::{Arc, Weak},
};

use log::*;
use tari_common_types::types::FixedHash;
use tari_comms::{
    peer_manager::NodeId,
    protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming},
    utils,
};
use tari_utilities::hex::Hex;
use tokio::{
    sync::{mpsc, Mutex},
    task,
};
use tracing::{instrument, span, Instrument, Level};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
    base_node::{
        comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind},
        sync::{
            header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
            rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
        },
        LocalNodeCommsInterface,
    },
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend},
    iterators::NonOverlappingIntegerPairIter,
    proto,
    proto::base_node::{
        FindChainSplitRequest,
        FindChainSplitResponse,
        SyncBlocksRequest,
        SyncHeadersRequest,
        SyncKernelsRequest,
        SyncUtxosRequest,
        SyncUtxosResponse,
    },
};

const LOG_TARGET: &str = "c::base_node::sync_rpc";

pub struct BaseNodeSyncRpcService<B> {
    db: AsyncBlockchainDb<B>,
    active_sessions: Mutex<Vec<Weak<NodeId>>>,
    base_node_service: LocalNodeCommsInterface,
}

impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {
    pub fn new(db: AsyncBlockchainDb<B>, base_node_service: LocalNodeCommsInterface) -> Self {
        Self {
            db,
            active_sessions: Mutex::new(Vec::new()),
            base_node_service,
        }
    }

    #[inline]
    fn db(&self) -> AsyncBlockchainDb<B> {
        self.db.clone()
    }

    pub async fn try_add_exclusive_session(&self, peer: NodeId) -> Result<Arc<NodeId>, RpcStatus> {
        let mut lock = self.active_sessions.lock().await;
        *lock = lock.drain(..).filter(|l| l.strong_count() > 0).collect();
        debug!(target: LOG_TARGET, "Number of active sync sessions: {}", lock.len());

        if lock.iter().any(|p| p.upgrade().filter(|p| **p == peer).is_some()) {
            return Err(RpcStatus::forbidden(
                "Existing sync session found for this client. Only a single session is permitted",
            ));
        }

        let token = Arc::new(peer);
        lock.push(Arc::downgrade(&token));
        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        metrics::active_sync_peers().set(lock.len() as i64);
        Ok(token)
    }
}

#[tari_comms::async_trait]
impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcService<B> {
    #[instrument(level = "trace", name = "sync_rpc::sync_blocks", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_blocks(
        &self,
        request: Request<SyncBlocksRequest>,
    ) -> Result<Streaming<proto::base_node::BlockBodyResponse>, RpcStatus> {
        let peer_node_id = request.context().peer_node_id().clone();
        let message = request.into_message();
        let mut block_event_stream = self.base_node_service.get_block_event_stream();

        let db = self.db();
        let hash = message
            .start_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
        if db.fetch_block_by_hash(hash, true).await.is_err() {
            return Err(RpcStatus::not_found("Requested start block sync hash was not found"));
        }
        let start_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

        let metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;

        let start_height = start_header.height + 1;
        if start_height < metadata.pruned_height() {
            return Err(RpcStatus::bad_request(&format!(
                "Requested full block body at height {}, however this node has an effective pruned height of {}",
                start_height,
                metadata.pruned_height()
            )));
        }

        let hash = message
            .end_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
        if db.fetch_block_by_hash(hash, true).await.is_err() {
            return Err(RpcStatus::not_found("Requested end block sync hash was not found"));
        }

        let end_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Requested end block sync hash was not found"))?;

        let end_height = end_header.height;
        if start_height > end_height {
            return Err(RpcStatus::bad_request(&format!(
                "Start block #{} is higher than end block #{}",
                start_height, end_height
            )));
        }

        debug!(
            target: LOG_TARGET,
            "Initiating block sync with peer `{}` from height {} to {}", peer_node_id, start_height, end_height,
        );

        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
        // Number of blocks to load and push to the stream before loading the next batch
        const BATCH_SIZE: usize = 2;
        let (tx, rx) = mpsc::channel(BATCH_SIZE);

        let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker");
        let iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE)
            .map_err(|e| RpcStatus::bad_request(&e))?;
        task::spawn(
            async move {
                // Move token into this task
                let peer_node_id = session_token;
                for (start, end) in iter {
                    if tx.is_closed() {
                        break;
                    }

                    // Check for reorgs during sync
                    while let Ok(block_event) = block_event_stream.try_recv() {
                        if let BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { removed, .. }) | BlockSyncRewind(removed) =
                            &*block_event
                        {
                            //add BlockSyncRewind(Vec<Arc<ChainBlock>>),
                            if let Some(reorg_block) = removed
                                .iter()
                                // If the reorg happens before the end height of sync we let the peer know that the chain they are syncing with has changed
                                .find(|block| block.height() <= end_height)
                            {
                                warn!(
                                    target: LOG_TARGET,
                                    "Block reorg/rewind detected at height {} during sync, letting the sync peer {} know.",
                                    reorg_block.height(),
                                    peer_node_id
                                );
                                let _result = tx.send(Err(RpcStatus::conflict(&format!(
                                    "Reorg at height {} detected",
                                    reorg_block.height()
                                ))));
                                return;
                            }
                        }
                    }

                    debug!(
                        target: LOG_TARGET,
                        "Sending blocks #{} - #{} to '{}'", start, end, peer_node_id
                    );
                    let blocks = db
                        .fetch_blocks(start..=end, true)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                    if tx.is_closed() {
                        debug!(
                            target: LOG_TARGET,
                            "Block sync session for peer '{}' terminated early", peer_node_id
                        );
                        break;
                    }

                    match blocks {
                        Ok(blocks) if blocks.is_empty() => {
                            break;
                        },
                        Ok(blocks) => {
                            let blocks = blocks.into_iter().map(|hb| {
                                let block = hb.into_block();
                                proto::base_node::BlockBodyResponse::try_from(block).map_err(|e| {
                                    log::error!(target: LOG_TARGET, "Internal error: {}", e);
                                    RpcStatus::general_default()
                                })
                            });

                            // Ensure task stops if the peer prematurely stops their RPC session
                            if utils::mpsc::send_all(&tx, blocks).await.is_err() {
                                debug!(
                                    target: LOG_TARGET,
                                    "Block sync session for peer '{}' terminated early", peer_node_id
                                );
                                break;
                            }
                        },
                        Err(err) => {
                            let _result = tx.send(Err(err)).await;
                            break;
                        },
                    }
                }

                #[cfg(feature = "metrics")]
                metrics::active_sync_peers().dec();
                debug!(
                    target: LOG_TARGET,
                    "Block sync round complete for peer `{}`.", peer_node_id,
                );
            }
            .instrument(span),
        );

        Ok(Streaming::new(rx))
    }

    #[instrument(level = "trace", name = "sync_rpc::sync_headers", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_headers(
        &self,
        request: Request<SyncHeadersRequest>,
    ) -> Result<Streaming<proto::core::BlockHeader>, RpcStatus> {
        let db = self.db();
        let peer_node_id = request.context().peer_node_id().clone();
        let message = request.into_message();
        let hash = message
            .start_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
        let start_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

        let mut count = message.count;
        if count == 0 {
            let tip_header = db.fetch_tip_header().await.rpc_status_internal_error(LOG_TARGET)?;
            count = tip_header.height().saturating_sub(start_header.height);
        }
        // if its the start(tip_header == tip), return empty
        if count == 0 {
            return Ok(Streaming::empty());
        }

        #[allow(clippy::cast_possible_truncation)]
        let chunk_size = cmp::min(100, count) as usize;
        debug!(
            target: LOG_TARGET,
            "Initiating header sync with peer `{}` from height {} to {} (chunk_size={})",
            peer_node_id,
            start_header.height,
            count,
            chunk_size
        );

        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
        let (tx, rx) = mpsc::channel(chunk_size);
        let span = span!(Level::TRACE, "sync_rpc::sync_headers::inner_worker");
        let iter = NonOverlappingIntegerPairIter::new(
            start_header.height.saturating_add(1),
            start_header.height.saturating_add(count).saturating_add(1),
            chunk_size,
        )
        .map_err(|e| RpcStatus::bad_request(&e))?;
        task::spawn(
            async move {
                // Move token into this task
                let peer_node_id = session_token;
                for (start, end) in iter {
                    if tx.is_closed() {
                        break;
                    }
                    debug!(target: LOG_TARGET, "Sending headers #{} - #{}", start, end);
                    let headers = db
                        .fetch_headers(start..=end)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                    if tx.is_closed() {
                        debug!(
                            target: LOG_TARGET,
                            "Header sync session for peer '{}' terminated early", peer_node_id
                        );
                        break;
                    }
                    match headers {
                        Ok(headers) if headers.is_empty() => {
                            break;
                        },
                        Ok(headers) => {
                            let headers = headers.into_iter().map(proto::core::BlockHeader::from).map(Ok);
                            // Ensure task stops if the peer prematurely stops their RPC session
                            if utils::mpsc::send_all(&tx, headers).await.is_err() {
                                break;
                            }
                        },
                        Err(err) => {
                            let _result = tx.send(Err(err)).await;
                            break;
                        },
                    }
                }

                #[cfg(feature = "metrics")]
                metrics::active_sync_peers().dec();
                debug!(
                    target: LOG_TARGET,
                    "Header sync round complete for peer `{}`.", peer_node_id,
                );
            }
            .instrument(span),
        );

        Ok(Streaming::new(rx))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn get_header_by_height(
        &self,
        request: Request<u64>,
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
        let height = request.into_message();
        let header = self
            .db()
            .fetch_header(height)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {}", height)))?;

        Ok(Response::new(header.into()))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn find_chain_split(
        &self,
        request: Request<FindChainSplitRequest>,
    ) -> Result<Response<FindChainSplitResponse>, RpcStatus> {
        const MAX_ALLOWED_BLOCK_HASHES: usize = 1000;

        let peer = request.context().peer_node_id().clone();
        let message = request.into_message();
        if message.block_hashes.is_empty() {
            return Err(RpcStatus::bad_request(
                "Cannot find chain split because no hashes were sent",
            ));
        }
        if message.block_hashes.len() > MAX_ALLOWED_BLOCK_HASHES {
            return Err(RpcStatus::bad_request(&format!(
                "Cannot query more than {} block hashes",
                MAX_ALLOWED_BLOCK_HASHES,
            )));
        }
        if message.header_count > (HEADER_SYNC_INITIAL_MAX_HEADERS as u64) {
            return Err(RpcStatus::bad_request(&format!(
                "Cannot ask for more than {} headers",
                HEADER_SYNC_INITIAL_MAX_HEADERS,
            )));
        }

        let db = self.db();
        let hashes: Vec<FixedHash> = message
            .block_hashes
            .into_iter()
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
            .collect::<Result<_, _>>()
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
        let maybe_headers = db
            .find_headers_after_hash(hashes, message.header_count)
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        match maybe_headers {
            Some((idx, headers)) => {
                debug!(
                    target: LOG_TARGET,
                    "Sending forked index {} and {} header(s) to peer `{}`",
                    idx,
                    headers.len(),
                    peer
                );

                Ok(Response::new(FindChainSplitResponse {
                    fork_hash_index: idx as u64,
                    headers: headers.into_iter().map(Into::into).collect(),
                }))
            },
            None => {
                debug!(
                    target: LOG_TARGET,
                    "Unable to find link to main chain from peer `{}`", peer
                );
                Err(RpcStatus::not_found("No link found to main chain"))
            },
        }
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn get_chain_metadata(&self, _: Request<()>) -> Result<Response<proto::base_node::ChainMetadata>, RpcStatus> {
        let chain_metadata = self
            .db()
            .get_chain_metadata()
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        Ok(Response::new(chain_metadata.into()))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_kernels(
        &self,
        request: Request<SyncKernelsRequest>,
    ) -> Result<Streaming<proto::types::TransactionKernel>, RpcStatus> {
        let peer_node_id = request.context().peer_node_id().clone();
        let req = request.into_message();
        let (tx, rx) = mpsc::channel(100);
        let db = self.db();

        let start_header = db
            .fetch_header_containing_kernel_mmr(req.start)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .into_header();
        let hash = req
            .end_header_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
        let end_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Unknown end header"))?;

        let mut current_height = start_header.height;
        let end_height = end_header.height;
        let mut current_mmr_position = start_header.kernel_mmr_size;
        let mut current_header_hash = start_header.hash();

        if current_height > end_height {
            return Err(RpcStatus::bad_request("start header height is after end header"));
        }

        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
        task::spawn(async move {
            // Move session token into task
            let peer_node_id = session_token;
            while current_height <= end_height {
                if tx.is_closed() {
                    break;
                }
                let res = db
                    .fetch_kernels_in_block(current_header_hash)
                    .await
                    .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                if tx.is_closed() {
                    debug!(
                        target: LOG_TARGET,
                        "Kernel sync session for peer '{}' terminated early", peer_node_id
                    );
                    break;
                }

                match res {
                    Ok(kernels) if kernels.is_empty() => {
                        let _result = tx
                            .send(Err(RpcStatus::general(&format!(
                                "No kernels in block {}",
                                current_header_hash.to_hex()
                            ))))
                            .await;
                        break;
                    },
                    Ok(kernels) => {
                        debug!(
                            target: LOG_TARGET,
                            "Streaming kernels {} to {}",
                            current_mmr_position,
                            current_mmr_position + kernels.len() as u64
                        );
                        current_mmr_position += kernels.len() as u64;
                        let kernels = kernels.into_iter().map(proto::types::TransactionKernel::from).map(Ok);
                        // Ensure task stops if the peer prematurely stops their RPC session
                        if utils::mpsc::send_all(&tx, kernels).await.is_err() {
                            break;
                        }
                    },
                    Err(err) => {
                        let _result = tx.send(Err(err)).await;
                        break;
                    },
                }

                current_height += 1;

                if current_height <= end_height {
                    let res = db
                        .fetch_header(current_height)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));
                    match res {
                        Ok(Some(header)) => {
                            current_header_hash = header.hash();
                        },
                        Ok(None) => {
                            let _result = tx
                                .send(Err(RpcStatus::not_found(&format!(
                                    "Could not find header #{} while streaming UTXOs after position {}",
                                    current_height, current_mmr_position
                                ))))
                                .await;
                            break;
                        },
                        Err(err) => {
                            error!(target: LOG_TARGET, "DB error while streaming kernels: {}", err);
                            let _result = tx
                                .send(Err(RpcStatus::general("DB error while streaming kernels")))
                                .await;
                            break;
                        },
                    }
                }
            }

            #[cfg(feature = "metrics")]
            metrics::active_sync_peers().dec();
            debug!(
                target: LOG_TARGET,
                "Kernel sync round complete for peer `{}`.", peer_node_id,
            );
        });
        Ok(Streaming::new(rx))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
        let req = request.message();
        let peer_node_id = request.context().peer_node_id();
        debug!(
            target: LOG_TARGET,
            "Received sync_utxos-{} request from header {} to {}",
            peer_node_id,
            req.start_header_hash.to_hex(),
            req.end_header_hash.to_hex(),
        );

        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
        let (tx, rx) = mpsc::channel(200);
        let task = SyncUtxosTask::new(self.db(), session_token);
        task.run(request, tx).await?;

        Ok(Streaming::new(rx))
    }
}
```