• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 18097567115

29 Sep 2025 12:50PM UTC coverage: 58.554% (-2.3%) from 60.88%
18097567115

push

github

web-flow
chore(ci): switch rust toolchain to stable (#7524)

Description
switch rust toolchain to stable

Motivation and Context
use stable rust toolchain


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Chores**
* Standardized Rust toolchain on stable across CI workflows for more
predictable builds.
* Streamlined setup by removing unnecessary components and aligning
toolchain configuration with environment variables.
  * Enabled an environment flag to improve rustup behavior during CI.
* Improved coverage workflow consistency with dynamic toolchain
selection.

* **Tests**
* Removed nightly-only requirements, simplifying test commands and
improving compatibility.
* Expanded CI triggers to include ci-* branches for better pre-merge
validation.
* Maintained existing job logic while improving reliability and
maintainability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

66336 of 113291 relevant lines covered (58.55%)

551641.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.04
/base_layer/core/src/mempool/sync_protocol/test.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
#![allow(clippy::indexing_slicing)]
24
use std::{fmt, io, sync::Arc};
25

26
use futures::{Sink, SinkExt, Stream, StreamExt};
27
use tari_common::configuration::Network;
28
use tari_comms::{
29
    connectivity::ConnectivityEvent,
30
    framing,
31
    memsocket::MemorySocket,
32
    message::MessageExt,
33
    peer_manager::PeerFeatures,
34
    protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationTx},
35
    test_utils::{
36
        mocks::{create_connectivity_mock, create_peer_connection_mock_pair, ConnectivityManagerMockState},
37
        node_identity::build_node_identity,
38
    },
39
    Bytes,
40
    BytesMut,
41
};
42
use tari_transaction_components::{tari_amount::uT, test_helpers::create_tx, transaction_components::Transaction};
43
use tari_transaction_key_manager::create_memory_db_key_manager;
44
use tari_utilities::ByteArray;
45
use tokio::{
46
    sync::{broadcast, mpsc},
47
    task,
48
};
49

50
use crate::{
51
    consensus::BaseNodeConsensusManager,
52
    mempool::{
53
        proto,
54
        sync_protocol::{MempoolPeerProtocol, MempoolSyncProtocol, MAX_FRAME_SIZE, MEMPOOL_SYNC_PROTOCOL},
55
        Mempool,
56
    },
57
    validation::mocks::MockValidator,
58
};
59

60
pub async fn create_transactions(n: usize) -> Vec<Transaction> {
11✔
61
    let key_manager = create_memory_db_key_manager().await.unwrap();
11✔
62
    let mut transactions = Vec::new();
11✔
63
    for _i in 0..n {
19✔
64
        let (transaction, _, _) = create_tx(5000 * uT, 3 * uT, 1, 2, 1, 3, Default::default(), &key_manager)
19✔
65
            .await
19✔
66
            .expect("Failed to get transaction");
19✔
67
        transactions.push(transaction);
19✔
68
    }
69
    transactions
11✔
70
}
11✔
71

72
async fn new_mempool_with_transactions(n: usize) -> (Mempool, Vec<Transaction>) {
10✔
73
    let mempool = Mempool::new(
10✔
74
        Default::default(),
10✔
75
        BaseNodeConsensusManager::builder(Network::LocalNet).build().unwrap(),
10✔
76
        Box::new(MockValidator::new(true)),
10✔
77
    );
78

79
    let transactions = create_transactions(n).await;
10✔
80
    for txn in &transactions {
27✔
81
        mempool.insert(Arc::new(txn.clone())).await.unwrap();
17✔
82
    }
83

84
    (mempool, transactions)
10✔
85
}
10✔
86

87
async fn setup(
6✔
88
    num_txns: usize,
6✔
89
) -> (
6✔
90
    ProtocolNotificationTx<MemorySocket>,
6✔
91
    ConnectivityManagerMockState,
6✔
92
    Mempool,
6✔
93
    Vec<Transaction>,
6✔
94
) {
6✔
95
    let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1);
6✔
96
    let (mempool, transactions) = new_mempool_with_transactions(num_txns).await;
6✔
97
    let (connectivity, connectivity_manager_mock) = create_connectivity_mock();
6✔
98
    let connectivity_manager_mock_state = connectivity_manager_mock.spawn();
6✔
99
    let (block_event_sender, _) = broadcast::channel(1);
6✔
100
    let block_receiver = block_event_sender.subscribe();
6✔
101
    let protocol = MempoolSyncProtocol::new(
6✔
102
        Default::default(),
6✔
103
        protocol_notif_rx,
6✔
104
        mempool.clone(),
6✔
105
        connectivity,
6✔
106
        block_receiver,
6✔
107
    );
108

109
    task::spawn(protocol.run());
6✔
110
    connectivity_manager_mock_state.wait_until_event_receivers_ready().await;
6✔
111
    (
6✔
112
        protocol_notif_tx,
6✔
113
        connectivity_manager_mock_state,
6✔
114
        mempool,
6✔
115
        transactions,
6✔
116
    )
6✔
117
}
6✔
118

119
#[tokio::test]
120
async fn empty_set() {
1✔
121
    let (_, connectivity_manager_state, mempool1, _) = setup(0).await;
1✔
122

123
    let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
124
    let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
125
    let (_node1_conn, node1_mock, node2_conn, _) =
1✔
126
        create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;
1✔
127

128
    // This node connected to a peer, so it should open the substream
129
    connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn.into()));
1✔
130

131
    let substream = node1_mock.next_incoming_substream().await.unwrap();
1✔
132
    let framed = framing::canonical(substream, MAX_FRAME_SIZE);
1✔
133

134
    let (mempool2, _) = new_mempool_with_transactions(0).await;
1✔
135
    MempoolPeerProtocol::new(Default::default(), framed, node2.node_id().clone(), mempool2.clone())
1✔
136
        .start_responder()
1✔
137
        .await
1✔
138
        .unwrap();
1✔
139

140
    let transactions = mempool2.snapshot().await.unwrap();
1✔
141
    assert_eq!(transactions.len(), 0);
1✔
142

143
    let transactions = mempool1.snapshot().await.unwrap();
1✔
144
    assert_eq!(transactions.len(), 0);
1✔
145
}
1✔
146

147
#[tokio::test]
148
async fn synchronise() {
1✔
149
    let (_, connectivity_manager_state, mempool1, transactions1) = setup(5).await;
1✔
150

151
    let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
152
    let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
153
    let (_node1_conn, node1_mock, node2_conn, _) =
1✔
154
        create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;
1✔
155

156
    // This node connected to a peer, so it should open the substream
157
    connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn.into()));
1✔
158

159
    let substream = node1_mock.next_incoming_substream().await.unwrap();
1✔
160
    let framed = framing::canonical(substream, MAX_FRAME_SIZE);
1✔
161

162
    let (mempool2, transactions2) = new_mempool_with_transactions(3).await;
1✔
163
    MempoolPeerProtocol::new(Default::default(), framed, node2.node_id().clone(), mempool2.clone())
1✔
164
        .start_responder()
1✔
165
        .await
1✔
166
        .unwrap();
1✔
167

168
    let transactions = get_snapshot(&mempool2).await;
1✔
169
    assert_eq!(transactions.len(), 8);
1✔
170
    assert!(transactions1.iter().all(|txn| transactions.contains(txn)));
5✔
171
    assert!(transactions2.iter().all(|txn| transactions.contains(txn)));
3✔
172

173
    let transactions = get_snapshot(&mempool1).await;
1✔
174
    assert_eq!(transactions.len(), 8);
1✔
175
    assert!(transactions1.iter().all(|txn| transactions.contains(txn)));
5✔
176
    assert!(transactions2.iter().all(|txn| transactions.contains(txn)));
3✔
177
}
1✔
178

179
#[tokio::test]
180
async fn duplicate_set() {
1✔
181
    let (_, connectivity_manager_state, mempool1, transactions1) = setup(2).await;
1✔
182
    let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
183
    let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
184
    let (_node1_conn, node1_mock, node2_conn, _) =
1✔
185
        create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;
1✔
186

187
    // This node connected to a peer, so it should open the substream
188
    connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn.into()));
1✔
189

190
    let substream = node1_mock.next_incoming_substream().await.unwrap();
1✔
191
    let framed = framing::canonical(substream, MAX_FRAME_SIZE);
1✔
192

193
    let (mempool2, transactions2) = new_mempool_with_transactions(1).await;
1✔
194
    mempool2.insert(Arc::new(transactions1[0].clone())).await.unwrap();
1✔
195
    MempoolPeerProtocol::new(Default::default(), framed, node2.node_id().clone(), mempool2.clone())
1✔
196
        .start_responder()
1✔
197
        .await
1✔
198
        .unwrap();
1✔
199

200
    let transactions = get_snapshot(&mempool2).await;
1✔
201
    assert_eq!(transactions.len(), 3);
1✔
202
    assert!(transactions1.iter().all(|txn| transactions.contains(txn)));
2✔
203
    assert!(transactions2.iter().all(|txn| transactions.contains(txn)));
1✔
204

205
    let transactions = get_snapshot(&mempool1).await;
1✔
206
    assert_eq!(transactions.len(), 3);
1✔
207
    assert!(transactions1.iter().all(|txn| transactions.contains(txn)));
2✔
208
    assert!(transactions2.iter().all(|txn| transactions.contains(txn)));
1✔
209
}
1✔
210

211
#[tokio::test]
212
async fn responder() {
1✔
213
    let (protocol_notif, _, _, transactions1) = setup(2).await;
1✔
214

215
    let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
216
    let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
217

218
    let (sock_in, sock_out) = MemorySocket::new_pair();
1✔
219
    protocol_notif
1✔
220
        .send(ProtocolNotification::new(
1✔
221
            MEMPOOL_SYNC_PROTOCOL.clone(),
1✔
222
            ProtocolEvent::NewInboundSubstream(node1.node_id().clone(), sock_in),
1✔
223
        ))
1✔
224
        .await
1✔
225
        .unwrap();
1✔
226

227
    let (mempool2, transactions2) = new_mempool_with_transactions(1).await;
1✔
228
    mempool2.insert(Arc::new(transactions1[0].clone())).await.unwrap();
1✔
229
    let framed = framing::canonical(sock_out, MAX_FRAME_SIZE);
1✔
230
    MempoolPeerProtocol::new(Default::default(), framed, node2.node_id().clone(), mempool2.clone())
1✔
231
        .start_initiator()
1✔
232
        .await
1✔
233
        .unwrap();
1✔
234

235
    let transactions = get_snapshot(&mempool2).await;
1✔
236
    assert_eq!(transactions.len(), 3);
×
237
    assert!(transactions1.iter().all(|txn| transactions.contains(txn)));
×
238
    assert!(transactions2.iter().all(|txn| transactions.contains(txn)));
×
239

240
    // We cannot be sure that the mempool1 contains all the transactions at this point because the initiator protocol
241
    // can complete before the responder has inserted the final transaction. There is currently no mechanism to know
242
    // this.
243
}
1✔
244

245
#[tokio::test]
246
async fn initiator_messages() {
1✔
247
    let (protocol_notif, _, _, transactions1) = setup(2).await;
1✔
248

249
    let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
250

251
    let (sock_in, sock_out) = MemorySocket::new_pair();
1✔
252
    protocol_notif
1✔
253
        .send(ProtocolNotification::new(
1✔
254
            MEMPOOL_SYNC_PROTOCOL.clone(),
1✔
255
            ProtocolEvent::NewInboundSubstream(node1.node_id().clone(), sock_in),
1✔
256
        ))
1✔
257
        .await
1✔
258
        .unwrap();
1✔
259

260
    let mut transactions = create_transactions(2).await;
1✔
261
    transactions.push(transactions1[0].clone());
1✔
262
    let mut framed = framing::canonical(sock_out, MAX_FRAME_SIZE);
1✔
263
    // As the initiator, send an inventory
264
    let inventory = proto::TransactionInventory {
1✔
265
        items: transactions
1✔
266
            .iter()
1✔
267
            .map(|tx| tx.first_kernel_excess_sig().unwrap().get_signature().to_vec())
3✔
268
            .collect(),
1✔
269
    };
270
    write_message(&mut framed, inventory).await;
1✔
271
    // Expect 1 transaction, a "stop message" and indexes for missing transactions
272
    let transaction: proto::TransactionItem = read_message(&mut framed).await;
×
273
    assert!(transaction.transaction.is_some());
×
274
    let stop: proto::TransactionItem = read_message(&mut framed).await;
×
275
    assert!(stop.transaction.is_none());
×
276
    let indexes: proto::InventoryIndexes = read_message(&mut framed).await;
×
277
    assert_eq!(indexes.indexes, [0, 1]);
×
278
}
1✔
279

280
#[tokio::test]
281
async fn responder_messages() {
1✔
282
    let (_, connectivity_manager_state, _, transactions1) = setup(1).await;
1✔
283

284
    let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
285
    let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
1✔
286
    let (_node1_conn, node1_mock, node2_conn, _) =
1✔
287
        create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await;
1✔
288

289
    // This node connected to a peer, so it should open the substream
290
    connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn.into()));
1✔
291

292
    let substream = node1_mock.next_incoming_substream().await.unwrap();
1✔
293
    let mut framed = framing::canonical(substream, MAX_FRAME_SIZE);
1✔
294

295
    // Expect an inventory
296
    let inventory: proto::TransactionInventory = read_message(&mut framed).await;
1✔
297
    assert_eq!(inventory.items.len(), 1);
1✔
298
    // Send no transactions back
299
    let nothing = proto::TransactionItem::empty();
1✔
300
    write_message(&mut framed, nothing).await;
1✔
301
    // Send transaction indexes back
302
    let indexes = proto::InventoryIndexes { indexes: vec![0] };
1✔
303
    write_message(&mut framed, indexes).await;
1✔
304
    // Expect a single transaction back and a stop message
305
    let transaction: proto::TransactionItem = read_message(&mut framed).await;
1✔
306
    assert_eq!(
1✔
307
        transaction
1✔
308
            .transaction
1✔
309
            .unwrap()
1✔
310
            .body
1✔
311
            .unwrap()
1✔
312
            .kernels
1✔
313
            .remove(0)
1✔
314
            .excess_sig
1✔
315
            .unwrap()
1✔
316
            .signature,
317
        transactions1[0]
1✔
318
            .first_kernel_excess_sig()
1✔
319
            .unwrap()
1✔
320
            .get_signature()
1✔
321
            .to_vec()
1✔
322
    );
323
    let stop: proto::TransactionItem = read_message(&mut framed).await;
1✔
324
    assert!(stop.transaction.is_none());
1✔
325
    // Except stream to end
326
    assert!(framed.next().await.is_none());
1✔
327
}
1✔
328

329
async fn get_snapshot(mempool: &Mempool) -> Vec<Transaction> {
4✔
330
    mempool
4✔
331
        .snapshot()
4✔
332
        .await
4✔
333
        .unwrap()
4✔
334
        .iter()
4✔
335
        .map(|t| &**t)
22✔
336
        .cloned()
4✔
337
        .collect()
4✔
338
}
4✔
339

340
async fn read_message<S, T>(reader: &mut S) -> T
3✔
341
where
3✔
342
    S: Stream<Item = io::Result<BytesMut>> + Unpin,
3✔
343
    T: prost::Message + Default,
3✔
344
{
3✔
345
    let msg = reader.next().await.unwrap().unwrap();
3✔
346
    T::decode(&mut msg.freeze()).unwrap()
3✔
347
}
3✔
348

349
async fn write_message<S, T>(writer: &mut S, message: T)
3✔
350
where
3✔
351
    S: Sink<Bytes> + Unpin,
3✔
352
    S::Error: fmt::Debug,
3✔
353
    T: prost::Message,
3✔
354
{
3✔
355
    writer.send(message.to_encoded_bytes().into()).await.unwrap();
3✔
356
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc