tari-project / tari · build 20715266036

05 Jan 2026 12:22PM UTC. Coverage: 59.702% (-0.9% from 60.642%)
Trigger: push via github, committed by web-flow

chore(deps): bump azure/trusted-signing-action from 0.5.10 to 0.5.11 (#7632)

Bumps
[azure/trusted-signing-action](https://github.com/azure/trusted-signing-action)
from 0.5.10 to 0.5.11.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/azure/trusted-signing-action/releases">azure/trusted-signing-action's
releases</a>.</em></p>
<blockquote>
<h2>v0.5.11</h2>
<h2>What's Changed</h2>
<ul>
<li>fix: map to environment variables instead of directly from inputs by
<a href="https://github.com/Jaxelr"><code>@Jaxelr</code></a> in <a
href="https://redirect.github.com/Azure/trusted-signing-action/pull/102">Azure/trusted-signing-action#102</a></li>
<li>chore: update codeowners to trusted signing team by <a
href="https://github.com/Jaxelr"><code>@Jaxelr</code></a> in <a
href="https://redirect.github.com/Azure/trusted-signing-action/pull/103">Azure/trusted-signing-action#103</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/Azure/trusted-signing-action/compare/v0...v0.5.11">https://github.com/Azure/trusted-signing-action/compare/v0...v0.5.11</a></p>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="https://github.com/Azure/trusted-signing-action/commit/1d365fec1"><code>1d365fe</code></a>
chore: update codeowners to trusted signing team (<a
href="https://redirect.github.com/azure/trusted-signing-action/issues/103">#103</a>)</li>
<li><a
href="https://github.com/Azure/trusted-signing-action/commit/34bc367eb"><code>34bc367</code></a>
fix: map to environment variables instead of directly from inputs (<a
href="https://redirect.github.com/azure/trusted-signing-action/issues/102">#102</a>)</li>
<li>See full diff in <a
href="https://github.com/azure/trusted-signing-action/compare/v0.5.10...v0.5.11">compare
view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility
score](https://dependabot-badges.githubap... (continued)

69282 of 116047 relevant lines covered (59.7%)

300086.98 hits per line
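
The headline percentage is straightforward arithmetic over the line counts above; a quick Rust sketch (illustrative only) reproduces it:

fn main() {
    // 69,282 covered lines out of 116,047 relevant lines, as reported above
    let pct = 69_282_f64 / 116_047_f64 * 100.0;
    println!("{pct:.3}%"); // prints 59.702%, matching this build's coverage figure
}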

Source file: /base_layer/core/src/base_node/sync/rpc/service.rs (19.58% of lines covered)
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    cmp,
    convert::{TryFrom, TryInto},
    sync::{Arc, Weak},
};

use log::*;
use tari_common_types::types::FixedHash;
use tari_comms::{
    peer_manager::NodeId,
    protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming},
    utils,
};
use tari_utilities::hex::Hex;
use tokio::{
    sync::{mpsc, Mutex},
    task,
};
use tracing::{instrument, span, Instrument, Level};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
    base_node::{
        comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind},
        sync::{
            header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
            rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
        },
        LocalNodeCommsInterface,
    },
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend},
    iterators::NonOverlappingIntegerPairIter,
    proto,
    proto::base_node::{
        FindChainSplitRequest,
        FindChainSplitResponse,
        SyncBlocksRequest,
        SyncHeadersRequest,
        SyncKernelsRequest,
        SyncUtxosRequest,
        SyncUtxosResponse,
    },
};

const LOG_TARGET: &str = "c::base_node::sync_rpc";

pub struct BaseNodeSyncRpcService<B> {
    db: AsyncBlockchainDb<B>,
    active_sessions: Mutex<Vec<Weak<NodeId>>>,
    base_node_service: LocalNodeCommsInterface,
}

impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {
    pub fn new(db: AsyncBlockchainDb<B>, base_node_service: LocalNodeCommsInterface) -> Self {
        Self {
            db,
            active_sessions: Mutex::new(Vec::new()),
            base_node_service,
        }
    }

    #[inline]
    fn db(&self) -> AsyncBlockchainDb<B> {
        self.db.clone()
    }

    pub async fn try_add_exclusive_session(&self, peer: NodeId) -> Result<Arc<NodeId>, RpcStatus> {
        let mut lock = self.active_sessions.lock().await;
        *lock = lock.drain(..).filter(|l| l.strong_count() > 0).collect();
        debug!(target: LOG_TARGET, "Number of active sync sessions: {}", lock.len());

        if lock.iter().any(|p| p.upgrade().filter(|p| **p == peer).is_some()) {
            return Err(RpcStatus::forbidden(
                "Existing sync session found for this client. Only a single session is permitted",
            ));
        }

        let token = Arc::new(peer);
        lock.push(Arc::downgrade(&token));
        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        metrics::active_sync_peers().set(lock.len() as i64);
        Ok(token)
    }
}
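
try_add_exclusive_session above enforces one sync session per peer without any explicit teardown call: the caller receives an Arc<NodeId> token, the registry keeps only a Weak reference, and dropping the token (or the task that owns it) ends the session on the next sweep. A minimal standalone sketch of the same pattern, using a hypothetical Sessions type rather than the real service:

use std::sync::{Arc, Weak};

struct Sessions {
    active: Vec<Weak<String>>,
}

impl Sessions {
    fn try_add(&mut self, peer: String) -> Option<Arc<String>> {
        // Sweep entries whose tokens were already dropped by their owning tasks
        self.active.retain(|w| w.strong_count() > 0);
        if self.active.iter().any(|w| w.upgrade().as_deref() == Some(&peer)) {
            return None; // a session for this peer is still alive
        }
        let token = Arc::new(peer);
        self.active.push(Arc::downgrade(&token));
        Some(token)
    }
}

fn main() {
    let mut sessions = Sessions { active: Vec::new() };
    let token = sessions.try_add("peer-a".into()).expect("first session is accepted");
    assert!(sessions.try_add("peer-a".into()).is_none()); // duplicate is refused
    drop(token); // releasing the token implicitly ends the session
    assert!(sessions.try_add("peer-a".into()).is_some());
}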

#[tari_comms::async_trait]
impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcService<B> {
    #[instrument(level = "trace", name = "sync_rpc::sync_blocks", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_blocks(
        &self,
        request: Request<SyncBlocksRequest>,
    ) -> Result<Streaming<proto::base_node::BlockBodyResponse>, RpcStatus> {
        let peer_node_id = request.context().peer_node_id().clone();
        let message = request.into_message();
        let mut block_event_stream = self.base_node_service.get_block_event_stream();

        let db = self.db();
        let hash = message
            .start_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
        if db.fetch_block_by_hash(hash, true).await.is_err() {
            return Err(RpcStatus::not_found("Requested start block sync hash was not found"));
        }
        let start_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

        let metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;

        let start_height = start_header.height + 1;
        if start_height < metadata.pruned_height() {
            return Err(RpcStatus::bad_request(&format!(
                "Requested full block body at height {}, however this node has an effective pruned height of {}",
                start_height,
                metadata.pruned_height()
            )));
        }

        let hash = message
            .end_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
        if db.fetch_block_by_hash(hash, true).await.is_err() {
            return Err(RpcStatus::not_found("Requested end block sync hash was not found"));
        }

        let end_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Requested end block sync hash was not found"))?;

        let end_height = end_header.height;
        if start_height > end_height {
            return Err(RpcStatus::bad_request(&format!(
                "Start block #{start_height} is higher than end block #{end_height}"
            )));
        }

        debug!(
            target: LOG_TARGET,
            "Initiating block sync with peer `{peer_node_id}` from height {start_height} to {end_height}"
        );

        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
        // Number of blocks to load and push to the stream before loading the next batch
        const BATCH_SIZE: usize = 2;
        let (tx, rx) = mpsc::channel(BATCH_SIZE);

        let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker");
        let iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE)
            .map_err(|e| RpcStatus::bad_request(&e))?;
        task::spawn(
            async move {
                // Move token into this task
                let peer_node_id = session_token;
                for (start, end) in iter {
                    if tx.is_closed() {
                        break;
                    }

                    // Check for reorgs during sync
                    while let Ok(block_event) = block_event_stream.try_recv() {
                        if let BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { removed, .. }) |
                            BlockSyncRewind(removed) = &*block_event
                        {
                            // A BlockSyncRewind(Vec<Arc<ChainBlock>>) event is handled the same way as a reorg
                            if let Some(reorg_block) = removed
                                .iter()
                                // If the reorg happens before the end height of the sync, let the peer know that
                                // the chain they are syncing from has changed
                                .find(|block| block.height() <= end_height)
                            {
                                warn!(
                                    target: LOG_TARGET,
                                    "Block reorg/rewind detected at height {} during sync, letting the sync peer {} know.",
                                    reorg_block.height(),
                                    peer_node_id
                                );
                                let _result = tx
                                    .send(Err(RpcStatus::conflict(&format!(
                                        "Reorg at height {} detected",
                                        reorg_block.height()
                                    ))))
                                    .await;
                                return;
                            }
                        }
                    }

                    debug!(
                        target: LOG_TARGET,
                        "Sending blocks #{start} - #{end} to '{peer_node_id}'"
                    );
                    let blocks = db
                        .fetch_blocks(start..=end, true)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                    if tx.is_closed() {
                        debug!(
                            target: LOG_TARGET,
                            "Block sync session for peer '{peer_node_id}' terminated early"
                        );
                        break;
                    }

                    match blocks {
                        Ok(blocks) if blocks.is_empty() => {
                            break;
                        },
                        Ok(blocks) => {
                            let blocks = blocks.into_iter().map(|hb| {
                                let block = hb.into_block();
                                proto::base_node::BlockBodyResponse::try_from(block).map_err(|e| {
                                    log::error!(target: LOG_TARGET, "Internal error: {e}");
                                    RpcStatus::general_default()
                                })
                            });

                            // Ensure task stops if the peer prematurely stops their RPC session
                            if utils::mpsc::send_all(&tx, blocks).await.is_err() {
                                debug!(
                                    target: LOG_TARGET,
                                    "Block sync session for peer '{peer_node_id}' terminated early"
                                );
                                break;
                            }
                        },
                        Err(err) => {
                            let _result = tx.send(Err(err)).await;
                            break;
                        },
                    }
                }

                #[cfg(feature = "metrics")]
                metrics::active_sync_peers().dec();
                debug!(
                    target: LOG_TARGET,
                    "Block sync round complete for peer `{peer_node_id}`."
                );
            }
            .instrument(span),
        );

        Ok(Streaming::new(rx))
    }
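
sync_blocks streams in batches: NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE) drives the loop, and each (start, end) pair is fetched with fetch_blocks(start..=end, true). The iterator's implementation lives elsewhere in the crate; judging from this usage it yields inclusive, non-overlapping pairs, which the following hypothetical stand-in sketches:

// Hypothetical stand-in for NonOverlappingIntegerPairIter, assuming it yields
// inclusive, non-overlapping (start, end) pairs covering [start, end_exclusive)
fn batches(start: u64, end_exclusive: u64, batch_size: u64) -> Vec<(u64, u64)> {
    (start..end_exclusive)
        .step_by(batch_size as usize)
        .map(|s| (s, (s + batch_size - 1).min(end_exclusive - 1)))
        .collect()
}

fn main() {
    // With BATCH_SIZE = 2, syncing heights 100..=105 yields three batches
    assert_eq!(batches(100, 106, 2), vec![(100, 101), (102, 103), (104, 105)]);
}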

    #[instrument(level = "trace", name = "sync_rpc::sync_headers", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_headers(
        &self,
        request: Request<SyncHeadersRequest>,
    ) -> Result<Streaming<proto::core::BlockHeader>, RpcStatus> {
        let db = self.db();
        let peer_node_id = request.context().peer_node_id().clone();
        let message = request.into_message();
        let hash = message
            .start_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
        let start_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

        let mut count = message.count;
        if count == 0 {
            let tip_header = db.fetch_tip_header().await.rpc_status_internal_error(LOG_TARGET)?;
            count = tip_header.height().saturating_sub(start_header.height);
        }
        // If the start header is already the tip, return an empty stream
        if count == 0 {
            return Ok(Streaming::empty());
        }

        #[allow(clippy::cast_possible_truncation)]
        let chunk_size = cmp::min(100, count) as usize;
        debug!(
            target: LOG_TARGET,
            "Initiating header sync with peer `{}` from height {} for {} header(s) (chunk_size={})",
            peer_node_id,
            start_header.height,
            count,
            chunk_size
        );

        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
        let (tx, rx) = mpsc::channel(chunk_size);
        let span = span!(Level::TRACE, "sync_rpc::sync_headers::inner_worker");
        let iter = NonOverlappingIntegerPairIter::new(
            start_header.height.saturating_add(1),
            start_header.height.saturating_add(count).saturating_add(1),
            chunk_size,
        )
        .map_err(|e| RpcStatus::bad_request(&e))?;
        task::spawn(
            async move {
                // Move token into this task
                let peer_node_id = session_token;
                for (start, end) in iter {
                    if tx.is_closed() {
                        break;
                    }
                    debug!(target: LOG_TARGET, "Sending headers #{start} - #{end}");
                    let headers = db
                        .fetch_headers(start..=end)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                    if tx.is_closed() {
                        debug!(
                            target: LOG_TARGET,
                            "Header sync session for peer '{peer_node_id}' terminated early"
                        );
                        break;
                    }
                    match headers {
                        Ok(headers) if headers.is_empty() => {
                            break;
                        },
                        Ok(headers) => {
                            let headers = headers.into_iter().map(proto::core::BlockHeader::from).map(Ok);
                            // Ensure task stops if the peer prematurely stops their RPC session
                            if utils::mpsc::send_all(&tx, headers).await.is_err() {
                                break;
                            }
                        },
                        Err(err) => {
                            let _result = tx.send(Err(err)).await;
                            break;
                        },
                    }
                }

                #[cfg(feature = "metrics")]
                metrics::active_sync_peers().dec();
                debug!(
                    target: LOG_TARGET,
                    "Header sync round complete for peer `{peer_node_id}`."
                );
            }
            .instrument(span),
        );

        Ok(Streaming::new(rx))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn get_header_by_height(
        &self,
        request: Request<u64>,
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
        let height = request.into_message();
        let header = self
            .db()
            .fetch_header(height)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {height}")))?;

        Ok(Response::new(header.into()))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn find_chain_split(
        &self,
        request: Request<FindChainSplitRequest>,
    ) -> Result<Response<FindChainSplitResponse>, RpcStatus> {
        const MAX_ALLOWED_BLOCK_HASHES: usize = 1000;

        let peer = request.context().peer_node_id().clone();
        let message = request.into_message();
        if message.block_hashes.is_empty() {
            return Err(RpcStatus::bad_request(
                "Cannot find chain split because no hashes were sent",
            ));
        }
        if message.block_hashes.len() > MAX_ALLOWED_BLOCK_HASHES {
            return Err(RpcStatus::bad_request(&format!(
                "Cannot query more than {MAX_ALLOWED_BLOCK_HASHES} block hashes"
            )));
        }
        if message.header_count > (HEADER_SYNC_INITIAL_MAX_HEADERS as u64) {
            return Err(RpcStatus::bad_request(&format!(
                "Cannot ask for more than {HEADER_SYNC_INITIAL_MAX_HEADERS} headers"
            )));
        }

        let db = self.db();
        let hashes: Vec<FixedHash> = message
            .block_hashes
            .into_iter()
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
            .collect::<Result<_, _>>()
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
        let maybe_headers = db
            .find_headers_after_hash(hashes, message.header_count)
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        match maybe_headers {
            Some((idx, headers)) => {
                debug!(
                    target: LOG_TARGET,
                    "Sending forked index {} and {} header(s) to peer `{}`",
                    idx,
                    headers.len(),
                    peer
                );

                Ok(Response::new(FindChainSplitResponse {
                    fork_hash_index: idx as u64,
                    headers: headers.into_iter().map(Into::into).collect(),
                }))
            },
            None => {
                debug!(
                    target: LOG_TARGET,
                    "Unable to find link to main chain from peer `{peer}`"
                );
                Err(RpcStatus::not_found("No link found to main chain"))
            },
        }
    }
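
find_chain_split rejects empty requests, more than MAX_ALLOWED_BLOCK_HASHES (1000) candidate hashes, and header counts above HEADER_SYNC_INITIAL_MAX_HEADERS (whose value is defined elsewhere in the crate). A simplified mirror of those checks, using hypothetical types rather than the generated proto structs:

// Hypothetical, simplified mirror of the request validation in find_chain_split
struct FindChainSplitReq {
    block_hashes: Vec<[u8; 32]>,
    header_count: u64,
}

fn validate(req: &FindChainSplitReq, max_hashes: usize, max_headers: u64) -> Result<(), String> {
    if req.block_hashes.is_empty() {
        return Err("cannot find chain split because no hashes were sent".into());
    }
    if req.block_hashes.len() > max_hashes {
        return Err(format!("cannot query more than {max_hashes} block hashes"));
    }
    if req.header_count > max_headers {
        return Err(format!("cannot ask for more than {max_headers} headers"));
    }
    Ok(())
}

fn main() {
    let req = FindChainSplitReq {
        block_hashes: vec![[0u8; 32]; 3],
        header_count: 500,
    };
    // 1000 mirrors MAX_ALLOWED_BLOCK_HASHES; the second 1000 is a placeholder
    // for HEADER_SYNC_INITIAL_MAX_HEADERS, which this file does not define
    assert!(validate(&req, 1000, 1000).is_ok());
}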

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn get_chain_metadata(&self, _: Request<()>) -> Result<Response<proto::base_node::ChainMetadata>, RpcStatus> {
        let chain_metadata = self
            .db()
            .get_chain_metadata()
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        Ok(Response::new(chain_metadata.into()))
    }

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_kernels(
        &self,
        request: Request<SyncKernelsRequest>,
    ) -> Result<Streaming<proto::types::TransactionKernel>, RpcStatus> {
        let peer_node_id = request.context().peer_node_id().clone();
        let req = request.into_message();
        let (tx, rx) = mpsc::channel(100);
        let db = self.db();

        let start_header = db
            .fetch_header_containing_kernel_mmr(req.start)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .into_header();
        let hash = req
            .end_header_hash
            .try_into()
            .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
        let end_header = db
            .fetch_header_by_block_hash(hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Unknown end header"))?;

        let mut current_height = start_header.height;
        let end_height = end_header.height;
        let mut current_mmr_position = start_header.kernel_mmr_size;
        let mut current_header_hash = start_header.hash();

        if current_height > end_height {
            return Err(RpcStatus::bad_request("start header height is after end header"));
        }

        let session_token = self.try_add_exclusive_session(peer_node_id).await?;
        task::spawn(async move {
            // Move session token into task
            let peer_node_id = session_token;
            while current_height <= end_height {
                if tx.is_closed() {
                    break;
                }
                let res = db
                    .fetch_kernels_in_block(current_header_hash)
                    .await
                    .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                if tx.is_closed() {
                    debug!(
                        target: LOG_TARGET,
                        "Kernel sync session for peer '{peer_node_id}' terminated early",
                    );
                    break;
                }

                match res {
                    Ok(kernels) if kernels.is_empty() => {
                        let _result = tx
                            .send(Err(RpcStatus::general(&format!(
                                "No kernels in block {}",
                                current_header_hash.to_hex()
                            ))))
                            .await;
                        break;
                    },
                    Ok(kernels) => {
                        debug!(
                            target: LOG_TARGET,
                            "Streaming kernels {} to {}",
                            current_mmr_position,
                            current_mmr_position + kernels.len() as u64
                        );
                        current_mmr_position += kernels.len() as u64;
                        let kernels = kernels.into_iter().map(proto::types::TransactionKernel::from).map(Ok);
                        // Ensure task stops if the peer prematurely stops their RPC session
                        if utils::mpsc::send_all(&tx, kernels).await.is_err() {
                            break;
                        }
                    },
                    Err(err) => {
                        let _result = tx.send(Err(err)).await;
                        break;
                    },
                }

                current_height += 1;

                if current_height <= end_height {
                    let res = db
                        .fetch_header(current_height)
                        .await
                        .map_err(RpcStatus::log_internal_error(LOG_TARGET));
                    match res {
                        Ok(Some(header)) => {
                            current_header_hash = header.hash();
                        },
                        Ok(None) => {
                            let _result = tx
                                .send(Err(RpcStatus::not_found(&format!(
                                    "Could not find header #{current_height} while streaming kernels after position \
                                     {current_mmr_position}"
                                ))))
                                .await;
                            break;
                        },
                        Err(err) => {
                            error!(target: LOG_TARGET, "DB error while streaming kernels: {err}");
                            let _result = tx
                                .send(Err(RpcStatus::general("DB error while streaming kernels")))
                                .await;
                            break;
                        },
                    }
                }
            }

            #[cfg(feature = "metrics")]
            metrics::active_sync_peers().dec();
            debug!(
                target: LOG_TARGET,
                "Kernel sync round complete for peer `{peer_node_id}`.",
            );
        });
        Ok(Streaming::new(rx))
    }
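
sync_kernels walks the chain one header at a time, streams each block's kernels, and advances a running kernel-MMR position by the number of kernels sent, starting from start_header.kernel_mmr_size. A small worked example of that accounting (invented kernel counts, purely illustrative):

fn main() {
    // Suppose the start header's kernel_mmr_size is 10 and the next three
    // blocks contain 2, 1 and 3 kernels respectively
    let kernels_per_block = [2u64, 1, 3];
    let mut mmr_position = 10u64;
    for (i, count) in kernels_per_block.iter().enumerate() {
        // Matches the "Streaming kernels {} to {}" log line in the code above
        println!("block {}: streaming kernels {} to {}", i, mmr_position, mmr_position + count);
        mmr_position += count;
    }
    assert_eq!(mmr_position, 16);
}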

    #[instrument(level = "trace", skip(self), err)]
    #[allow(clippy::blocks_in_conditions)]
    async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
        let req = request.message();
        let peer_node_id = request.context().peer_node_id();
        debug!(
            target: LOG_TARGET,
            "Received sync_utxos request from peer {} for headers {} to {}",
            peer_node_id,
            req.start_header_hash.to_hex(),
            req.end_header_hash.to_hex(),
        );

        let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
        let (tx, rx) = mpsc::channel(200);
        let task = SyncUtxosTask::new(self.db(), session_token);
        task.run(request, tx).await?;

        Ok(Streaming::new(rx))
    }
}