
tari-project / tari · build 18097567115 (push, via GitHub web-flow)
29 Sep 2025 12:50PM UTC · coverage: 58.554% (-2.3%) from 60.88%

chore(ci): switch rust toolchain to stable (#7524)

Description
Switch the Rust toolchain used in CI to stable.

Motivation and Context
Use the stable Rust toolchain so CI builds no longer depend on nightly-only features.


## Summary by CodeRabbit

* **Chores**
  * Standardized the Rust toolchain on stable across CI workflows for more predictable builds.
  * Streamlined setup by removing unnecessary components and aligning toolchain configuration with environment variables.
  * Enabled an environment flag to improve rustup behavior during CI.
  * Improved coverage workflow consistency with dynamic toolchain selection.

* **Tests**
  * Removed nightly-only requirements, simplifying test commands and improving compatibility.
  * Expanded CI triggers to include ci-* branches for better pre-merge validation.
  * Maintained existing job logic while improving reliability and maintainability.


66336 of 113291 relevant lines covered (58.55%)

551641.45 hits per line

Source File

/comms/core/src/builder/comms_node.rs · 76.73% covered

```rust
// Copyright 2020, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{iter, sync::Arc};

use log::*;
use tari_shutdown::ShutdownSignal;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::{broadcast, mpsc, watch},
};

use super::{CommsBuilderError, CommsShutdown};
use crate::{
    connection_manager::{
        ConnectionManager,
        ConnectionManagerEvent,
        ConnectionManagerRequest,
        ConnectionManagerRequester,
        SelfLivenessCheck,
        SelfLivenessStatus,
    },
    connectivity::{ConnectivityEventRx, ConnectivityManager, ConnectivityRequest, ConnectivityRequester},
    multiaddr::Multiaddr,
    peer_manager::{NodeIdentity, PeerManager},
    protocol::{
        ProtocolExtension,
        ProtocolExtensionContext,
        ProtocolExtensions,
        ProtocolId,
        ProtocolNotificationTx,
        Protocols,
    },
    tor,
    transports::Transport,
    CommsBuilder,
    Substream,
};

const LOG_TARGET: &str = "comms::node";

/// Contains the built comms services
pub struct UnspawnedCommsNode {
    pub(super) node_identity: Arc<NodeIdentity>,
    pub(super) builder: CommsBuilder,
    pub(super) connection_manager_request_rx: mpsc::Receiver<ConnectionManagerRequest>,
    pub(super) connection_manager_requester: ConnectionManagerRequester,
    pub(super) connectivity_requester: ConnectivityRequester,
    pub(super) connectivity_rx: mpsc::Receiver<ConnectivityRequest>,
    pub(super) peer_manager: Arc<PeerManager>,
    pub(super) protocol_extensions: ProtocolExtensions,
    pub(super) protocols: Protocols<Substream>,
    pub(super) shutdown_signal: ShutdownSignal,
}

impl UnspawnedCommsNode {
    /// Add an RPC server/router to this instance of Tari comms.
    ///
    /// ```compile_fail
    /// # use tari_comms::CommsBuilder;
    /// # use tari_comms::protocol::rpc::RpcServer;
    /// let server = RpcServer::new().add_service(MyService).add_service(AnotherService);
    /// CommsBuilder::new().add_rpc_server(server).build();
    /// ```
    #[cfg(feature = "rpc")]
    pub fn add_rpc_server<T: ProtocolExtension + 'static>(mut self, rpc: T) -> Self {
        // The RPC router is treated the same as any other `ProtocolExtension`; however, this method may make it
        // clearer to users that this is the correct way to add the RPC server
        self.protocol_extensions.add(rpc);
        self
    }

    /// Adds [ProtocolExtensions](crate::protocol::ProtocolExtensions) to this node.
    pub fn add_protocol_extensions(mut self, extensions: ProtocolExtensions) -> Self {
        self.protocol_extensions.extend(extensions);
        self
    }

    /// Adds an implementation of [ProtocolExtension](crate::protocol::ProtocolExtension) to this node.
    /// This is used to add custom protocols to Tari comms.
    pub fn add_protocol_extension<T: ProtocolExtension + 'static>(mut self, extension: T) -> Self {
        self.protocol_extensions.add(extension);
        self
    }

    /// Registers custom `ProtocolId`s and an mpsc notifier. A [ProtocolNotification](crate::protocol::ProtocolNotification)
    /// will be sent on that channel whenever a remote peer requests to speak the given protocols.
    pub fn add_protocol<I: AsRef<[ProtocolId]>>(
        mut self,
        protocol: I,
        notifier: &ProtocolNotificationTx<Substream>,
    ) -> Self {
        self.protocols.add(protocol, notifier);
        self
    }

    /// Set the listener address. This is an alias to `CommsBuilder::with_listener_address`.
    pub fn with_listener_address(mut self, listener_address: Multiaddr) -> Self {
        self.builder = self.builder.with_listener_address(listener_address);
        self
    }

    /// Set the tor hidden service controller to associate with this comms instance
    pub fn with_hidden_service_controller(mut self, hidden_service_ctl: tor::HiddenServiceController) -> Self {
        self.builder.hidden_service_ctl = Some(hidden_service_ctl);
        self
    }

    /// Spawn a new node using the specified [Transport](crate::transports::Transport).
    #[allow(clippy::too_many_lines)]
    pub async fn spawn_with_transport<TTransport>(self, transport: TTransport) -> Result<CommsNode, CommsBuilderError>
    where
        TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
        TTransport::Output: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
    {
        let UnspawnedCommsNode {
            builder,
            connection_manager_request_rx,
            connection_manager_requester,
            connectivity_requester,
            connectivity_rx,
            node_identity,
            shutdown_signal,
            peer_manager,
            protocol_extensions,
            protocols,
        } = self;

        let CommsBuilder {
            dial_backoff,
            connection_manager_config,
            connectivity_config,
            ..
        } = builder;

        //---------------------------------- Connectivity Manager --------------------------------------------//
        let connectivity_manager = ConnectivityManager {
            config: connectivity_config,
            request_rx: connectivity_rx,
            event_tx: connectivity_requester.get_event_publisher(),
            connection_manager: connection_manager_requester.clone(),
            node_identity: node_identity.clone(),
            peer_manager: peer_manager.clone(),
            shutdown_signal: shutdown_signal.clone(),
        };

        let mut ext_context = ProtocolExtensionContext::new(
            connectivity_requester.clone(),
            peer_manager.clone(),
            shutdown_signal.clone(),
        );

        debug!(
            target: LOG_TARGET,
            "Installing {} protocol extension(s)",
            protocol_extensions.len()
        );
        protocol_extensions.install_all(&mut ext_context)?;

        //---------------------------------- Connection Manager --------------------------------------------//

        let mut connection_manager = ConnectionManager::new(
            connection_manager_config.clone(),
            transport.clone(),
            dial_backoff,
            connection_manager_request_rx,
            node_identity.clone(),
            peer_manager.clone(),
            connection_manager_requester.get_event_publisher(),
            shutdown_signal.clone(),
        );

        ext_context.register_complete_signal(connection_manager.complete_signal());
        connection_manager.add_protocols(ext_context.take_protocols().expect("Protocols already taken"));
        connection_manager.add_protocols(protocols);

        //---------------------------------- Spawn Actors --------------------------------------------//
        connectivity_manager.spawn();
        connection_manager.spawn();

        trace!(target: LOG_TARGET, "Hello from comms!");
        info!(
            target: LOG_TARGET,
            "Your node's public key is '{}'",
            node_identity.public_key()
        );
        info!(
            target: LOG_TARGET,
            "Your node's network ID is '{}'",
            node_identity.node_id()
        );
        info!(
            target: LOG_TARGET,
            "Your node's public addresses are '{}'",
            node_identity
                .public_addresses()
                .iter()
                .map(|a| a.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        );

        // Spawn liveness check now that we have the final address
        let public_addresses = node_identity.public_addresses();
        let liveness_watch = if public_addresses.is_empty() {
            watch::channel(SelfLivenessStatus::Disabled).1
        } else {
            connection_manager_config
                .self_liveness_self_check_interval
                .map(|interval| {
                    SelfLivenessCheck::spawn(transport, public_addresses, interval, shutdown_signal.clone())
                })
                .unwrap_or_else(|| watch::channel(SelfLivenessStatus::Disabled).1)
        };

        Ok(CommsNode {
            shutdown_signal,
            connection_manager_requester,
            connectivity_requester,
            node_identity,
            peer_manager,
            liveness_watch,
            complete_signals: ext_context.drain_complete_signals(),
        })
    }

    /// Return a cloned atomic reference of the PeerManager
    pub fn peer_manager(&self) -> Arc<PeerManager> {
        Arc::clone(&self.peer_manager)
    }

    /// Return a cloned atomic reference of the NodeIdentity
    pub fn node_identity(&self) -> Arc<NodeIdentity> {
        Arc::clone(&self.node_identity)
    }

    /// Return an owned copy of a ConnectivityRequester. This is the async interface to the ConnectivityManager
    pub fn connectivity(&self) -> ConnectivityRequester {
        self.connectivity_requester.clone()
    }

    /// Returns an owned copy of the `ShutdownSignal`
    pub fn shutdown_signal(&self) -> ShutdownSignal {
        self.shutdown_signal.clone()
    }
}
```
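To make the `UnspawnedCommsNode` API above concrete, here is a minimal usage sketch. It is not part of this file: the protocol id is hypothetical, and it assumes that `UnspawnedCommsNode`, `Substream` and `MemoryTransport` are exported from `tari_comms` as shown, that `ProtocolId` and `ProtocolNotificationTx` are the `Bytes` and `mpsc::Sender` aliases suggested by the signatures above, and that the node was already configured via `CommsBuilder` with a node identity and shutdown signal.

```rust
use tari_comms::{
    protocol::{ProtocolId, ProtocolNotification},
    transports::MemoryTransport,
    CommsBuilderError,
    Substream,
    UnspawnedCommsNode,
};
use tokio::sync::mpsc;

async fn spawn_node(unspawned: UnspawnedCommsNode) -> Result<(), CommsBuilderError> {
    // A `ProtocolNotification` is sent on this channel whenever a remote peer
    // requests to speak our protocol
    let (notif_tx, mut notif_rx) = mpsc::channel::<ProtocolNotification<Substream>>(10);

    let node = unspawned
        // Hypothetical protocol id, for illustration only
        .add_protocol([ProtocolId::from_static(b"/tari/example/1.0")], &notif_tx)
        .spawn_with_transport(MemoryTransport)
        .await?;

    println!("spawned node '{}'", node.node_identity().node_id());

    // Handle inbound protocol notifications until the channel closes
    tokio::spawn(async move {
        while let Some(notification) = notif_rx.recv().await {
            let _ = notification; // handle the negotiated substream here
        }
    });

    // Resolves once the shutdown signal passed to `CommsBuilder` is triggered
    // and every comms service has completed
    node.wait_until_shutdown().await;
    Ok(())
}
```

The listing continues with the spawned `CommsNode` handle: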
```rust
/// CommsNode is a handle to a comms node.
///
/// It allows communication with the internals of tari_comms.
#[derive(Clone)]
pub struct CommsNode {
    /// The `ShutdownSignal` for this node. Use `wait_until_shutdown` to asynchronously block until the
    /// shutdown signal is triggered.
    shutdown_signal: ShutdownSignal,
    /// Requester object for the ConnectionManager
    connection_manager_requester: ConnectionManagerRequester,
    /// Requester for the ConnectivityManager
    connectivity_requester: ConnectivityRequester,
    /// Node identity for this node
    node_identity: Arc<NodeIdentity>,
    /// Shared PeerManager instance
    peer_manager: Arc<PeerManager>,
    /// Current liveness status
    liveness_watch: watch::Receiver<SelfLivenessStatus>,
    /// The 'reciprocal' shutdown signals for each comms service
    complete_signals: Vec<ShutdownSignal>,
}

impl CommsNode {
    /// Get a subscription to `ConnectionManagerEvent`s
    pub fn subscribe_connection_manager_events(&self) -> broadcast::Receiver<Arc<ConnectionManagerEvent>> {
        self.connection_manager_requester.get_event_subscription()
    }

    /// Return a mutable reference to the `ConnectionManagerRequester`
    pub fn connection_manager_requester(&mut self) -> &mut ConnectionManagerRequester {
        &mut self.connection_manager_requester
    }

    /// Get a subscription to `ConnectivityEvent`s
    pub fn subscribe_connectivity_events(&self) -> ConnectivityEventRx {
        self.connectivity_requester.get_event_subscription()
    }

    /// Return a cloned atomic reference of the PeerManager
    pub fn peer_manager(&self) -> Arc<PeerManager> {
        Arc::clone(&self.peer_manager)
    }

    /// Return a cloned atomic reference of the NodeIdentity
    pub fn node_identity(&self) -> Arc<NodeIdentity> {
        Arc::clone(&self.node_identity)
    }

    /// Return a reference to the NodeIdentity
    pub fn node_identity_ref(&self) -> &NodeIdentity {
        &self.node_identity
    }

    /// Returns the current liveness status
    pub fn liveness_status(&self) -> SelfLivenessStatus {
        *self.liveness_watch.borrow()
    }

    /// Return a handle that is used to call the connectivity service.
    pub fn connectivity(&self) -> ConnectivityRequester {
        self.connectivity_requester.clone()
    }

    /// Returns a new `ShutdownSignal`
    pub fn shutdown_signal(&self) -> ShutdownSignal {
        self.shutdown_signal.clone()
    }

    /// Waits until the shutdown signal is triggered and all comms services have shut down.
    /// The object is consumed to ensure that no handles/channels are kept after shutdown.
    pub fn wait_until_shutdown(self) -> CommsShutdown {
        CommsShutdown::new(iter::once(self.shutdown_signal).chain(self.complete_signals))
    }
}
```
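A companion sketch for the `CommsNode` handle, again illustrative rather than from the file: it subscribes to connection manager events and periodically samples the liveness watch value via `liveness_status()`, assuming `ConnectionManagerEvent` derives `Debug`.

```rust
use std::time::Duration;

use tari_comms::{connection_manager::SelfLivenessStatus, CommsNode};

async fn monitor(node: CommsNode) {
    let mut events = node.subscribe_connection_manager_events();
    loop {
        tokio::select! {
            event = events.recv() => match event {
                // Events arrive as `Arc<ConnectionManagerEvent>` from the broadcast channel
                Ok(event) => println!("connection event: {:?}", event),
                // Stop on a closed (or badly lagged) channel; a real consumer would
                // distinguish `RecvError::Lagged` and keep going
                Err(_) => break,
            },
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                // `Disabled` is the status set when no public address or
                // self-check interval is configured (see `spawn_with_transport` above)
                let disabled = matches!(node.liveness_status(), SelfLivenessStatus::Disabled);
                println!("self-liveness check disabled: {}", disabled);
            },
        }
    }
}
```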