• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 19468834672

18 Nov 2025 02:01PM UTC coverage: 51.294% (-0.3%) from 51.544%
19468834672

push

github

web-flow
feat: add coin selection and spending via bins or buckets (#7584)

Description
---

Add range limit coin-join:
- Added an unspent output coin distribution gRPC method
('CoinHistogramRequest'), whereby the wallet will return the number and
total value of coins in a pre-set range of buckets.
- Added a range limit coin-join gRPC method ('RangeLimitCoinJoin') to
the wallet, whereby the user can specify the minimum target amount,
maximum number of inputs, dust lower bound (inclusive), dust upper bound
(exclusive) and fee. Transaction size will be limited to the specified
maximum number of inputs, and multiple outputs will be created according
to the minimum target amount. All the inputs in the range will be spent,
unless the total available amount does not meet the minimum target
amount.

Closes #7582.

Motivation and Context
---
See #7582.

How Has This Been Tested?
---
System-level testing

gRPC **CoinHistogram** method
```
{
  "buckets": [
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "0",
      "upper_bound": "1000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "1000",
      "upper_bound": "100000"
    },
    {
      "count": "2",
      "total_amount": "1165548",
      "lower_bound": "100000",
      "upper_bound": "1000000"
    },
    {
      "count": "158",
      "total_amount": "1455989209",
      "lower_bound": "1000000",
      "upper_bound": "1000000000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "1000000000",
      "upper_bound": "100000000000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "100000000000",
      "upper_bound": "21000000000000000"
    }
  ]
}
```

gRPC **RangeLimitCoinJoin** method

In this example, two transactions were created, bounded by the 350 input
size limit. gRPC client view:

<img width="897" height="396" alt="image"
src="https://github.com/user-attachments/assets/6c5ae857-8a01-4c90-9c55-1eee2fbd... (continued)

0 of 636 new or added lines in 11 files covered. (0.0%)

17 existing lines in 8 files now uncovered.

59180 of 115373 relevant lines covered (51.29%)

7948.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.04
/comms/dht/src/connectivity/test.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
#![allow(clippy::indexing_slicing)]
24
use std::{iter::repeat_with, sync::Arc, time::Duration};
25

26
use rand::{rngs::OsRng, seq::SliceRandom};
27
use tari_comms::{
28
    connectivity::ConnectivityEvent,
29
    peer_manager::{Peer, PeerFeatures, STALE_PEER_THRESHOLD_DURATION},
30
    test_utils::{
31
        count_string_occurrences,
32
        mocks::{create_connectivity_mock, create_dummy_peer_connection, ConnectivityManagerMockState},
33
        node_identity::ordered_node_identities_by_distance,
34
    },
35
    Minimized,
36
    NodeIdentity,
37
    PeerManager,
38
};
39
use tari_shutdown::Shutdown;
40
use tari_test_utils::async_assert;
41
use tokio::sync::broadcast;
42

43
use crate::{
44
    connectivity::{DhtConnectivity, MetricsCollector},
45
    test_utils::{
46
        build_peer_manager,
47
        create_dht_actor_mock,
48
        create_good_standing_peer,
49
        make_node_identity,
50
        DhtMockState,
51
    },
52
    DhtConfig,
53
};
54

55
async fn setup(
4✔
56
    config: DhtConfig,
4✔
57
    node_identity: Arc<NodeIdentity>,
4✔
58
    initial_peers: Vec<Peer>,
4✔
59
) -> (
4✔
60
    DhtConnectivity,
4✔
61
    DhtMockState,
4✔
62
    ConnectivityManagerMockState,
4✔
63
    Arc<PeerManager>,
4✔
64
    Arc<NodeIdentity>,
4✔
65
    Shutdown,
4✔
66
) {
4✔
67
    let peer_manager = build_peer_manager();
4✔
68
    for peer in initial_peers {
25✔
69
        peer_manager.add_or_update_peer(peer).await.unwrap();
21✔
70
    }
71

72
    let shutdown = Shutdown::new();
4✔
73
    let (connectivity, mock) = create_connectivity_mock();
4✔
74
    let connectivity_state = mock.get_shared_state();
4✔
75
    mock.spawn();
4✔
76
    let (dht_requester, mock) = create_dht_actor_mock();
4✔
77
    let dht_state = mock.get_shared_state();
4✔
78
    mock.spawn();
4✔
79
    let (event_publisher, _) = broadcast::channel(1);
4✔
80

81
    let dht_connectivity = DhtConnectivity::new(
4✔
82
        Arc::new(config),
4✔
83
        peer_manager.clone(),
4✔
84
        node_identity.clone(),
4✔
85
        connectivity,
4✔
86
        dht_requester,
4✔
87
        event_publisher.subscribe(),
4✔
88
        MetricsCollector::spawn(),
4✔
89
        shutdown.to_signal(),
4✔
90
    );
91

92
    (
4✔
93
        dht_connectivity,
4✔
94
        dht_state,
4✔
95
        connectivity_state,
4✔
96
        peer_manager,
4✔
97
        node_identity,
4✔
98
        shutdown,
4✔
99
    )
4✔
100
}
4✔
101

102
/// Spawning `DhtConnectivity` should cause the closest neighbouring nodes to
/// be dialed on startup.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn initialize() {
    let config = DhtConfig {
        num_neighbouring_nodes: 4,
        num_random_nodes: 2,
        ..Default::default()
    };
    // 10 candidate peers in good standing, each with its own identity.
    let peers = repeat_with(|| create_good_standing_peer(&make_node_identity()))
        .take(10)
        .collect();
    let (dht_connectivity, _, connectivity, peer_manager, node_identity, _shutdown) =
        setup(config, make_node_identity(), peers).await;
    dht_connectivity.spawn();
    // Compute the 4 active communication-node peers closest to our node id —
    // these are the dials we expect the connectivity actor to make.
    // NOTE(review): the positional arguments follow `closest_n_active_peers`'
    // signature; confirm their meanings against its definition.
    let neighbours = peer_manager
        .closest_n_active_peers(
            node_identity.node_id(),
            4,
            &[],
            Some(PeerFeatures::COMMUNICATION_NODE),
            None,
            Some(STALE_PEER_THRESHOLD_DURATION),
            true,
            None,
            false,
        )
        .await
        .unwrap()
        .into_iter()
        .map(|p| p.node_id)
        .collect::<Vec<_>>();

    // Wait for calls to add peers
    async_assert!(
        connectivity.get_dialed_peers().await.len() >= 2,
        max_attempts = 20,
        interval = Duration::from_millis(10),
    );

    // Check that neighbours are added
    for neighbour in &neighbours {
        connectivity.expect_dial_peer(neighbour).await;
    }
}
145

146
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
147
async fn added_neighbours() {
1✔
148
    // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Pipe to `> .\target\output.log 2>&1`
149
    let node_identity = make_node_identity();
1✔
150
    let mut node_identities =
1✔
151
        ordered_node_identities_by_distance(node_identity.node_id(), 6, PeerFeatures::COMMUNICATION_NODE);
1✔
152
    // Closest to this node
153
    let closer_peer = node_identities.remove(0);
1✔
154
    let mut peers = node_identities.iter().map(|ni| ni.to_peer()).collect::<Vec<_>>();
5✔
155
    for peer in &mut peers {
6✔
156
        let addresses: Vec<_> = peer.addresses.address_iter().cloned().collect();
5✔
157
        for addr in &addresses {
10✔
158
            peer.addresses.mark_last_seen_now(addr);
5✔
159
        }
5✔
160
    }
161

162
    let config = DhtConfig {
1✔
163
        num_neighbouring_nodes: 5,
1✔
164
        num_random_nodes: 0,
1✔
165
        ..Default::default()
1✔
166
    };
1✔
167
    let peer_node_ids = peers.iter().map(|p| p.node_id.clone()).collect::<Vec<_>>();
5✔
168
    let (dht_connectivity, _, connectivity, peer_manager, _, _shutdown) = setup(config, node_identity, peers).await;
1✔
169

170
    let added_peers = peer_manager.get_peers_by_node_ids(&peer_node_ids).await.unwrap();
1✔
171
    assert!(added_peers
1✔
172
        .iter()
1✔
173
        .any(|p| peer_node_ids.iter().any(|node_id| node_id == &p.node_id)));
5✔
174
    assert!(peer_node_ids
1✔
175
        .iter()
1✔
176
        .any(|p| peer_node_ids.iter().any(|node_id| node_id == p)));
1✔
177

178
    dht_connectivity.spawn();
1✔
179

180
    // Wait for calls to add peers
181
    async_assert!(
1✔
182
        connectivity.call_count().await >= 1,
2✔
183
        max_attempts = 20,
184
        interval = Duration::from_millis(10),
1✔
185
    );
186

187
    let calls = connectivity.take_calls().await;
1✔
188
    assert_eq!(count_string_occurrences(&calls, &["DialPeer"]), 5);
1✔
189

190
    let (conn, _) = create_dummy_peer_connection(closer_peer.node_id().clone());
1✔
191
    connectivity.publish_event(ConnectivityEvent::PeerConnected(conn.clone().into()));
1✔
192

193
    async_assert!(
1✔
194
        connectivity.get_dialed_peers().await.len() >= 5,
1✔
195
        max_attempts = 20,
196
        interval = Duration::from_millis(50),
×
197
    );
198

199
    // 1 for this test, 1 for the connectivity manager [FLAKY test, sometimes it is 3]
200
    assert!(conn.handle_count() == 2 || conn.handle_count() == 3);
1✔
201
}
1✔
202

203
/// A disconnected neighbour is first redialed; when that redial fails, the
/// next-closest known peer is dialed to take its slot.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn replace_peer_when_peer_goes_offline() {
    let node_identity = make_node_identity();
    // 6 peers ordered by distance: with num_neighbouring_nodes = 5 the first 5
    // form the neighbour pool and index 5 is the nearest replacement candidate.
    let node_identities =
        ordered_node_identities_by_distance(node_identity.node_id(), 6, PeerFeatures::COMMUNICATION_NODE);
    // Closest to this node
    let peers = node_identities
        .iter()
        .map(|ni| create_good_standing_peer(ni))
        .collect::<Vec<_>>();

    let config = DhtConfig {
        num_neighbouring_nodes: 5,
        num_random_nodes: 0,
        ..Default::default()
    };
    let (dht_connectivity, _, connectivity, _, _, _shutdown) = setup(config, node_identity, peers).await;
    dht_connectivity.spawn();

    // Wait for calls to dial peers
    async_assert!(
        connectivity.call_count().await >= 6,
        max_attempts = 20,
        interval = Duration::from_millis(10),
    );
    let _result = connectivity.take_calls().await;

    // Startup should have dialed exactly the 5 neighbours.
    let dialed = connectivity.take_dialed_peers().await;
    assert_eq!(dialed.len(), 5);

    // Simulate neighbour at index 4 dropping its connection.
    connectivity.publish_event(ConnectivityEvent::PeerDisconnected(
        node_identities[4].node_id().clone(),
        Minimized::No,
    ));

    async_assert!(
        connectivity.call_count().await >= 1,
        max_attempts = 20,
        interval = Duration::from_millis(10),
    );

    let _result = connectivity.take_calls().await;
    // Redial
    let dialed = connectivity.take_dialed_peers().await;
    assert_eq!(dialed.len(), 1);
    assert_eq!(dialed[0], *node_identities[4].node_id());

    // Now make the redial fail, forcing a replacement.
    connectivity.publish_event(ConnectivityEvent::PeerConnectFailed(
        node_identities[4].node_id().clone(),
    ));

    async_assert!(
        connectivity.call_count().await >= 1,
        max_attempts = 20,
        interval = Duration::from_millis(10),
    );

    // Check that the next closer neighbour was added to the pool
    let dialed = connectivity.take_dialed_peers().await;
    assert_eq!(dialed.len(), 1);
    assert_eq!(dialed[0], *node_identities[5].node_id());
}
265

266
#[tokio::test]
267
async fn insert_neighbour() {
1✔
268
    let node_identity = make_node_identity();
1✔
269
    let node_identities =
1✔
270
        ordered_node_identities_by_distance(node_identity.node_id(), 10, PeerFeatures::COMMUNICATION_NODE);
1✔
271

272
    let config = DhtConfig {
1✔
273
        num_neighbouring_nodes: 8,
1✔
274
        ..Default::default()
1✔
275
    };
1✔
276
    let (mut dht_connectivity, _, _, _, _, _) = setup(config, node_identity.clone(), vec![]).await;
1✔
277

278
    let shuffled = {
1✔
279
        let mut v = node_identities.clone();
1✔
280
        v.shuffle(&mut OsRng);
1✔
281
        v
1✔
282
    };
283

284
    // First 8 inserts should not remove a peer (because num_neighbouring_nodes == 8)
285
    for ni in shuffled.iter().take(8) {
8✔
286
        assert!(dht_connectivity
8✔
287
            .insert_neighbour_ordered_by_distance(ni.node_id().clone())
8✔
288
            .is_none());
8✔
289
    }
290

291
    // Next 2 inserts will always remove a node id
292
    for ni in shuffled.iter().skip(8) {
2✔
293
        assert!(dht_connectivity
2✔
294
            .insert_neighbour_ordered_by_distance(ni.node_id().clone())
2✔
295
            .is_some())
2✔
296
    }
297

298
    // Check the first 7 node ids match our neighbours, the last element depends on distance and ordering of inserts
299
    // (these are random). insert_neighbour only cares about inserting the element in the right order and preserving
300
    // the length of the neighbour list. It doesnt care if it kicks out a closer peer (that is left for the
301
    // calling code).
302
    let ordered_node_ids = node_identities
1✔
303
        .iter()
1✔
304
        .take(7)
1✔
305
        .map(|ni| ni.node_id())
7✔
306
        .cloned()
1✔
307
        .collect::<Vec<_>>();
1✔
308
    assert_eq!(&dht_connectivity.neighbours[..7], ordered_node_ids.as_slice());
1✔
309
}
1✔
310

311
mod metrics {
312
    mod collector {
313
        use tari_comms::peer_manager::NodeId;
314

315
        use crate::connectivity::MetricsCollector;
316

317
        #[tokio::test]
318
        async fn it_adds_message_received() {
1✔
319
            let mut metric_collector = MetricsCollector::spawn();
1✔
320
            let node_id = NodeId::default();
1✔
321
            (0..100).for_each(|_| {
100✔
322
                assert!(metric_collector.write_metric_message_received(node_id.clone()));
100✔
323
            });
100✔
324

325
            let ts = metric_collector
1✔
326
                .get_messages_received_timeseries(node_id)
1✔
327
                .await
1✔
328
                .unwrap();
1✔
329
            assert_eq!(ts.count(), 100);
1✔
330
        }
1✔
331

332
        #[tokio::test]
333
        async fn it_clears_the_metrics() {
1✔
334
            let mut metric_collector = MetricsCollector::spawn();
1✔
335
            let node_id = NodeId::default();
1✔
336
            assert!(metric_collector.write_metric_message_received(node_id.clone()));
1✔
337

338
            metric_collector.clear_metrics(node_id.clone()).await.unwrap();
1✔
339
            let ts = metric_collector
1✔
340
                .get_messages_received_timeseries(node_id)
1✔
341
                .await
1✔
342
                .unwrap();
1✔
343
            assert_eq!(ts.count(), 0);
1✔
344
        }
1✔
345
    }
346
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc