tari-project / tari · build 18279939245 (push, github, web-flow)

06 Oct 2025 11:53AM UTC · coverage: 58.602% (-1.0%) from 59.56%

feat: track ConsensusConstants to detect network changes and notify user (#7533)

## Problem

The minotari_node currently has no mechanism to detect when
`ConsensusConstants` have changed between restarts. A node operator
might therefore unknowingly run an incompatible version after consensus
rule changes have already taken effect, potentially causing network
forks or sync problems.

## Solution

This PR implements a lightweight ConsensusConstants tracking system
that:

1. **Stores consensus constants** on each startup in
`{data_dir}/consensus_constants.json`
2. **Compares constants** between restarts to detect changes
3. **Validates timing** - only raises errors when new constants are
already active at current blockchain height
4. **Provides clear feedback** to users about potential network
compatibility issues
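The steps above can be sketched end to end. This is a minimal, hypothetical sketch of the detection logic, not the PR's actual API: the `TrackedConstants` fields, the plain-text file format (the real implementation persists JSON), and the `check_constants` helper are all illustrative.

```rust
use std::fs;
use std::path::Path;

/// Illustrative stand-in for the persisted constants: only the fields
/// needed for compatibility detection (the real tracker stores JSON).
#[derive(Debug, PartialEq)]
struct TrackedConstants {
    effective_height: u64,
    max_block_weight: u64,
}

enum TrackerOutcome {
    FirstStartup,     // tracking file created, stay silent
    Unchanged,        // continue normally, stay silent
    ChangedNotActive, // info log, continue
    ChangedAndActive, // critical error, node should exit
}

fn check_constants(path: &Path, current: &TrackedConstants, chain_height: u64) -> TrackerOutcome {
    // Load what the previous startup stored, if anything
    let previous = fs::read_to_string(path).ok().and_then(|s| {
        let mut it = s.lines();
        Some(TrackedConstants {
            effective_height: it.next()?.parse().ok()?,
            max_block_weight: it.next()?.parse().ok()?,
        })
    });
    // Always (re)write the current constants for the next startup to compare against
    fs::write(
        path,
        format!("{}\n{}\n", current.effective_height, current.max_block_weight),
    )
    .ok();
    match previous {
        None => TrackerOutcome::FirstStartup,
        Some(prev) if prev == *current => TrackerOutcome::Unchanged,
        // Only an error when the new constants are already active at the current height
        Some(_) if current.effective_height <= chain_height => TrackerOutcome::ChangedAndActive,
        Some(_) => TrackerOutcome::ChangedNotActive,
    }
}
```

The key design point from step 3 is the height comparison: a difference in constants is only fatal once the new constants' effective height has been reached.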

## Implementation Details

### New Components

- **ConsensusConstantsTracker**: A new module that handles persistence
and comparison of consensus constants
- **Integration point**: Added to node startup flow in `builder.rs`
after database initialization but before node start
- **Minimal storage**: Only tracks essential fields needed for
compatibility detection

### User Experience

The implementation provides different behaviors based on the situation:

- **First startup**: Silent operation, creates tracking file
- **No changes**: Silent operation, continues normally  
- **Constants changed, not yet active**: Info log message, continues
normally
- **Constants changed, already active**: **Critical error with clear
message and exit**

Example error message:
```
CRITICAL: Consensus constants have changed and the new constants are already active!
Current height: 15000
Active consensus constants changed from effective height 0 to 15000  
This indicates a potential network fork or version mismatch.
Please verify you are running the correct version of the node for this network.
```
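The four behaviors above form a small decision table. A self-contained sketch of that table, with hypothetical names (`StartupAction`, `decide`) and illustrative message strings rather than the PR's actual types:

```rust
/// Illustrative mapping from detected state to node behavior; not the PR's actual types.
#[derive(Debug, PartialEq)]
enum StartupAction {
    ContinueSilently,
    ContinueWithInfoLog(String),
    ExitWithCriticalError(String),
}

fn decide(first_startup: bool, changed: bool, new_effective_height: u64, chain_height: u64) -> StartupAction {
    if first_startup || !changed {
        // First startup (tracking file just created) or no change: stay silent
        StartupAction::ContinueSilently
    } else if new_effective_height > chain_height {
        // Changed but not yet active: informational only
        StartupAction::ContinueWithInfoLog(format!(
            "Consensus constants change takes effect at height {new_effective_height}"
        ))
    } else {
        // Changed and already active: critical error, the node should refuse to run
        StartupAction::ExitWithCriticalError(format!(
            "CRITICAL: consensus constants changed and are active at height {chain_height}"
        ))
    }
}
```

Ordering matters: the silent cases are checked first, and an already-active change is only reached when the constants differ and the effective height is at or below the current chain height.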

### Files Changed

- `base_layer/core/src/ch... (continued)

- 67 of 70 new or added lines in 7 files covered (95.71%)
- 1118 existing lines in 28 files now uncovered
- 66535 of 113537 relevant lines covered (58.6%)
- 550271.13 hits per line

Source file: `/base_layer/core/src/mempool/sync_protocol/mod.rs` (67.39% line coverage)
```rust
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # Mempool Sync Protocol
//!
//! The protocol handler for the mempool is responsible for the initial sync of transactions from peers.
//! In order to prevent duplicate transactions being received from multiple peers, syncing occurs one peer at a time.
//! This node will initiate this protocol up to a configurable (`MempoolSyncConfig::num_initial_sync_peers`) number
//! of times. After that, it will only respond to sync requests from remote peers.
//!
//! ## Protocol Flow
//!
//! Alice initiates (initiator) the connection to Bob (responder).
//! As the initiator, Alice MUST send a transaction inventory
//! Bob SHOULD respond with any transactions known to him, excluding the transactions in the inventory
//! Bob MUST send a complete message (An empty `TransactionItem` or 1 byte in protobuf)
//! Bob MUST send indexes of inventory items that are not known to him
//! Alice SHOULD return the Transactions relating to those indexes
//! Alice SHOULD close the stream immediately after sending
//!
//! ```text
//!  +-------+                    +-----+
//!  | Alice |                    | Bob |
//!  +-------+                    +-----+
//!  |                                |
//!  | Txn Inventory                  |
//!  |------------------------------->|
//!  |                                |
//!  |      TransactionItem(tx_b1)    |
//!  |<-------------------------------|
//!  |             ...streaming...    |
//!  |      TransactionItem(empty)    |
//!  |<-------------------------------|
//!  |  Inventory missing txn indexes |
//!  |<-------------------------------|
//!  |                                |
//!  | TransactionItem(tx_a1)         |
//!  |------------------------------->|
//!  |             ...streaming...    |
//!  | TransactionItem(empty)         |
//!  |------------------------------->|
//!  |                                |
//!  |             END                |
//! ```

use std::{
    convert::TryFrom,
    iter,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

use error::MempoolProtocolError;
use futures::{stream, SinkExt, Stream, StreamExt};
pub use initializer::MempoolSyncInitializer;
use log::*;
use prost::Message;
use tari_comms::{
    connectivity::{ConnectivityEvent, ConnectivityRequester, ConnectivitySelection},
    framing,
    framing::CanonicalFraming,
    message::MessageExt,
    peer_manager::{NodeId, PeerFeatures},
    protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationRx},
    Bytes,
    PeerConnection,
};
use tari_transaction_components::transaction_components::Transaction;
use tari_utilities::{hex::Hex, ByteArray};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::Semaphore,
    task,
    time,
};

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
    base_node::comms_interface::{BlockEvent, BlockEventReceiver},
    chain_storage::BlockAddResult,
    mempool::{proto, Mempool, MempoolServiceConfig},
    proto as shared_proto,
};

#[cfg(test)]
mod test;

mod error;
mod initializer;

const MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // 3 MiB
const LOG_TARGET: &str = "c::mempool::sync_protocol";

pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1");

pub struct MempoolSyncProtocol<TSubstream> {
    config: MempoolServiceConfig,
    protocol_notifier: ProtocolNotificationRx<TSubstream>,
    mempool: Mempool,
    num_synched: Arc<AtomicUsize>,
    permits: Arc<Semaphore>,
    connectivity: ConnectivityRequester,
    block_event_stream: BlockEventReceiver,
}

impl<TSubstream> MempoolSyncProtocol<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
{
    pub fn new(
        config: MempoolServiceConfig,
        protocol_notifier: ProtocolNotificationRx<TSubstream>,
        mempool: Mempool,
        connectivity: ConnectivityRequester,
        block_event_stream: BlockEventReceiver,
    ) -> Self {
        Self {
            config,
            protocol_notifier,
            mempool,
            num_synched: Arc::new(AtomicUsize::new(0)),
            permits: Arc::new(Semaphore::new(1)),
            connectivity,
            block_event_stream,
        }
    }

    pub async fn run(mut self) {
        info!(target: LOG_TARGET, "Mempool protocol handler has started");

        let mut connectivity_events = self.connectivity.get_event_subscription();
        loop {
            tokio::select! {
                Ok(block_event) = self.block_event_stream.recv() => {
                    self.handle_block_event(&block_event).await;
                },
                Ok(event) = connectivity_events.recv() => {
                    self.handle_connectivity_event(event).await;
                },

                Some(notif) = self.protocol_notifier.recv() => {
                    self.handle_protocol_notification(notif);
                }
            }
        }
    }

    async fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
        match event {
            // If this node is connecting to a peer
            ConnectivityEvent::PeerConnected(conn) if conn.direction().is_outbound() => {
                // This protocol is only spoken between base nodes
                if !conn.peer_features().contains(PeerFeatures::COMMUNICATION_NODE) {
                    return;
                }

                if !self.is_synched() {
                    self.spawn_initiator_protocol(*conn.clone()).await;
                }
            },
            _ => {},
        }
    }

    async fn handle_block_event(&mut self, block_event: &BlockEvent) {
        use BlockEvent::{BlockSyncComplete, ValidBlockAdded};
        match block_event {
            ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => {
                if added.len() < self.config.block_sync_trigger {
                    return;
                }
            },
            BlockSyncComplete(tip, starting_sync_height) => {
                let added = tip.height() - starting_sync_height;
                if added < self.config.block_sync_trigger as u64 {
                    return;
                }
            },
            _ => {
                return;
            },
        }
        // we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till
        // initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the
        // initial_sync_num_peers
        self.num_synched.store(0, Ordering::SeqCst);
        let connections = match self
            .connectivity
            .select_connections(ConnectivitySelection::random_nodes(
                self.config.initial_sync_num_peers,
                vec![],
            ))
            .await
        {
            Ok(v) => {
                if v.is_empty() {
                    error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to");
                    return;
                };
                v
            },
            Err(e) => {
                error!(
                    target: LOG_TARGET,
                    "Mempool sync could not get a peer to sync to: {e}"
                );
                return;
            },
        };
        for connection in connections {
            self.spawn_initiator_protocol(connection).await;
        }
    }

    fn is_synched(&self) -> bool {
        self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers
    }

    fn handle_protocol_notification(&mut self, notification: ProtocolNotification<TSubstream>) {
        match notification.event {
            ProtocolEvent::NewInboundSubstream(node_id, substream) => {
                self.spawn_inbound_handler(node_id, substream);
            },
        }
    }

    async fn spawn_initiator_protocol(&mut self, mut conn: PeerConnection) {
        let mempool = self.mempool.clone();
        let permits = self.permits.clone();
        let num_synched = self.num_synched.clone();
        let config = self.config.clone();
        task::spawn(async move {
            // Only initiate this protocol with a single peer at a time
            let _permit = permits.acquire().await;
            if num_synched.load(Ordering::SeqCst) >= config.initial_sync_num_peers {
                return;
            }
            match conn.open_framed_substream(&MEMPOOL_SYNC_PROTOCOL, MAX_FRAME_SIZE).await {
                Ok(framed) => {
                    let protocol = MempoolPeerProtocol::new(config, framed, conn.peer_node_id().clone(), mempool);
                    match protocol.start_initiator().await {
                        Ok(_) => {
                            debug!(
                                target: LOG_TARGET,
                                "Mempool initiator protocol completed successfully for peer `{}`",
                                conn.peer_node_id().short_str(),
                            );
                            num_synched.fetch_add(1, Ordering::SeqCst);
                        },
                        Err(err) => {
                            debug!(
                                target: LOG_TARGET,
                                "Mempool initiator protocol failed for peer `{}`: {}",
                                conn.peer_node_id().short_str(),
                                err
                            );
                        },
                    }
                },
                Err(err) => error!(
                    target: LOG_TARGET,
                    "Unable to establish mempool protocol substream to peer `{}`: {}",
                    conn.peer_node_id().short_str(),
                    err
                ),
            }
        });
    }

    fn spawn_inbound_handler(&self, node_id: NodeId, substream: TSubstream) {
        let mempool = self.mempool.clone();
        let config = self.config.clone();
        task::spawn(async move {
            let framed = framing::canonical(substream, MAX_FRAME_SIZE);
            let mut protocol = MempoolPeerProtocol::new(config, framed, node_id.clone(), mempool);
            match protocol.start_responder().await {
                Ok(_) => {
                    debug!(
                        target: LOG_TARGET,
                        "Mempool responder protocol succeeded for peer `{}`",
                        node_id.short_str()
                    );
                },
                Err(err) => {
                    debug!(
                        target: LOG_TARGET,
                        "Mempool responder protocol failed for peer `{}`: {}",
                        node_id.short_str(),
                        err
                    );
                },
            }
        });
    }
}

struct MempoolPeerProtocol<TSubstream> {
    config: MempoolServiceConfig,
    framed: CanonicalFraming<TSubstream>,
    mempool: Mempool,
    peer_node_id: NodeId,
}

impl<TSubstream> MempoolPeerProtocol<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin
{
    pub fn new(
        config: MempoolServiceConfig,
        framed: CanonicalFraming<TSubstream>,
        peer_node_id: NodeId,
        mempool: Mempool,
    ) -> Self {
        Self {
            config,
            framed,
            mempool,
            peer_node_id,
        }
    }

    pub async fn start_initiator(mut self) -> Result<(), MempoolProtocolError> {
        match self.start_initiator_inner().await {
            Ok(_) => {
                debug!(target: LOG_TARGET, "Initiator protocol complete");
                Ok(())
            },
            Err(err) => {
                if let Err(err) = self.framed.flush().await {
                    debug!(target: LOG_TARGET, "IO error when flushing stream: {err}");
                }
                if let Err(err) = self.framed.close().await {
                    debug!(target: LOG_TARGET, "IO error when closing stream: {err}");
                }
                Err(err)
            },
        }
    }

    async fn start_initiator_inner(&mut self) -> Result<(), MempoolProtocolError> {
        debug!(
            target: LOG_TARGET,
            "Starting initiator mempool sync for peer `{}`",
            self.peer_node_id.short_str()
        );

        let transactions = self.mempool.snapshot().await?;
        let items = transactions
            .iter()
            .take(self.config.initial_sync_max_transactions)
            .filter_map(|txn| txn.first_kernel_excess_sig())
            .map(|excess| excess.get_signature().to_vec())
            .collect();
        let inventory = proto::TransactionInventory { items };

        // Send an inventory of items currently in this node's mempool
        debug!(
            target: LOG_TARGET,
            "Sending transaction inventory containing {} item(s) to peer `{}`",
            inventory.items.len(),
            self.peer_node_id.short_str()
        );

        self.write_message(inventory).await?;

        self.read_and_insert_transactions_until_complete().await?;

        let missing_items: proto::InventoryIndexes = self.read_message().await?;
        debug!(
            target: LOG_TARGET,
            "Received {} missing transaction index(es) from peer `{}`",
            missing_items.indexes.len(),
            self.peer_node_id.short_str(),
        );
        let missing_txns = missing_items
            .indexes
            .iter()
            .filter_map(|idx| transactions.get(*idx as usize).cloned())
            .collect::<Vec<_>>();
        debug!(
            target: LOG_TARGET,
            "Sending {} missing transaction(s) to peer `{}`",
            missing_items.indexes.len(),
            self.peer_node_id.short_str(),
        );

        // If we don't have any transactions at the given indexes we still need to send back an empty if they requested
        // at least one index
        if !missing_items.indexes.is_empty() {
            self.write_transactions(missing_txns).await?;
        }

        // Close the stream after writing
        self.framed.close().await?;

        Ok(())
    }

    pub async fn start_responder(&mut self) -> Result<(), MempoolProtocolError> {
        match self.start_responder_inner().await {
            Ok(_) => {
                debug!(target: LOG_TARGET, "Responder protocol complete");
                Ok(())
            },
            Err(err) => {
                if let Err(err) = self.framed.flush().await {
                    debug!(target: LOG_TARGET, "IO error when flushing stream: {err}");
                }
                if let Err(err) = self.framed.close().await {
                    debug!(target: LOG_TARGET, "IO error when closing stream: {err}");
                }
                Err(err)
            },
        }
    }

    async fn start_responder_inner(&mut self) -> Result<(), MempoolProtocolError> {
        debug!(
            target: LOG_TARGET,
            "Starting responder mempool sync for peer `{}`",
            self.peer_node_id.short_str()
        );

        let inventory: proto::TransactionInventory = self.read_message().await?;

        debug!(
            target: LOG_TARGET,
            "Received inventory from peer `{}` containing {} item(s)",
            self.peer_node_id.short_str(),
            inventory.items.len()
        );

        let transactions = self.mempool.snapshot().await?;

        let mut duplicate_inventory_items = Vec::new();
        let (transactions, _) = transactions.into_iter().partition::<Vec<_>, _>(|transaction| {
            let excess_sig = transaction
                .first_kernel_excess_sig()
                .expect("transaction stored in mempool did not have any kernels");

            let has_item = inventory
                .items
                .iter()
                .position(|bytes| bytes.as_slice() == excess_sig.get_signature().as_bytes());

            match has_item {
                Some(pos) => {
                    duplicate_inventory_items.push(pos);
                    false
                },
                None => true,
            }
        });

        debug!(
            target: LOG_TARGET,
            "Streaming {} transaction(s) to peer `{}`",
            transactions.len(),
            self.peer_node_id.short_str()
        );

        self.write_transactions(transactions).await?;

        // Generate an index list of inventory indexes that this node does not have
        #[allow(clippy::cast_possible_truncation)]
        let missing_items = inventory
            .items
            .into_iter()
            .enumerate()
            .filter_map(|(i, _)| {
                if duplicate_inventory_items.contains(&i) {
                    None
                } else {
                    Some(i as u32)
                }
            })
            .collect::<Vec<_>>();
        debug!(
            target: LOG_TARGET,
            "Requesting {} missing transaction index(es) from peer `{}`",
            missing_items.len(),
            self.peer_node_id.short_str(),
        );

        let missing_items = proto::InventoryIndexes { indexes: missing_items };
        let num_missing_items = missing_items.indexes.len();
        self.write_message(missing_items).await?;

        if num_missing_items > 0 {
            debug!(target: LOG_TARGET, "Waiting for missing transactions");
            self.read_and_insert_transactions_until_complete().await?;
        }

        Ok(())
    }

    async fn read_and_insert_transactions_until_complete(&mut self) -> Result<(), MempoolProtocolError> {
        let mut num_recv = 0;
        while let Some(result) = self.framed.next().await {
            let bytes = result?;
            let item = proto::TransactionItem::decode(&mut bytes.freeze()).map_err(|err| {
                MempoolProtocolError::DecodeFailed {
                    source: err,
                    peer: self.peer_node_id.clone(),
                }
            })?;

            match item.transaction {
                Some(txn) => {
                    self.validate_and_insert_transaction(txn).await?;
                    num_recv += 1;
                },
                None => {
                    debug!(
                        target: LOG_TARGET,
                        "All transaction(s) (count={}) received from peer `{}`. ",
                        num_recv,
                        self.peer_node_id.short_str()
                    );
                    break;
                },
            }
        }

        #[allow(clippy::cast_possible_truncation)]
        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        {
            let stats = self.mempool.stats().await?;
            metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
            metrics::reorg_pool_size().set(stats.reorg_txs as i64);
        }

        Ok(())
    }

    async fn validate_and_insert_transaction(
        &mut self,
        txn: shared_proto::types::Transaction,
    ) -> Result<(), MempoolProtocolError> {
        let txn = Transaction::try_from(txn).map_err(|err| MempoolProtocolError::MessageConversionFailed {
            peer: self.peer_node_id.clone(),
            message: err,
        })?;
        let excess_sig = txn
            .first_kernel_excess_sig()
            .ok_or_else(|| MempoolProtocolError::ExcessSignatureMissing(self.peer_node_id.clone()))?;
        let excess_sig_hex = excess_sig.get_signature().to_hex();

        debug!(
            target: LOG_TARGET,
            "Received transaction `{}` from peer `{}`",
            excess_sig_hex,
            self.peer_node_id.short_str()
        );
        let txn = Arc::new(txn);
        let store_state = self.mempool.has_transaction(txn.clone()).await?;
        if store_state.is_stored() {
            return Ok(());
        }

        let stored_result = self.mempool.insert(txn).await?;
        if stored_result.is_stored() {
            #[cfg(feature = "metrics")]
            metrics::inbound_transactions().inc();
            debug!(
                target: LOG_TARGET,
                "Inserted transaction `{}` from peer `{}`",
                excess_sig_hex,
                self.peer_node_id.short_str()
            );
        } else {
            #[cfg(feature = "metrics")]
            metrics::rejected_inbound_transactions().inc();
            debug!(
                target: LOG_TARGET,
                "Did not store new transaction `{excess_sig_hex}` in mempool: {stored_result}"
            )
        }

        Ok(())
    }

    async fn write_transactions(&mut self, transactions: Vec<Arc<Transaction>>) -> Result<(), MempoolProtocolError> {
        let txns = transactions.into_iter().take(self.config.initial_sync_max_transactions)
            .filter_map(|txn| {
                match shared_proto::types::Transaction::try_from(txn) {
                    Ok(txn) => Some(proto::TransactionItem {
                        transaction: Some(txn),
                    }),
                    Err(e) => {
                        warn!(target: LOG_TARGET, "Could not convert transaction: {e}");
                        None
                    }
                }
            })
            // Write an empty `TransactionItem` to indicate we're done
            .chain(iter::once(proto::TransactionItem::empty()));

        self.write_messages(stream::iter(txns)).await?;

        Ok(())
    }

    async fn read_message<T: prost::Message + Default>(&mut self) -> Result<T, MempoolProtocolError> {
        let msg = time::timeout(Duration::from_secs(10), self.framed.next())
            .await
            .map_err(|_| MempoolProtocolError::RecvTimeout)?
            .ok_or_else(|| MempoolProtocolError::SubstreamClosed(self.peer_node_id.clone()))??;

        T::decode(&mut msg.freeze()).map_err(|err| MempoolProtocolError::DecodeFailed {
            source: err,
            peer: self.peer_node_id.clone(),
        })
    }

    async fn write_messages<S, T>(&mut self, stream: S) -> Result<(), MempoolProtocolError>
    where
        S: Stream<Item = T> + Unpin,
        T: prost::Message,
    {
        let mut s = stream.map(|m| Bytes::from(m.to_encoded_bytes())).map(Ok);
        self.framed.send_all(&mut s).await?;
        Ok(())
    }

    async fn write_message<T: prost::Message>(&mut self, message: T) -> Result<(), MempoolProtocolError> {
        time::timeout(
            Duration::from_secs(10),
            self.framed.send(message.to_encoded_bytes().into()),
        )
        .await
        .map_err(|_| MempoolProtocolError::SendTimeout)??;
        Ok(())
    }
}
```