• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 18097567115

29 Sep 2025 12:50PM UTC coverage: 58.554% (-2.3%) from 60.88%
18097567115

push

github

web-flow
chore(ci): switch rust toolchain to stable (#7524)

Description
switch rust toolchain to stable

Motivation and Context
use stable rust toolchain


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Chores**
* Standardized Rust toolchain on stable across CI workflows for more
predictable builds.
* Streamlined setup by removing unnecessary components and aligning
toolchain configuration with environment variables.
  * Enabled an environment flag to improve rustup behavior during CI.
* Improved coverage workflow consistency with dynamic toolchain
selection.

* **Tests**
* Removed nightly-only requirements, simplifying test commands and
improving compatibility.
* Expanded CI triggers to include ci-* branches for better pre-merge
validation.
* Maintained existing job logic while improving reliability and
maintainability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

66336 of 113291 relevant lines covered (58.55%)

551641.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.4
/comms/dht/src/inbound/decryption.rs
1
// Copyright 2019, The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{convert::TryInto, sync::Arc, task::Poll, time::Duration};
24

25
use futures::{future::BoxFuture, task::Context};
26
use log::*;
27
use prost::Message;
28
use tari_comms::{
29
    connectivity::ConnectivityRequester,
30
    message::EnvelopeBody,
31
    peer_manager::NodeIdentity,
32
    pipeline::PipelineError,
33
    types::CommsDHKE,
34
    BytesMut,
35
};
36
use tari_crypto::compressed_key::CompressedKey;
37
use tari_utilities::ByteArray;
38
use thiserror::Error;
39
use tower::{layer::Layer, Service, ServiceExt};
40

41
use crate::{
42
    crypt,
43
    inbound::message::{DecryptedDhtMessage, DhtInboundMessage, ValidatedDhtInboundMessage},
44
    message_signature::{MessageSignature, ProtoMessageSignature},
45
    DhtConfig,
46
};
47

48
const LOG_TARGET: &str = "comms::middleware::decryption";
49

50
#[derive(Error, Debug, PartialEq)]
51
enum DecryptionError {
52
    #[error("Failed to validate message signature")]
53
    InvalidSignature,
54
    #[error("Bad encrypted message semantics")]
55
    BadEncryptedMessageSemantics,
56
    #[error("Message rejected because this node could not decrypt a message that was addressed to it")]
57
    MessageRejectDecryptionFailed,
58
    #[error("Failed to decode envelope body")]
59
    EnvelopeBodyDecodeFailed,
60
    #[error("Bad clear-text message semantics")]
61
    BadClearTextMessageSemantics,
62
}
63

64
/// This layer is responsible for attempting to decrypt inbound messages.
65
pub struct DecryptionLayer {
66
    node_identity: Arc<NodeIdentity>,
67
    connectivity: ConnectivityRequester,
68
    config: Arc<DhtConfig>,
69
}
70

71
impl DecryptionLayer {
72
    pub fn new(config: Arc<DhtConfig>, node_identity: Arc<NodeIdentity>, connectivity: ConnectivityRequester) -> Self {
35✔
73
        Self {
35✔
74
            node_identity,
35✔
75
            connectivity,
35✔
76
            config,
35✔
77
        }
35✔
78
    }
35✔
79
}
80

81
impl<S> Layer<S> for DecryptionLayer {
82
    type Service = DecryptionService<S>;
83

84
    fn layer(&self, service: S) -> Self::Service {
35✔
85
        DecryptionService::new(
35✔
86
            self.config.clone(),
35✔
87
            self.node_identity.clone(),
35✔
88
            self.connectivity.clone(),
35✔
89
            service,
35✔
90
        )
91
    }
35✔
92
}
93

94
/// Responsible for decrypting InboundMessages and passing a DecryptedInboundMessage to the given service
95
#[derive(Clone)]
96
pub struct DecryptionService<S> {
97
    config: Arc<DhtConfig>,
98
    node_identity: Arc<NodeIdentity>,
99
    connectivity: ConnectivityRequester,
100
    inner: S,
101
}
102

103
impl<S> DecryptionService<S> {
104
    pub fn new(
58✔
105
        config: Arc<DhtConfig>,
58✔
106
        node_identity: Arc<NodeIdentity>,
58✔
107
        connectivity: ConnectivityRequester,
58✔
108
        service: S,
58✔
109
    ) -> Self {
58✔
110
        Self {
58✔
111
            node_identity,
58✔
112
            connectivity,
58✔
113
            config,
58✔
114
            inner: service,
58✔
115
        }
58✔
116
    }
58✔
117
}
118

119
impl<S> Service<DhtInboundMessage> for DecryptionService<S>
120
where
121
    S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError> + Clone + Send + 'static,
122
    S::Future: Send,
123
{
124
    type Error = PipelineError;
125
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
126
    type Response = ();
127

128
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
159✔
129
        Poll::Ready(Ok(()))
159✔
130
    }
159✔
131

132
    fn call(&mut self, msg: DhtInboundMessage) -> Self::Future {
101✔
133
        Box::pin(Self::handle_message(
101✔
134
            self.inner.clone(),
101✔
135
            Arc::clone(&self.node_identity),
101✔
136
            self.connectivity.clone(),
101✔
137
            self.config.ban_duration,
101✔
138
            msg,
101✔
139
        ))
101✔
140
    }
101✔
141
}
142

143
impl<S> DecryptionService<S>
144
where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
145
{
146
    async fn handle_message(
101✔
147
        next_service: S,
101✔
148
        node_identity: Arc<NodeIdentity>,
101✔
149
        mut connectivity: ConnectivityRequester,
101✔
150
        ban_duration: Duration,
101✔
151
        message: DhtInboundMessage,
101✔
152
    ) -> Result<(), PipelineError> {
101✔
153
        use DecryptionError::*;
154
        let source = message.source_peer.clone();
101✔
155
        let trace_id = message.dht_header.message_tag;
101✔
156
        let tag = message.tag;
101✔
157
        match Self::validate_and_decrypt_message(node_identity, message).await {
101✔
158
            Ok(msg) => {
78✔
159
                trace!(target: LOG_TARGET, "Passing onto next service (Trace: {})", msg.tag);
78✔
160
                next_service.oneshot(msg).await
78✔
161
            },
162
            // These are verifiable error cases that can be checked by every node
163
            Err(err @ BadEncryptedMessageSemantics) | Err(err @ InvalidSignature) => {
15✔
164
                warn!(
22✔
165
                    target: LOG_TARGET,
×
166
                    "SECURITY: {} ({}, peer={}, trace={}). Message discarded", err, tag, source.node_id, trace_id
×
167
                );
168
                // This message should not have been propagated, or has been manipulated in some way. Ban the source of
169
                // this message.
170
                connectivity
22✔
171
                    .ban_peer_until(source.node_id.clone(), ban_duration, err.to_string())
22✔
172
                    .await?;
22✔
173
                Err(err.into())
22✔
174
            },
175
            Err(EnvelopeBodyDecodeFailed) => {
176
                debug!(
×
177
                    target: LOG_TARGET,
×
178
                    "Failed to decode message body ({}, peer={}, trace={}). Message discarded",
×
179
                    tag,
180
                    source.node_id,
×
181
                    trace_id
182
                );
183
                Ok(())
×
184
            },
185
            Err(err) => Err(err.into()),
1✔
186
        }
187
    }
101✔
188

189
    #[allow(clippy::too_many_lines)]
190
    async fn validate_and_decrypt_message(
101✔
191
        node_identity: Arc<NodeIdentity>,
101✔
192
        message: DhtInboundMessage,
101✔
193
    ) -> Result<DecryptedDhtMessage, DecryptionError> {
101✔
194
        // Perform initial checks and check the message signature if needed
195
        let validated_msg = Self::initial_validation(message)?;
101✔
196

197
        // The message is unencrypted and valid
198
        if !validated_msg.header().flags.is_encrypted() {
79✔
199
            return Self::success_not_encrypted(validated_msg).await;
64✔
200
        }
15✔
201

202
        trace!(
15✔
203
            target: LOG_TARGET,
×
204
            "Decrypting message {} (Trace: {})",
×
205
            validated_msg.message().tag,
×
206
            validated_msg.message().dht_header.message_tag
×
207
        );
208

209
        // The message is encrypted, so see if it is for us
210
        // If not, pass it along
211
        if validated_msg
15✔
212
            .message()
15✔
213
            .dht_header
15✔
214
            .destination
15✔
215
            .public_key()
15✔
216
            .map(|pk| pk != node_identity.public_key())
15✔
217
            .unwrap_or(false)
15✔
218
        {
219
            debug!(
9✔
220
                target: LOG_TARGET,
×
221
                "Encrypted message (source={}, {}) not destined for this peer. Passing to next service (Trace: {})",
×
222
                validated_msg.message().source_peer.node_id,
×
223
                validated_msg.message().dht_header.message_tag,
×
224
                validated_msg.message().tag
×
225
            );
226
            return Ok(DecryptedDhtMessage::failed(validated_msg.into_message()));
9✔
227
        }
6✔
228

229
        // The message is encrypted and for us, so complete the ephemeral key exchange
230
        let header = validated_msg.header();
6✔
231
        let ephemeral_public_key = header
6✔
232
            .ephemeral_public_key
6✔
233
            .as_ref()
6✔
234
            .ok_or(DecryptionError::BadEncryptedMessageSemantics)?
6✔
235
            .to_public_key()
6✔
236
            .map_err(|_| DecryptionError::InvalidSignature)?;
6✔
237
        let shared_ephemeral_secret = CommsDHKE::new(node_identity.secret_key(), &ephemeral_public_key);
6✔
238
        let message = validated_msg.message();
6✔
239

240
        // Unmask the sender public key using an offset mask derived from the ECDH exchange
241
        let mask = crypt::generate_key_mask(&shared_ephemeral_secret)
6✔
242
            .map_err(|_| DecryptionError::MessageRejectDecryptionFailed)?;
6✔
243
        let mask_inverse = mask.invert().ok_or(DecryptionError::MessageRejectDecryptionFailed)?;
6✔
244
        let sender_masked_public_key = validated_msg
6✔
245
            .authenticated_origin()
6✔
246
            .ok_or(DecryptionError::MessageRejectDecryptionFailed)?
6✔
247
            .to_public_key()
6✔
248
            .map_err(|_| DecryptionError::InvalidSignature)?;
6✔
249
        let sender_public_key = mask_inverse * &sender_masked_public_key;
6✔
250

251
        trace!(
6✔
252
            target: LOG_TARGET,
×
253
            "Attempting to decrypt message body from origin public key '{}', {} (Trace: {})",
×
254
            sender_public_key,
255
            message.tag,
256
            message.dht_header.message_tag
257
        );
258

259
        // Decrypt and verify the message
260
        match Self::attempt_decrypt_message_body(
6✔
261
            &shared_ephemeral_secret,
6✔
262
            &message.body,
6✔
263
            sender_masked_public_key.as_bytes(),
6✔
264
        ) {
6✔
265
            Ok(message_body) => {
5✔
266
                debug!(
5✔
267
                    target: LOG_TARGET,
×
268
                    "Message successfully decrypted, {} (Trace: {})", message.tag, message.dht_header.message_tag
×
269
                );
270
                Ok(DecryptedDhtMessage::succeeded(
5✔
271
                    message_body,
5✔
272
                    Some(CompressedKey::new_from_pk(sender_public_key)),
5✔
273
                    validated_msg.into_message(),
5✔
274
                ))
5✔
275
            },
276
            Err(err) => {
1✔
277
                debug!(
1✔
278
                    target: LOG_TARGET,
×
279
                    "Unable to decrypt message: {}, {} (Trace: {})", err, message.tag, message.dht_header.message_tag
×
280
                );
281

282
                if message.dht_header.destination.equals_node_identity(&node_identity) {
1✔
283
                    warn!(
1✔
284
                        target: LOG_TARGET,
×
285
                        "Received message from peer '{}' that is destined for this node that could not be decrypted. \
×
286
                         Discarding message {} (Trace: {})",
×
287
                        message.source_peer.node_id,
×
288
                        message.tag,
289
                        message.dht_header.message_tag
290
                    );
291
                    return Err(DecryptionError::MessageRejectDecryptionFailed);
1✔
292
                }
×
293

294
                Ok(DecryptedDhtMessage::failed(validated_msg.into_message()))
×
295
            },
296
        }
297
    }
101✔
298

299
    /// Performs message validation that should be performed by all nodes. If an error is encountered, the message is
300
    /// invalid and should never have been propagated.
301
    ///
302
    /// These failure modes are detectable by any node, so it is generally safe to ban an offending peer.
303
    fn initial_validation(message: DhtInboundMessage) -> Result<ValidatedDhtInboundMessage, DecryptionError> {
101✔
304
        if !message.is_semantically_valid() {
101✔
305
            if message.dht_header.flags.is_encrypted() {
7✔
306
                return Err(DecryptionError::BadEncryptedMessageSemantics);
7✔
307
            } else {
308
                return Err(DecryptionError::BadClearTextMessageSemantics);
×
309
            }
310
        }
94✔
311

312
        // If a signature is not present, the message is valid at this point
313
        if message.dht_header.message_signature.is_empty() {
94✔
314
            return Ok(ValidatedDhtInboundMessage::new(message, None));
55✔
315
        }
39✔
316

317
        // If a signature is present, it must be valid
318
        let message_signature: MessageSignature =
27✔
319
            ProtoMessageSignature::decode(message.dht_header.message_signature.as_slice())
39✔
320
                .map_err(|_| DecryptionError::InvalidSignature)?
39✔
321
                .try_into()
35✔
322
                .map_err(|_| DecryptionError::InvalidSignature)?;
35✔
323

324
        let binding_hash = crypt::create_message_domain_separated_hash(&message.dht_header, &message.body);
27✔
325

326
        match message_signature.verify(&binding_hash) {
27✔
327
            Ok(true) => {},
24✔
328
            _ => return Err(DecryptionError::InvalidSignature),
3✔
329
        }
330

331
        // The message is valid at this point
332
        Ok(ValidatedDhtInboundMessage::new(
24✔
333
            message,
24✔
334
            Some(message_signature.into_signer_public_key()),
24✔
335
        ))
24✔
336
    }
101✔
337

338
    fn attempt_decrypt_message_body(
6✔
339
        shared_secret: &CommsDHKE,
6✔
340
        message_body: &[u8],
6✔
341
        authenticated_data: &[u8],
6✔
342
    ) -> Result<EnvelopeBody, DecryptionError> {
6✔
343
        let key_message = crypt::generate_key_message(shared_secret);
6✔
344
        let mut decrypted = BytesMut::from(message_body);
6✔
345
        crypt::decrypt_message(&key_message, &mut decrypted, authenticated_data)
6✔
346
            .map_err(|_| DecryptionError::MessageRejectDecryptionFailed)?;
6✔
347
        // Deserialization into an EnvelopeBody is done here to determine if the
348
        // decryption produced valid bytes or not.
349
        EnvelopeBody::decode(decrypted.freeze())
5✔
350
            .and_then(|body| {
5✔
351
                // Check if we received a body length of zero
352
                //
353
                // In addition to a peer sending a zero-length EnvelopeBody, decoding can erroneously succeed
354
                // if the decrypted bytes happen to be valid protobuf encoding. This is very possible and
355
                // the decrypt_inbound_fail test below _will_ sporadically fail without the following check.
356
                // This is because proto3 will set fields to their default value if they don't exist in a valid
357
                // encoding.
358
                //
359
                // For the parts of EnvelopeBody to be erroneously populated with bytes, all of these
360
                // conditions would have to be true:
361
                // 1. field type == 2 (length-delimited)
362
                // 2. field number == 1
363
                // 3. the subsequent byte(s) would have to be varint-encoded length which does not overflow
364
                // 4. the rest of the bytes would have to be valid protobuf encoding
365
                //
366
                // The chance of this happening is extremely negligible.
367
                if body.is_empty() {
5✔
368
                    return Err(prost::DecodeError::new("EnvelopeBody has no parts"));
×
369
                }
5✔
370
                Ok(body)
5✔
371
            })
5✔
372
            .map_err(|_| DecryptionError::EnvelopeBodyDecodeFailed)
5✔
373
    }
6✔
374

375
    async fn success_not_encrypted(
64✔
376
        validated: ValidatedDhtInboundMessage,
64✔
377
    ) -> Result<DecryptedDhtMessage, DecryptionError> {
64✔
378
        let authenticated_pk = validated.authenticated_origin().cloned();
64✔
379
        let msg = validated.message();
64✔
380
        match EnvelopeBody::decode(msg.body.as_slice()) {
64✔
381
            Ok(deserialized) => {
64✔
382
                trace!(
64✔
383
                    target: LOG_TARGET,
×
384
                    "Message {} is not encrypted. Passing onto next service (Trace: {})",
×
385
                    msg.tag,
386
                    msg.dht_header.message_tag
387
                );
388
                Ok(DecryptedDhtMessage::succeeded(
64✔
389
                    deserialized,
64✔
390
                    authenticated_pk,
64✔
391
                    validated.into_message(),
64✔
392
                ))
64✔
393
            },
394
            Err(err) => {
×
395
                // Message was not encrypted but failed to deserialize - immediately discard
396
                debug!(
×
397
                    target: LOG_TARGET,
×
398
                    "Unable to deserialize message {}: {}. Message will be discarded. (Trace: {})",
×
399
                    msg.tag,
400
                    err,
401
                    msg.dht_header.message_tag
402
                );
403
                Err(DecryptionError::EnvelopeBodyDecodeFailed)
×
404
            },
405
        }
406
    }
64✔
407
}
408

409
#[cfg(test)]
410
mod test {
411

412
    #![allow(clippy::indexing_slicing)]
413
    use std::sync::Mutex;
414

415
    use futures::{executor::block_on, future};
416
    use tari_comms::{message::MessageExt, test_utils::mocks::create_connectivity_mock, wrap_in_envelope_body};
417
    use tari_test_utils::counter_context;
418
    use tokio::time::sleep;
419
    use tower::service_fn;
420

421
    use super::*;
422
    use crate::{
423
        envelope::DhtMessageFlags,
424
        test_utils::{make_dht_inbound_message, make_dht_inbound_message_raw, make_node_identity},
425
    };
426

427
    /// Receive a message, assert a specific error is raised, and test for peer ban status
428
    async fn expect_error(
20✔
429
        node_identity: Arc<NodeIdentity>,
20✔
430
        message: DhtInboundMessage,
20✔
431
        error: DecryptionError,
20✔
432
        ban: bool,
20✔
433
    ) {
20✔
434
        // Set up messaging
435
        let (connectivity, mock) = create_connectivity_mock();
20✔
436
        let mock_state = mock.spawn();
20✔
437
        let result = Arc::new(Mutex::new(None));
20✔
438
        let service = service_fn({
20✔
439
            let result = result.clone();
20✔
440
            move |msg: DecryptedDhtMessage| {
×
441
                *result.lock().unwrap() = Some(msg);
×
442
                future::ready(Result::<(), PipelineError>::Ok(()))
×
443
            }
×
444
        });
445
        let mut service = DecryptionService::new(Default::default(), node_identity, connectivity, service);
20✔
446

447
        // Receive the message and check for the expected error
448
        let err = service.call(message).await.unwrap_err();
20✔
449
        let err = err.downcast::<DecryptionError>().unwrap();
20✔
450
        assert_eq!(error, err);
20✔
451
        assert!(result.lock().unwrap().is_none());
20✔
452

453
        // Assert the expected ban status
454
        if ban {
20✔
455
            mock_state.await_call_count(1).await;
19✔
456
            assert_eq!(mock_state.count_calls_containing("BanPeer").await, 1);
19✔
457
        } else {
458
            // Waiting like this isn't a guarantee that the peer won't be banned
459
            sleep(Duration::from_secs(1)).await;
1✔
460
            assert_eq!(mock_state.count_calls_containing("BanPeer").await, 0);
1✔
461
        }
462
    }
20✔
463

464
    /// Receive a message successfully, decrypt if possible, check for peer banning, and return the message
465
    async fn expect_no_error(
2✔
466
        node_identity: Arc<NodeIdentity>,
2✔
467
        message: DhtInboundMessage,
2✔
468
        decryption_succeeded: bool,
2✔
469
    ) -> DecryptedDhtMessage {
2✔
470
        // Set up messaging
471
        let (connectivity, mock) = create_connectivity_mock();
2✔
472
        let mock_state = mock.spawn();
2✔
473
        let result = Arc::new(Mutex::new(None));
2✔
474
        let service = service_fn({
2✔
475
            let result = result.clone();
2✔
476
            move |msg: DecryptedDhtMessage| {
2✔
477
                *result.lock().unwrap() = Some(msg);
2✔
478
                future::ready(Result::<(), PipelineError>::Ok(()))
2✔
479
            }
2✔
480
        });
481
        let mut service = DecryptionService::new(Default::default(), node_identity, connectivity, service);
2✔
482

483
        // Receive the message and assert there were no errors
484
        block_on(service.call(message)).unwrap();
2✔
485
        assert!(result.lock().unwrap().is_some());
2✔
486
        let decrypted = result.lock().unwrap().take().unwrap();
2✔
487

488
        // See if decryption succeeded or failed as expected
489
        // We check both functions just in case!
490
        assert_eq!(decrypted.decryption_succeeded(), decryption_succeeded);
2✔
491
        assert_eq!(decrypted.decryption_failed(), !decryption_succeeded);
2✔
492

493
        // Don't ban the peer
494
        // Waiting like this isn't a guarantee that the peer won't be banned later
495
        sleep(Duration::from_secs(1)).await;
2✔
496
        assert_eq!(mock_state.count_calls_containing("BanPeer").await, 0);
2✔
497

498
        // Return the decrypted message for further handling; decryption may have failed
499
        decrypted
2✔
500
    }
2✔
501

502
    #[test]
503
    fn poll_ready() {
1✔
504
        let service = service_fn(|_: DecryptedDhtMessage| future::ready(Result::<(), PipelineError>::Ok(())));
1✔
505
        let node_identity = make_node_identity();
1✔
506
        let (connectivity, _) = create_connectivity_mock();
1✔
507
        let mut service = DecryptionService::new(Default::default(), node_identity, connectivity, service);
1✔
508

509
        counter_context!(cx, counter);
1✔
510

511
        assert!(service.poll_ready(&mut cx).is_ready());
1✔
512

513
        assert_eq!(counter.get(), 0);
1✔
514
    }
1✔
515

516
    #[tokio::test]
517
    /// We can decrypt valid encrypted messages destined for us
518
    async fn decrypt_inbound_success() {
1✔
519
        let node_identity = make_node_identity();
1✔
520

521
        // Encrypt a message for us
522
        let plain_text = wrap_in_envelope_body!(b"Secret plans".to_vec());
1✔
523
        let message =
1✔
524
            make_dht_inbound_message(&node_identity, &plain_text, DhtMessageFlags::ENCRYPTED, true, true).unwrap();
1✔
525

526
        // Check that decryption succeeds and yields the original message
527
        let decrypted = expect_no_error(node_identity, message, true).await;
1✔
528
        assert_eq!(decrypted.decryption_result.unwrap(), plain_text);
1✔
529
    }
1✔
530

531
    #[tokio::test]
532
    /// An encrypted message is not destined for us
533
    async fn decrypt_inbound_not_for_us() {
1✔
534
        let node_identity = make_node_identity();
1✔
535
        let some_other_node_identity = make_node_identity();
1✔
536

537
        // Encrypt a message for someone else
538
        let plain_text = wrap_in_envelope_body!(b"Secret plans".to_vec());
1✔
539
        let message = make_dht_inbound_message(
1✔
540
            &some_other_node_identity,
1✔
541
            &plain_text,
1✔
542
            DhtMessageFlags::ENCRYPTED,
543
            true,
544
            true,
545
        )
546
        .unwrap();
1✔
547

548
        // Check that the message is received, but that decryption fails
549
        let decrypted = expect_no_error(node_identity, message.clone(), false).await;
1✔
550

551
        // The error should contain the message body
552
        assert_eq!(decrypted.decryption_result.unwrap_err(), message.body);
1✔
553
    }
1✔
554

555
    #[tokio::test]
556
    /// An encrypted message is empty
557
    async fn empty_message() {
1✔
558
        let node_identity = make_node_identity();
1✔
559
        let other_identity = make_node_identity();
1✔
560

561
        // Encrypt an empty message
562
        for identity in [&node_identity, &other_identity] {
2✔
563
            let mut message =
2✔
564
                make_dht_inbound_message(identity, &Vec::new(), DhtMessageFlags::ENCRYPTED, true, true).unwrap();
2✔
565
            message.body = Vec::new(); // due to padding, we need to manually reset this
2✔
566

1✔
567
            // Ban the peer
1✔
568
            expect_error(
2✔
569
                node_identity.clone(),
2✔
570
                message,
2✔
571
                DecryptionError::BadEncryptedMessageSemantics,
2✔
572
                true,
2✔
573
            )
2✔
574
            .await;
2✔
575
        }
1✔
576
    }
1✔
577

578
    #[tokio::test]
579
    /// An encrypted message is destined for us but can't be decrypted
580
    async fn decrypt_inbound_fail_for_us() {
1✔
581
        let node_identity = make_node_identity();
1✔
582

583
        // Encrypt an invalid message destined for us
584
        let nonsense = b"Cannot Decrypt this".to_vec();
1✔
585
        let message =
1✔
586
            make_dht_inbound_message_raw(&node_identity, nonsense, DhtMessageFlags::ENCRYPTED, true, true).unwrap();
1✔
587

588
        // Don't ban the peer
589
        expect_error(
1✔
590
            node_identity,
1✔
591
            message,
1✔
592
            DecryptionError::MessageRejectDecryptionFailed,
1✔
593
            false,
1✔
594
        )
1✔
595
        .await;
1✔
596
    }
1✔
597

598
    #[tokio::test]
599
    /// An encrypted message has no destination
600
    async fn decrypt_inbound_fail_no_destination() {
1✔
601
        let node_identity = make_node_identity();
1✔
602

603
        // Encrypt a message with no destination
604
        let plain_text_msg = b"Secret message to nowhere".to_vec();
1✔
605
        let message =
1✔
606
            make_dht_inbound_message(&node_identity, &plain_text_msg, DhtMessageFlags::ENCRYPTED, true, false).unwrap();
1✔
607

608
        // Ban the peer
609
        expect_error(
1✔
610
            node_identity,
1✔
611
            message,
1✔
612
            DecryptionError::BadEncryptedMessageSemantics,
1✔
613
            true,
1✔
614
        )
1✔
615
        .await;
1✔
616
    }
1✔
617

618
    #[tokio::test]
619
    /// A message has an invalid signature
620
    async fn decrypt_inbound_fail_invalid_signature() {
1✔
621
        let node_identity = make_node_identity();
1✔
622
        let other_identity = make_node_identity();
1✔
623
        let plain_text_msg = b"a message".to_vec();
1✔
624

625
        // Handle the cases where we are and aren't the recipient
626
        for identity in [&node_identity, &other_identity] {
2✔
627
            // Handle the cases where the message is and isn't encrypted
1✔
628
            for flag in [DhtMessageFlags::NONE, DhtMessageFlags::ENCRYPTED] {
6✔
629
                let mut message = make_dht_inbound_message(identity, &plain_text_msg, flag, true, true).unwrap();
4✔
630

1✔
631
                // Manipulate the signature so it's invalid
1✔
632
                let malleated_index = message.dht_header.message_signature.len() - 1;
4✔
633
                message.dht_header.message_signature[malleated_index] =
4✔
634
                    !message.dht_header.message_signature[malleated_index];
4✔
635

1✔
636
                // Ban the peer
1✔
637
                expect_error(node_identity.clone(), message, DecryptionError::InvalidSignature, true).await;
4✔
638
            }
1✔
639
        }
1✔
640
    }
1✔
641

642
    #[tokio::test]
643
    /// An encrypted message has no signature
644
    async fn decrypt_inbound_fail_missing_signature_encrypted() {
1✔
645
        let node_identity = make_node_identity();
1✔
646
        let other_identity = make_node_identity();
1✔
647
        let plain_text_msg = b"a secret message".to_vec();
1✔
648

649
        // Handle the cases where we are and aren't the recipient
650
        for identity in [&node_identity, &other_identity] {
2✔
651
            let mut message =
2✔
652
                make_dht_inbound_message(identity, &plain_text_msg, DhtMessageFlags::ENCRYPTED, true, true).unwrap();
2✔
653

1✔
654
            // Remove the signature
1✔
655
            message.dht_header.message_signature = Vec::new();
2✔
656

1✔
657
            // Ban the peer
1✔
658
            expect_error(
2✔
659
                node_identity.clone(),
2✔
660
                message,
2✔
661
                DecryptionError::BadEncryptedMessageSemantics,
2✔
662
                true,
2✔
663
            )
2✔
664
            .await;
2✔
665
        }
1✔
666
    }
1✔
667

668
    #[tokio::test]
669
    /// An encrypted message has no ephemeral key
670
    async fn decrypt_inbound_fail_missing_ephemeral_encrypted() {
1✔
671
        let node_identity = make_node_identity();
1✔
672
        let other_identity = make_node_identity();
1✔
673
        let plain_text_msg = b"a secret message".to_vec();
1✔
674

675
        // Handle the cases where we are and aren't the recipient
676
        for identity in [&node_identity, &other_identity] {
2✔
677
            let mut message =
2✔
678
                make_dht_inbound_message(identity, &plain_text_msg, DhtMessageFlags::ENCRYPTED, true, true).unwrap();
2✔
679

1✔
680
            // Remove the ephemeral key
1✔
681
            message.dht_header.ephemeral_public_key = None;
2✔
682

1✔
683
            // Ban the peer
1✔
684
            expect_error(
2✔
685
                node_identity.clone(),
2✔
686
                message,
2✔
687
                DecryptionError::BadEncryptedMessageSemantics,
2✔
688
                true,
2✔
689
            )
2✔
690
            .await;
2✔
691
        }
1✔
692
    }
1✔
693

694
    #[tokio::test]
695
    /// A message has a signature that can't be decoded (wire format)
696
    async fn decrypt_inbound_fail_signature_decode_wire() {
1✔
697
        let node_identity = make_node_identity();
1✔
698
        let other_identity = make_node_identity();
1✔
699
        let plain_text_msg = b"a message".to_vec();
1✔
700

701
        // Handle the cases where we are and aren't the recipient
702
        for identity in [&node_identity, &other_identity] {
2✔
703
            // Handle the cases where the message is and isn't encrypted
1✔
704
            for flag in [DhtMessageFlags::NONE, DhtMessageFlags::ENCRYPTED] {
6✔
705
                let mut message = make_dht_inbound_message(identity, &plain_text_msg, flag, true, true).unwrap();
4✔
706

1✔
707
                // Render the signature not decodable
1✔
708
                message.dht_header.message_signature = vec![1u8; 32];
4✔
709

1✔
710
                // Ban the peer
1✔
711
                expect_error(node_identity.clone(), message, DecryptionError::InvalidSignature, true).await;
4✔
712
            }
1✔
713
        }
1✔
714
    }
1✔
715

716
    #[tokio::test]
717
    /// A message has a signature that can't be decoded (signature structure)
718
    async fn decrypt_inbound_fail_signature_decode_structure() {
1✔
719
        let node_identity = make_node_identity();
1✔
720
        let other_identity = make_node_identity();
1✔
721
        let plain_text_msg = b"a message".to_vec();
1✔
722

723
        // Handle the cases where we are and aren't the recipient
724
        for identity in [&node_identity, &other_identity] {
2✔
725
            // Handle the cases where the message is and isn't encrypted
1✔
726
            for flag in [DhtMessageFlags::NONE, DhtMessageFlags::ENCRYPTED] {
6✔
727
                let mut message = make_dht_inbound_message(identity, &plain_text_msg, flag, true, true).unwrap();
4✔
728

1✔
729
                // Render a signature field not decodable
1✔
730
                let mut signature =
4✔
731
                    MessageSignature::new_signed(node_identity.secret_key().clone(), &plain_text_msg).to_proto();
4✔
732
                signature.signer_public_key = vec![1u8; 8]; // invalid public key encoding
4✔
733
                message.dht_header.message_signature = signature.to_encoded_bytes();
4✔
734

1✔
735
                // Ban the peer
1✔
736
                expect_error(node_identity.clone(), message, DecryptionError::InvalidSignature, true).await;
4✔
737
            }
1✔
738
        }
1✔
739
    }
1✔
740
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc