• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16272458029

14 Jul 2025 04:32PM UTC coverage: 57.167% (-0.9%) from 58.047%
16272458029

push

github

web-flow
feat: modify soft disconnect criteria (#7307)

Description
---
We can be more efficient with soft disconnects when we compare against
expected RPC sessions and substream counts. This PR adds finer
discernment when doing soft peer disconnects.

Motivation and Context
---
The health check opens 2 substreams and 0 RPC sessions - that should
result in a disconnect if those are the only opened resources.

How Has This Been Tested?
---
System-level testing
```rust
2025-07-11 13:52:26.703289300 [comms::connection_manager::peer_connection] TRACE Hard disconnect - requester: 'Health check', peer: `d7c289e9e3c8377705ce599a96`, RPC clients: 0, substreams 2
2025-07-11 13:52:26.705658100 [comms::connection_manager::peer_connection] TRACE Soft disconnect - requester: 'Health check', peer: `0984896e74022c442c1034852c`, RPC clients: 1, substreams 3, NOT disconnecting
2025-07-11 13:52:26.705735900 [comms::connection_manager::peer_connection] TRACE Hard disconnect - requester: 'Health check', peer: `d025bc9e4bd423a9b304c491b8`, RPC clients: 0, substreams 2
2025-07-11 13:52:26.707647400 [comms::connection_manager::peer_connection] TRACE Hard disconnect - requester: 'Health check', peer: `51af08aff11f7129b4681d9950`, RPC clients: 0, substreams 2
```

What process can a PR reviewer use to test or verify this change?
---
Code review

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Describe what the user should do, e.g. delete a
database, resync the chain -->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
##... (continued)

31 of 46 new or added lines in 6 files covered. (67.39%)

1102 existing lines in 27 files now uncovered.

68701 of 120177 relevant lines covered (57.17%)

226749.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/base_layer/core/src/base_node/sync/rpc/tests.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use futures::StreamExt;
24
use tari_comms::protocol::rpc::{mock::RpcRequestMock, RpcStatusCode};
25
use tari_service_framework::reply_channel;
26
use tari_test_utils::{streams::convert_mpsc_to_stream, unpack_enum};
27
use tokio::sync::broadcast;
28

29
use super::BaseNodeSyncRpcService;
30
use crate::{
31
    base_node::{BaseNodeSyncService, LocalNodeCommsInterface},
32
    chain_storage::BlockchainDatabase,
33
    proto::base_node::{SyncBlocksRequest, SyncUtxosRequest},
34
    test_helpers::{
35
        blockchain::{create_main_chain, create_new_blockchain, TempDatabase},
36
        create_peer_manager,
37
    },
38
};
39

40
/// Builds the fixture shared by every test in this file.
///
/// Returns a tuple of:
/// 1. a `BaseNodeSyncRpcService` constructed over a freshly created blockchain database,
/// 2. the `BlockchainDatabase` handle itself, so tests can add blocks or query headers,
/// 3. an `RpcRequestMock` built from `create_peer_manager()`, used to wrap messages into requests.
fn setup() -> (
5✔
41
    BaseNodeSyncRpcService<TempDatabase>,
5✔
42
    BlockchainDatabase<TempDatabase>,
5✔
43
    RpcRequestMock,
5✔
44
) {
5✔
45
    let peer_manager = create_peer_manager();
5✔
46
    let request_mock = RpcRequestMock::new(peer_manager);
5✔
47

5✔
48
    let db = create_new_blockchain();
5✔
49
    // Only the first half of each channel pair is kept; the second half (presumably the receiving end -
    // confirm against `reply_channel`) is bound to `_` and therefore dropped immediately.
    let (req_tx, _) = reply_channel::unbounded();
5✔
50
    let (block_tx, _) = reply_channel::unbounded();
5✔
51
    let (block_event_tx, _) = broadcast::channel(1);
5✔
52
    // The service receives a converted clone of the database handle; the original `db` is handed back to the
    // caller below.
    let service = BaseNodeSyncRpcService::new(
5✔
53
        db.clone().into(),
5✔
54
        LocalNodeCommsInterface::new(req_tx, block_tx, block_event_tx),
5✔
55
    );
5✔
56
    (service, db, request_mock)
5✔
57
}
5✔
58

59
/// Tests for `BaseNodeSyncRpcService::sync_blocks`.
mod sync_blocks {
60
    use super::*;
61

62
    /// A request whose start and end hashes are both 32 zero bytes must fail, and the error's status code is
    /// checked against `RpcStatusCode::NotFound`.
    #[tokio::test]
63
    async fn it_returns_not_found_if_unknown_hash() {
1✔
64
        let (service, _, rpc_request_mock) = setup();
1✔
65
        let msg = SyncBlocksRequest {
1✔
66
            start_hash: vec![0; 32],
1✔
67
            end_hash: vec![0; 32],
1✔
68
        };
1✔
69
        let req = rpc_request_mock.request_with_context(Default::default(), msg);
1✔
70
        let err = service.sync_blocks(req).await.unwrap_err();
1✔
71
        unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
1✔
72
    }
1✔
73

74
    /// Requests a sync where the start and end hash are the same block ("A") and expects the call to fail.
    ///
    /// NOTE(review): the test name mentions "bad request", but only `is_err()` is asserted - the specific
    /// status code is not checked here.
    #[tokio::test]
75
    async fn it_sends_bad_request_on_bad_response() {
1✔
76
        let (service, db, rpc_request_mock) = setup();
1✔
77

1✔
78
        // Presumably "A->GB" declares block "A" with the genesis block ("GB") as its parent - confirm against
        // the `block_specs!` macro.
        let (_, chain) = create_main_chain(&db, block_specs!(["A->GB"])).await;
1✔
79

1✔
80
        let block = chain.get("A").unwrap();
1✔
UNCOV
81
        let msg = SyncBlocksRequest {
×
UNCOV
82
            start_hash: block.hash().to_vec(),
×
UNCOV
83
            end_hash: block.hash().to_vec(),
×
UNCOV
84
        };
×
UNCOV
85
        let req = rpc_request_mock.request_with_context(Default::default(), msg);
×
UNCOV
86
        assert!(service.sync_blocks(req).await.is_err());
×
87
    }
1✔
88

89
    /// Builds a five-block chain ("A" through "E"), requests a sync from "A" to "E" and checks the streamed
    /// result: exactly four blocks whose hashes match "B", "C", "D", "E" in that order. The block named by
    /// `start_hash` ("A") is therefore not part of the expected output.
    #[tokio::test]
90
    async fn it_streams_blocks_until_end() {
1✔
91
        let (service, db, rpc_request_mock) = setup();
1✔
92

1✔
93
        let (_, chain) = create_main_chain(&db, block_specs!(["A->GB"], ["B->A"], ["C->B"], ["D->C"], ["E->D"])).await;
1✔
94

1✔
95
        let first_block = chain.get("A").unwrap();
1✔
UNCOV
96
        let last_block = chain.get("E").unwrap();
×
UNCOV
97

×
UNCOV
98
        let msg = SyncBlocksRequest {
×
UNCOV
99
            start_hash: first_block.hash().to_vec(),
×
UNCOV
100
            end_hash: last_block.hash().to_vec(),
×
UNCOV
101
        };
×
UNCOV
102
        let req = rpc_request_mock.request_with_context(Default::default(), msg);
×
103
        let mut streaming = service.sync_blocks(req).await.unwrap().into_inner();
1✔
104
        // Drain the whole response stream into a Vec; any item that is an error panics via `unwrap()`.
        let blocks = convert_mpsc_to_stream(&mut streaming)
1✔
UNCOV
105
            .map(|block| block.unwrap())
×
UNCOV
106
            .collect::<Vec<_>>()
×
UNCOV
107
            .await;
×
108

1✔
109
        assert_eq!(blocks.len(), 4);
1✔
110
        // Compare each streamed block's hash with the hash of the correspondingly named chain block.
        blocks.iter().zip(["B", "C", "D", "E"]).for_each(|(block, name)| {
1✔
UNCOV
111
            assert_eq!(*chain.get(name).unwrap().hash(), block.hash);
×
112
        });
1✔
113
    }
1✔
114
}
115

116
/// Tests for `BaseNodeSyncRpcService::sync_utxos`.
mod sync_utxos {
117
    use super::*;
118

119
    /// Uses the hash of the header at height 0 as the start and 32 zero bytes as the end header hash; the call
    /// must fail and the error's status code is checked against `RpcStatusCode::NotFound`.
    #[tokio::test]
120
    async fn it_returns_not_found_if_unknown_hash() {
1✔
121
        let (service, db, rpc_request_mock) = setup();
1✔
122
        // Header fetched at height 0 of the freshly created database.
        let gen_block_hash = db.fetch_header(0).unwrap().unwrap().hash();
1✔
123
        let msg = SyncUtxosRequest {
1✔
124
            start_header_hash: gen_block_hash.to_vec(),
1✔
125
            end_header_hash: vec![0; 32],
1✔
126
        };
1✔
127
        let req = rpc_request_mock.request_with_context(Default::default(), msg);
1✔
128
        let err = service.sync_utxos(req).await.unwrap_err();
1✔
129
        unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
1✔
130
    }
1✔
131

132
    /// Mirror of the test above: here the start header hash is 32 zero bytes while the end header hash is the
    /// hash of the chain entry named "GB"; the call must fail and the status code is checked against
    /// `RpcStatusCode::NotFound`.
    #[tokio::test]
133
    async fn it_returns_not_found_if_start_not_found() {
1✔
134
        let (service, db, rpc_request_mock) = setup();
1✔
135
        let (_, chain) = create_main_chain(&db, block_specs!(["A->GB"])).await;
1✔
136
        // "GB" is presumably the genesis block entry of the created chain - confirm against `create_main_chain`.
        let gb = chain.get("GB").unwrap();
1✔
UNCOV
137
        let msg = SyncUtxosRequest {
×
UNCOV
138
            start_header_hash: vec![0; 32],
×
UNCOV
139
            end_header_hash: gb.hash().to_vec(),
×
UNCOV
140
        };
×
UNCOV
141
        let req = rpc_request_mock.request_with_context(Default::default(), msg);
×
142
        let err = service.sync_utxos(req).await.unwrap_err();
1✔
143
        unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
1✔
144
    }
1✔
145
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc