• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16990089413

15 Aug 2025 12:36PM UTC coverage: 54.497% (+0.06%) from 54.441%
16990089413

push

github

web-flow
chore: cleanup indexes (#7411)

Description
---
Forces clean indexes

970 of 2919 new or added lines in 369 files covered. (33.23%)

60 existing lines in 33 files now uncovered.

76698 of 140739 relevant lines covered (54.5%)

193749.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.49
/comms/dht/src/inbound/dht_handler/task.rs
1
// Copyright 2019, The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{convert::TryInto, sync::Arc};
24

25
use log::*;
26
use tari_comms::{
27
    message::MessageExt,
28
    peer_manager::{NodeId, NodeIdentity, PeerManager},
29
    pipeline::PipelineError,
30
    types::CommsPublicKey,
31
    OrNotFound,
32
};
33
use tari_utilities::{hex::Hex, ByteArray};
34
use tower::{Service, ServiceExt};
35

36
use crate::{
37
    actor::OffenceSeverity,
38
    discovery::DhtDiscoveryRequester,
39
    envelope::NodeDestination,
40
    inbound::{error::DhtInboundError, message::DecryptedDhtMessage},
41
    outbound::{OutboundMessageRequester, SendMessageParams},
42
    peer_validator::{DhtPeerValidatorError, PeerValidator},
43
    proto::{
44
        dht::{DiscoveryMessage, DiscoveryResponseMessage, JoinMessage},
45
        envelope::DhtMessageType,
46
    },
47
    rpc::UnvalidatedPeerInfo,
48
    DhtConfig,
49
    DhtRequester,
50
};
51

52
const LOG_TARGET: &str = "comms::dht::dht_handler";
53

54
pub struct ProcessDhtMessage<S> {
55
    next_service: S,
56
    peer_manager: Arc<PeerManager>,
57
    outbound_service: OutboundMessageRequester,
58
    node_identity: Arc<NodeIdentity>,
59
    dht: DhtRequester,
60
    message: Option<DecryptedDhtMessage>,
61
    discovery_requester: DhtDiscoveryRequester,
62
    config: Arc<DhtConfig>,
63
}
64

65
impl<S> ProcessDhtMessage<S>
66
where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
67
{
68
    pub fn new(
65✔
69
        next_service: S,
65✔
70
        peer_manager: Arc<PeerManager>,
65✔
71
        outbound_service: OutboundMessageRequester,
65✔
72
        node_identity: Arc<NodeIdentity>,
65✔
73
        dht: DhtRequester,
65✔
74
        discovery_requester: DhtDiscoveryRequester,
65✔
75
        message: DecryptedDhtMessage,
65✔
76
        config: Arc<DhtConfig>,
65✔
77
    ) -> Self {
65✔
78
        Self {
65✔
79
            next_service,
65✔
80
            peer_manager,
65✔
81
            outbound_service,
65✔
82
            node_identity,
65✔
83
            dht,
65✔
84
            discovery_requester,
65✔
85
            message: Some(message),
65✔
86
            config,
65✔
87
        }
65✔
88
    }
65✔
89

90
    pub async fn run(mut self) -> Result<(), PipelineError> {
65✔
91
        let message = self
65✔
92
            .message
65✔
93
            .take()
65✔
94
            .expect("ProcessDhtMessage initialized without message");
65✔
95

65✔
96
        // If this message failed to decrypt, we stop it going further at this layer
65✔
97
        if message.decryption_failed() {
65✔
98
            debug!(
7✔
99
                target: LOG_TARGET,
×
100
                "Message that failed to decrypt will be discarded here. DhtHeader={}", message.dht_header
×
101
            );
102
            return Ok(());
7✔
103
        }
58✔
104

58✔
105
        if message.is_duplicate() {
58✔
106
            debug!(
6✔
107
                target: LOG_TARGET,
×
108
                "Received message ({}) that has already been received {} time(s). Last sent by peer '{}', passing on \
×
109
                 to next service (Trace: {})",
×
110
                message.tag,
×
111
                message.dedup_hit_count,
×
112
                message.source_peer.node_id.short_str(),
×
113
                message.dht_header.message_tag,
114
            );
115
            self.next_service.oneshot(message).await?;
6✔
116
            return Ok(());
6✔
117
        }
52✔
118

52✔
119
        trace!(
52✔
120
            target: LOG_TARGET,
×
121
            "Received DHT message type `{}` (Source peer: {}, Tag: {}, Trace: {})",
×
122
            message.dht_header.message_type,
×
123
            message.source_peer.node_id,
×
124
            message.tag,
125
            message.dht_header.message_tag
126
        );
127
        match message.dht_header.message_type {
52✔
128
            DhtMessageType::Join => self.handle_join(message).await?,
3✔
129
            DhtMessageType::Discovery => self.handle_discover(message).await?,
2✔
130
            DhtMessageType::DiscoveryResponse => self.handle_discover_response(message).await?,
2✔
131
            // Not a DHT message, call downstream middleware
132
            _ => {
133
                trace!(
45✔
134
                    target: LOG_TARGET,
×
135
                    "Passing message {} onto next service (Trace: {})",
×
136
                    message.tag,
137
                    message.dht_header.message_tag
138
                );
139
                self.next_service.oneshot(message).await?;
45✔
140
            },
141
        }
142

143
        Ok(())
51✔
144
    }
65✔
145

146
    #[allow(clippy::too_many_lines)]
147
    async fn handle_join(&mut self, message: DecryptedDhtMessage) -> Result<(), DhtInboundError> {
3✔
148
        let DecryptedDhtMessage {
3✔
149
            decryption_result,
3✔
150
            dht_header,
3✔
151
            source_peer,
3✔
152
            authenticated_origin,
3✔
153
            is_saf_message,
3✔
154
            ..
3✔
155
        } = message;
3✔
156

157
        // Ban the source peer. They should not have propagated a DHT discover response.
158
        let Some(authenticated_pk) = authenticated_origin else {
3✔
159
            warn!(
×
160
                target: LOG_TARGET,
×
NEW
161
                "Received JoinMessage that did not have an authenticated origin from source peer {source_peer}. Banning source"
×
162
            );
163

164
            self.dht
×
165
                .ban_peer(
×
166
                    source_peer.public_key.clone(),
×
167
                    OffenceSeverity::Low,
×
168
                    "Received JoinMessage that did not have an authenticated origin",
×
169
                )
×
170
                .await;
×
171
            return Ok(());
×
172
        };
173

174
        if authenticated_pk == *self.node_identity.public_key() {
3✔
175
            debug!(target: LOG_TARGET, "Received our own join message. Discarding it.");
×
176
            return Ok(());
×
177
        }
3✔
178

3✔
179
        let body = decryption_result.expect("already checked that this message decrypted successfully");
3✔
180
        let join_msg = self
3✔
181
            .ban_on_offence(
3✔
182
                &authenticated_pk,
3✔
183
                body.decode_part::<JoinMessage>(0)
3✔
184
                    .map_err(Into::into)
3✔
185
                    .and_then(|o| o.ok_or(DhtInboundError::InvalidMessageBody)),
3✔
186
            )
3✔
187
            .await?;
3✔
188

189
        if join_msg.public_key.as_slice() != authenticated_pk.as_bytes() {
3✔
190
            warn!(
×
191
                target: LOG_TARGET,
×
192
                "Received JoinMessage from peer that mismatches the authenticated origin. \
×
193
                This message was signed by another party which may be attempting to get other nodes banned. \
×
194
                Banning the message signer."
×
195
            );
196

197
            warn!(
×
198
                target: LOG_TARGET,
×
199
                "Authenticated origin: {:#.6}, Source: {:#.6}, join message: {}",
×
200
                authenticated_pk, source_peer.public_key, join_msg.public_key.to_hex()
×
201
            );
202
            self.dht
×
203
                .ban_peer(
×
204
                    authenticated_pk,
×
205
                    OffenceSeverity::High,
×
206
                    "Received JoinMessage from peer with a public key that does not match the source peer",
×
207
                )
×
208
                .await;
×
209

210
            return Ok(());
×
211
        }
3✔
212

3✔
213
        debug!(
3✔
214
            target: LOG_TARGET,
×
NEW
215
            "Received join Message from '{authenticated_pk}' {join_msg}"
×
216
        );
217

218
        let validator = PeerValidator::new(&self.config);
3✔
219
        let maybe_existing = self.peer_manager.find_by_public_key(&authenticated_pk).await?;
3✔
220
        let valid_peer = self
3✔
221
            .ban_on_offence(
3✔
222
                &authenticated_pk,
3✔
223
                validator
3✔
224
                    .validate_peer(join_msg.try_into()?, maybe_existing)
3✔
225
                    .map_err(Into::into),
3✔
226
            )
3✔
227
            .await?;
3✔
228

229
        let is_banned = valid_peer.is_banned();
2✔
230
        let valid_peer_node_id = valid_peer.node_id.clone();
2✔
231
        let valid_peer_public_key = valid_peer.public_key.clone();
2✔
232
        // Update peer details. If the peer is banned we preserve the ban but still allow them to update their claims.
2✔
233
        self.peer_manager.add_or_update_peer(valid_peer).await?;
2✔
234

235
        // DO NOT propagate this peer if this node has banned them
236
        if is_banned {
2✔
237
            debug!(
×
238
                target: LOG_TARGET,
×
239
                "Received Join request for banned peer. This join request will not be propagated."
×
240
            );
241
            return Ok(());
×
242
        }
2✔
243

2✔
244
        if is_saf_message {
2✔
245
            debug!(
×
246
                target: LOG_TARGET,
×
247
                "Not re-propagating join message received from store and forward"
×
248
            );
249
            return Ok(());
×
250
        }
2✔
251

2✔
252
        // Only propagate a join that was not directly sent to this node
2✔
253
        if dht_header.destination != self.node_identity.public_key() {
2✔
254
            debug!(
2✔
255
                target: LOG_TARGET,
×
256
                "Propagating Join message from peer '{}'",
×
257
                valid_peer_node_id.short_str()
×
258
            );
259
            // Propagate message to closer peers
260
            self.outbound_service
2✔
261
                .send_raw_no_wait(
2✔
262
                    SendMessageParams::new()
2✔
263
                        .propagate(valid_peer_public_key.into(), vec![
2✔
264
                            valid_peer_node_id,
2✔
265
                            source_peer.node_id.clone(),
2✔
266
                        ])
2✔
267
                        .with_debug_info("Propagating join message".to_string())
2✔
268
                        .with_dht_header(dht_header)
2✔
269
                        .finish(),
2✔
270
                    body.encode_into_bytes_mut(),
2✔
271
                )
2✔
272
                .await?;
2✔
273
        }
×
274

275
        Ok(())
2✔
276
    }
3✔
277

278
    async fn handle_discover_response(&mut self, message: DecryptedDhtMessage) -> Result<(), DhtInboundError> {
2✔
279
        trace!(
2✔
280
            target: LOG_TARGET,
×
281
            "Received Discover Response Message from {}",
×
282
            message
×
283
                .authenticated_origin
×
284
                .as_ref()
×
285
                .map(|pk| pk.to_hex())
×
286
                .unwrap_or_else(|| "<unknown>".to_string())
×
287
        );
288

289
        let msg = message
2✔
290
            .success()
2✔
291
            .expect("already checked that this message decrypted successfully");
2✔
292

293
        // Ban the source peer. They should not have propagated a DHT discover response.
294
        let Some(authenticated_origin) = message.authenticated_origin.as_ref() else {
2✔
295
            warn!(
×
296
                target: LOG_TARGET,
×
NEW
297
                "Received DiscoveryResponseMessage that did not have an authenticated origin: {message}. Banning source"
×
298
            );
299
            self.dht
×
300
                .ban_peer(
×
301
                    message.source_peer.public_key.clone(),
×
302
                    OffenceSeverity::Low,
×
303
                    "Received DiscoveryResponseMessage that did not have an authenticated origin",
×
304
                )
×
305
                .await;
×
306

307
            return Ok(());
×
308
        };
309

310
        let discover_msg = self
2✔
311
            .ban_on_offence(
2✔
312
                authenticated_origin,
2✔
313
                msg.decode_part::<DiscoveryResponseMessage>(0)
2✔
314
                    .map_err(Into::into)
2✔
315
                    .and_then(|o| o.ok_or(DhtInboundError::InvalidMessageBody)),
2✔
316
            )
2✔
317
            .await?;
2✔
318

319
        if *authenticated_origin != message.source_peer.public_key ||
2✔
320
            authenticated_origin.as_bytes() != discover_msg.public_key.as_slice()
2✔
321
        {
322
            warn!(
×
323
                target: LOG_TARGET,
×
324
                "Received DiscoveryResponseMessage from peer that mismatches the discovery response. \
×
325
                This message was signed by another party which may be attempting to get other nodes banned. \
×
326
                Banning the message signer."
×
327
            );
328

329
            warn!(
×
330
                target: LOG_TARGET,
×
331
                "Authenticated origin: {:#.6}, Source: {:#.6}, discovery message: {}",
×
332
                authenticated_origin, message.source_peer.public_key, discover_msg.public_key.to_hex()
×
333
            );
334
            self.dht
×
335
                .ban_peer(
×
336
                    authenticated_origin.clone(),
×
337
                    OffenceSeverity::High,
×
338
                    "Received DiscoveryResponseMessage from peer with a public key that does not match the source peer",
×
339
                )
×
340
                .await;
×
341

342
            return Ok(());
×
343
        }
2✔
344

2✔
345
        self.discovery_requester
2✔
346
            .notify_discovery_response_received(discover_msg)
2✔
347
            .await?;
2✔
348

349
        Ok(())
2✔
350
    }
2✔
351

352
    async fn handle_discover(&mut self, message: DecryptedDhtMessage) -> Result<(), DhtInboundError> {
2✔
353
        let msg = message
2✔
354
            .success()
2✔
355
            .expect("already checked that this message decrypted successfully");
2✔
356

357
        let Some(authenticated_pk) = message.authenticated_origin.as_ref() else {
2✔
358
            warn!(
×
359
                target: LOG_TARGET,
×
360
                "Received Discover that did not have an authenticated origin from source peer {}. Banning source", message.source_peer
×
361
            );
362
            self.dht
×
363
                .ban_peer(
×
364
                    message.source_peer.public_key.clone(),
×
365
                    OffenceSeverity::Low,
×
366
                    "Received JoinMessage that did not have an authenticated origin",
×
367
                )
×
368
                .await;
×
369

370
            return Ok(());
×
371
        };
372

373
        let discover_msg = self
2✔
374
            .ban_on_offence(
2✔
375
                authenticated_pk,
2✔
376
                msg.decode_part::<DiscoveryMessage>(0)
2✔
377
                    .map_err(Into::into)
2✔
378
                    .and_then(|o| o.ok_or(DhtInboundError::InvalidMessageBody)),
2✔
379
            )
2✔
380
            .await?;
2✔
381

382
        let nonce = discover_msg.nonce;
2✔
383

2✔
384
        debug!(
2✔
385
            target: LOG_TARGET,
×
386
            "Received discovery message from '{}', forwarded by {}", authenticated_pk, message.source_peer
×
387
        );
388

389
        let new_peer: UnvalidatedPeerInfo = self
2✔
390
            .ban_on_offence(
2✔
391
                authenticated_pk,
2✔
392
                discover_msg
2✔
393
                    .try_into()
2✔
394
                    .map_err(DhtInboundError::InvalidDiscoveryMessage),
2✔
395
            )
2✔
396
            .await?;
2✔
397
        let node_id = NodeId::from_public_key(&new_peer.public_key);
2✔
398

2✔
399
        let peer_validator = PeerValidator::new(&self.config);
2✔
400
        let maybe_existing_peer = self.peer_manager.find_by_public_key(&new_peer.public_key).await?;
2✔
401
        let peer = peer_validator.validate_peer(new_peer, maybe_existing_peer)?;
2✔
402
        self.peer_manager.add_or_update_peer(peer).await?;
2✔
403
        let origin_peer = self
2✔
404
            .peer_manager
2✔
405
            .find_by_node_id(&node_id)
2✔
406
            .await
2✔
407
            .or_not_found(&node_id)?;
2✔
408

409
        // Don't send a join request to the origin peer if they are banned
410
        if origin_peer.is_banned() {
2✔
411
            warn!(
×
412
                target: LOG_TARGET,
×
NEW
413
                "Received Discovery request for banned peer '{node_id}'. Not propagating further."
×
414
            );
415
            return Ok(());
×
416
        }
2✔
417

2✔
418
        // Send the origin the current nodes latest contact info
2✔
419
        self.send_discovery_response(origin_peer.public_key, nonce).await?;
2✔
420

421
        Ok(())
2✔
422
    }
2✔
423

424
    /// Send a `DiscoveryResponseMessage` in response to a `DiscoveryMessage` to the given public key
425
    /// using the given nonce which should come from the `DiscoveryMessage`
426
    async fn send_discovery_response(
2✔
427
        &mut self,
2✔
428
        dest_public_key: CommsPublicKey,
2✔
429
        nonce: u64,
2✔
430
    ) -> Result<(), DhtInboundError> {
2✔
431
        let response = DiscoveryResponseMessage {
2✔
432
            public_key: self.node_identity.public_key().to_vec(),
2✔
433
            addresses: self
2✔
434
                .node_identity
2✔
435
                .public_addresses()
2✔
436
                .iter()
2✔
437
                .map(|a| a.to_vec())
2✔
438
                .collect(),
2✔
439
            peer_features: self.node_identity.features().bits(),
2✔
440
            nonce,
2✔
441
            identity_signature: self.node_identity.identity_signature_read().as_ref().map(Into::into),
2✔
442
        };
2✔
443

2✔
444
        trace!(target: LOG_TARGET, "Sending discovery response to {dest_public_key}");
2✔
445
        self.outbound_service
2✔
446
            .send_message_no_header_no_wait(
2✔
447
                SendMessageParams::new()
2✔
448
                    .direct_public_key(dest_public_key)
2✔
449
                    .with_debug_info("Sending discovery response".to_string())
2✔
450
                    .with_destination(NodeDestination::Unknown)
2✔
451
                    .with_dht_message_type(DhtMessageType::DiscoveryResponse)
2✔
452
                    .force_origin()
2✔
453
                    .finish(),
2✔
454
                response,
2✔
455
            )
2✔
456
            .await?;
2✔
457

458
        Ok(())
2✔
459
    }
2✔
460

461
    async fn ban_on_offence<T>(
12✔
462
        &mut self,
12✔
463
        authenticated_pk: &CommsPublicKey,
12✔
464
        result: Result<T, DhtInboundError>,
12✔
465
    ) -> Result<T, DhtInboundError> {
12✔
466
        match result {
12✔
467
            Ok(r) => Ok(r),
11✔
468
            Err(err) => {
1✔
469
                match &err {
1✔
470
                    DhtInboundError::PeerValidatorError(err) => match err {
1✔
471
                        DhtPeerValidatorError::NewAndExistingMismatch { .. } => {},
×
472
                        err @ DhtPeerValidatorError::ValidatorError(_) |
1✔
473
                        err @ DhtPeerValidatorError::IdentityTooManyClaims { .. } => {
×
474
                            self.dht
1✔
475
                                .ban_peer(authenticated_pk.clone(), OffenceSeverity::Medium, err)
1✔
476
                                .await;
1✔
477
                        },
478
                    },
479
                    err @ DhtInboundError::MessageError(_) | err @ DhtInboundError::InvalidMessageBody => {
×
480
                        self.dht
×
481
                            .ban_peer(authenticated_pk.clone(), OffenceSeverity::High, err)
×
482
                            .await;
×
483
                    },
484
                    DhtInboundError::PeerManagerError(_) => {},
×
485
                    DhtInboundError::DhtOutboundError(_) => {},
×
486
                    DhtInboundError::DhtDiscoveryError(_) => {},
×
487
                    DhtInboundError::OriginRequired(_) => {},
×
488
                    err @ DhtInboundError::InvalidDiscoveryMessage(_) => {
×
489
                        self.dht
×
490
                            .ban_peer(authenticated_pk.clone(), OffenceSeverity::High, err)
×
491
                            .await;
×
492
                    },
493
                    DhtInboundError::ConnectivityError(_) => {},
×
494
                }
495
                Err(err)
1✔
496
            },
497
        }
498
    }
12✔
499
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc