• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 23541521288

25 Mar 2026 12:41PM UTC coverage: 61.304% (-0.1%) from 61.403%
23541521288

push

github

web-flow
chore: fix most cucumber tests (#7710)

Description
---
flags broken tests

37 of 289 new or added lines in 26 files covered. (12.8%)

25 existing lines in 10 files now uncovered.

70599 of 115162 relevant lines covered (61.3%)

225644.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.94
/base_layer/core/src/mempool/sync_protocol/mod.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
//! # Mempool Sync Protocol
24
//!
25
//! The protocol handler for the mempool is responsible for the initial sync of transactions from peers.
26
//! In order to prevent duplicate transactions being received from multiple peers, syncing occurs one peer at a time.
27
//! This node will initiate this protocol up to a configurable (`MempoolSyncConfig::num_initial_sync_peers`) number
28
//! of times. After that, it will only respond to sync requests from remote peers.
29
//!
30
//! ## Protocol Flow
31
//!
32
//! Alice initiates (initiator) the connection to Bob (responder).
33
//! As the initiator, Alice MUST send a transaction inventory
34
//! Bob SHOULD respond with any transactions known to him, excluding the transactions in the inventory
35
//! Bob MUST send a complete message (An empty `TransactionItem` or 1 byte in protobuf)
36
//! Bob MUST send indexes of inventory items that are not known to him
37
//! Alice SHOULD return the Transactions relating to those indexes
38
//! Alice SHOULD close the stream immediately after sending
39
//!
40
//!
41
//! ```text
42
//!  +-------+                    +-----+
43
//!  | Alice |                    | Bob |
44
//!  +-------+                    +-----+
45
//!  |                                |
46
//!  | Txn Inventory                  |
47
//!  |------------------------------->|
48
//!  |                                |
49
//!  |      TransactionItem(tx_b1)    |
50
//!  |<-------------------------------|
51
//!  |             ...streaming...    |
52
//!  |      TransactionItem(empty)    |
53
//!  |<-------------------------------|
54
//!  |  Inventory missing txn indexes |
55
//!  |<-------------------------------|
56
//!  |                                |
57
//!  | TransactionItem(tx_a1)         |
58
//!  |------------------------------->|
59
//!  |             ...streaming...    |
60
//!  | TransactionItem(empty)         |
61
//!  |------------------------------->|
62
//!  |                                |
63
//!  |             END                |
64
//! ```
65

66
use std::{
67
    convert::TryFrom,
68
    iter,
69
    sync::{
70
        Arc,
71
        atomic::{AtomicUsize, Ordering},
72
    },
73
    time::Duration,
74
};
75

76
use error::MempoolProtocolError;
77
use futures::{SinkExt, Stream, StreamExt, stream};
78
pub use initializer::MempoolSyncInitializer;
79
use log::*;
80
use prost::Message;
81
use tari_comms::{
82
    Bytes,
83
    PeerConnection,
84
    connectivity::{ConnectivityEvent, ConnectivityRequester, ConnectivitySelection},
85
    framing,
86
    framing::CanonicalFraming,
87
    message::MessageExt,
88
    peer_manager::{NodeId, PeerFeatures},
89
    protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationRx},
90
};
91
use tari_transaction_components::transaction_components::Transaction;
92
use tari_utilities::{ByteArray, hex::Hex};
93
use tokio::{
94
    io::{AsyncRead, AsyncWrite},
95
    sync::Semaphore,
96
    task,
97
    time,
98
};
99

100
#[cfg(feature = "metrics")]
101
use crate::mempool::metrics;
102
use crate::{
103
    base_node::comms_interface::{BlockEvent, BlockEventReceiver},
104
    chain_storage::BlockAddResult,
105
    mempool::{Mempool, MempoolServiceConfig, proto},
106
    proto as shared_proto,
107
};
108

109
#[cfg(test)]
110
mod test;
111

112
mod error;
113
mod initializer;
114

115
/// Maximum size in bytes of a single length-delimited frame on the sync substream.
const MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // 3 MiB
/// Log target used by all log macros in this module.
const LOG_TARGET: &str = "c::mempool::sync_protocol";

/// Protocol identifier negotiated when opening a mempool-sync substream.
pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1");
119

120
/// Protocol handler that drives mempool synchronisation with peers.
///
/// Reacts to connectivity events, block events and inbound protocol notifications,
/// either initiating the sync protocol with a peer or responding to a peer-initiated sync.
pub struct MempoolSyncProtocol<TSubstream> {
    config: MempoolServiceConfig,
    /// Receiver for inbound substream notifications for this protocol.
    protocol_notifier: ProtocolNotificationRx<TSubstream>,
    mempool: Mempool,
    /// Count of peers this node has successfully completed an initiator sync with
    /// (shared with the spawned initiator tasks).
    num_synched: Arc<AtomicUsize>,
    /// Single-permit semaphore ensuring only one initiator sync runs at a time.
    permits: Arc<Semaphore>,
    connectivity: ConnectivityRequester,
    block_event_stream: BlockEventReceiver,
}
129

130
impl<TSubstream> MempoolSyncProtocol<TSubstream>
131
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
132
{
133
    pub fn new(
6✔
134
        config: MempoolServiceConfig,
6✔
135
        protocol_notifier: ProtocolNotificationRx<TSubstream>,
6✔
136
        mempool: Mempool,
6✔
137
        connectivity: ConnectivityRequester,
6✔
138
        block_event_stream: BlockEventReceiver,
6✔
139
    ) -> Self {
6✔
140
        Self {
6✔
141
            config,
6✔
142
            protocol_notifier,
6✔
143
            mempool,
6✔
144
            num_synched: Arc::new(AtomicUsize::new(0)),
6✔
145
            permits: Arc::new(Semaphore::new(1)),
6✔
146
            connectivity,
6✔
147
            block_event_stream,
6✔
148
        }
6✔
149
    }
6✔
150

151
    /// Runs the protocol handler event loop.
    ///
    /// Performs a best-effort startup sync with already-connected peers, then loops
    /// forever dispatching block events, connectivity events and inbound protocol
    /// notifications. This future never completes on its own.
    pub async fn run(mut self) {
        info!(target: LOG_TARGET, "Mempool protocol handler has started");

        let mut connectivity_events = self.connectivity.get_event_subscription();

        // Trigger initial mempool sync with already-connected peers. When the mempool sync
        // protocol starts, the node has already completed chain sync, so PeerConnected and
        // BlockSyncComplete events have already been emitted and cannot be received by this
        // protocol's event loop. Proactively request existing connections here.
        if !self.is_synched() {
            match self
                .connectivity
                .select_connections(ConnectivitySelection::random_nodes(
                    self.config.initial_sync_num_peers,
                    vec![],
                ))
                .await
            {
                Ok(connections) => {
                    for connection in connections {
                        self.spawn_initiator_protocol(connection).await;
                    }
                },
                Err(e) => {
                    // Best effort only: the normal event-driven paths below will still
                    // trigger syncs as peers connect.
                    debug!(target: LOG_TARGET, "Mempool startup sync: could not get peers: {e}");
                },
            }
        }

        loop {
            tokio::select! {
                // A (large enough) reorg or completed block sync may re-trigger mempool sync
                Ok(block_event) = self.block_event_stream.recv() => {
                    self.handle_block_event(&block_event).await;
                },
                // Outbound peer connections may trigger an initiator sync
                Ok(event) = connectivity_events.recv() => {
                    self.handle_connectivity_event(event).await;
                },

                // Inbound substreams run the responder side of the protocol
                Some(notif) = self.protocol_notifier.recv() => {
                    self.handle_protocol_notification(notif);
                }
            }
        }
    }
195

196
    async fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
4✔
197
        match event {
4✔
198
            // If this node is connecting to a peer
199
            ConnectivityEvent::PeerConnected(conn) if conn.direction().is_outbound() => {
4✔
200
                // This protocol is only spoken between base nodes
201
                if !conn.peer_features().contains(PeerFeatures::COMMUNICATION_NODE) {
4✔
202
                    return;
×
203
                }
4✔
204

205
                if !self.is_synched() {
4✔
206
                    self.spawn_initiator_protocol(*conn.clone()).await;
4✔
207
                }
×
208
            },
209
            _ => {},
×
210
        }
211
    }
4✔
212

213
    async fn handle_block_event(&mut self, block_event: &BlockEvent) {
×
214
        use BlockEvent::{BlockSyncComplete, ValidBlockAdded};
215
        match block_event {
×
216
            ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => {
×
217
                if added.len() < self.config.block_sync_trigger {
×
218
                    return;
×
219
                }
×
220
            },
221
            BlockSyncComplete(tip, starting_sync_height) => {
×
222
                let added = tip.height() - starting_sync_height;
×
223
                if added < self.config.block_sync_trigger as u64 {
×
224
                    return;
×
225
                }
×
226
            },
227
            _ => {
228
                return;
×
229
            },
230
        }
231
        // we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till
232
        // initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the
233
        // initial_sync_num_peers
234
        self.num_synched.store(0, Ordering::SeqCst);
×
235
        let connections = match self
×
236
            .connectivity
×
237
            .select_connections(ConnectivitySelection::random_nodes(
×
238
                self.config.initial_sync_num_peers,
×
239
                vec![],
×
240
            ))
241
            .await
×
242
        {
243
            Ok(v) => {
×
244
                if v.is_empty() {
×
245
                    error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to");
×
246
                    return;
×
247
                };
×
248
                v
×
249
            },
250
            Err(e) => {
×
251
                error!(
×
252
                    target: LOG_TARGET,
×
253
                    "Mempool sync could not get a peer to sync to: {e}"
254
                );
255
                return;
×
256
            },
257
        };
258
        for connection in connections {
×
259
            self.spawn_initiator_protocol(connection).await;
×
260
        }
261
    }
×
262

263
    /// Returns true once this node has completed at least `initial_sync_num_peers`
    /// initiator sync rounds.
    fn is_synched(&self) -> bool {
        self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers
    }
266

267
    /// Handles an inbound substream notification by spawning a responder task for
    /// the remote peer's sync request.
    fn handle_protocol_notification(&mut self, notification: ProtocolNotification<TSubstream>) {
        match notification.event {
            ProtocolEvent::NewInboundSubstream(node_id, substream) => {
                self.spawn_inbound_handler(node_id, substream);
            },
        }
    }
274

275
    /// Spawns a task that initiates mempool sync with the given peer.
    ///
    /// The task acquires the single semaphore permit so that only one initiator
    /// sync runs at a time, opens the sync substream, and runs the initiator side
    /// of the protocol. On success the synced-peer counter is incremented.
    async fn spawn_initiator_protocol(&mut self, mut conn: PeerConnection) {
        let mempool = self.mempool.clone();
        let permits = self.permits.clone();
        let num_synched = self.num_synched.clone();
        let config = self.config.clone();
        task::spawn(async move {
            // Only initiate this protocol with a single peer at a time
            let _permit = permits.acquire().await;
            // Re-check after acquiring the permit: another task may have completed
            // a sync while this one was waiting.
            if num_synched.load(Ordering::SeqCst) >= config.initial_sync_num_peers {
                return;
            }
            match conn.open_framed_substream(&MEMPOOL_SYNC_PROTOCOL, MAX_FRAME_SIZE).await {
                Ok(framed) => {
                    let protocol = MempoolPeerProtocol::new(config, framed, conn.peer_node_id().clone(), mempool);
                    match protocol.start_initiator().await {
                        Ok(_) => {
                            debug!(
                                target: LOG_TARGET,
                                "Mempool initiator protocol completed successfully for peer `{}`",
                                conn.peer_node_id().short_str(),
                            );
                            // Count this peer towards the initial-sync quota.
                            num_synched.fetch_add(1, Ordering::SeqCst);
                        },
                        Err(err) => {
                            debug!(
                                target: LOG_TARGET,
                                "Mempool initiator protocol failed for peer `{}`: {}",
                                conn.peer_node_id().short_str(),
                                err
                            );
                        },
                    }
                },
                Err(err) => error!(
                    target: LOG_TARGET,
                    "Unable to establish mempool protocol substream to peer `{}`: {}",
                    conn.peer_node_id().short_str(),
                    err
                ),
            }
        });
    }
317

318
    fn spawn_inbound_handler(&self, node_id: NodeId, substream: TSubstream) {
2✔
319
        let mempool = self.mempool.clone();
2✔
320
        let config = self.config.clone();
2✔
321
        task::spawn(async move {
2✔
322
            let framed = framing::canonical(substream, MAX_FRAME_SIZE);
2✔
323
            let mut protocol = MempoolPeerProtocol::new(config, framed, node_id.clone(), mempool);
2✔
324
            match protocol.start_responder().await {
2✔
325
                Ok(_) => {
326
                    debug!(
×
327
                        target: LOG_TARGET,
×
328
                        "Mempool responder protocol succeeded for peer `{}`",
329
                        node_id.short_str()
×
330
                    );
331
                },
332
                Err(err) => {
×
333
                    debug!(
×
334
                        target: LOG_TARGET,
×
335
                        "Mempool responder protocol failed for peer `{}`: {}",
336
                        node_id.short_str(),
×
337
                        err
338
                    );
339
                },
340
            }
341
        });
×
342
    }
2✔
343
}
344

345
/// Per-peer state for a single run of the mempool sync protocol, shared by both
/// the initiator and responder sides.
struct MempoolPeerProtocol<TSubstream> {
    config: MempoolServiceConfig,
    /// Length-delimited framed substream to the peer.
    framed: CanonicalFraming<TSubstream>,
    mempool: Mempool,
    peer_node_id: NodeId,
}
351

352
impl<TSubstream> MempoolPeerProtocol<TSubstream>
353
where TSubstream: AsyncRead + AsyncWrite + Unpin
354
{
355
    pub fn new(
10✔
356
        config: MempoolServiceConfig,
10✔
357
        framed: CanonicalFraming<TSubstream>,
10✔
358
        peer_node_id: NodeId,
10✔
359
        mempool: Mempool,
10✔
360
    ) -> Self {
10✔
361
        Self {
10✔
362
            config,
10✔
363
            framed,
10✔
364
            mempool,
10✔
365
            peer_node_id,
10✔
366
        }
10✔
367
    }
10✔
368

369
    pub async fn start_initiator(mut self) -> Result<(), MempoolProtocolError> {
5✔
370
        match self.start_initiator_inner().await {
5✔
371
            Ok(_) => {
372
                debug!(target: LOG_TARGET, "Initiator protocol complete");
5✔
373
                Ok(())
5✔
374
            },
375
            Err(err) => {
×
376
                if let Err(err) = self.framed.flush().await {
×
377
                    debug!(target: LOG_TARGET, "IO error when flushing stream: {err}");
×
378
                }
×
379
                if let Err(err) = self.framed.close().await {
×
380
                    debug!(target: LOG_TARGET, "IO error when closing stream: {err}");
×
381
                }
×
382
                Err(err)
×
383
            },
384
        }
385
    }
5✔
386

387
    /// Initiator protocol body: send our inventory, ingest the peer's transactions,
    /// read which inventory indexes the peer is missing, send those transactions
    /// back, then close the stream.
    async fn start_initiator_inner(&mut self) -> Result<(), MempoolProtocolError> {
        debug!(
            target: LOG_TARGET,
            "Starting initiator mempool sync for peer `{}`",
            self.peer_node_id.short_str()
        );

        // Build the inventory from the first kernel excess signature of each
        // transaction, capped at `initial_sync_max_transactions`. Transactions
        // without a kernel excess signature are skipped.
        let transactions = self.mempool.snapshot().await?;
        let items = transactions
            .iter()
            .take(self.config.initial_sync_max_transactions)
            .filter_map(|txn| txn.first_kernel_excess_sig())
            .map(|excess| excess.get_signature().to_vec())
            .collect();
        let inventory = proto::TransactionInventory { items };

        // Send an inventory of items currently in this node's mempool
        debug!(
            target: LOG_TARGET,
            "Sending transaction inventory containing {} item(s) to peer `{}`",
            inventory.items.len(),
            self.peer_node_id.short_str()
        );

        self.write_message(inventory).await?;

        // Receive the peer's transactions until the empty terminator item.
        self.read_and_insert_transactions_until_complete().await?;

        // The peer replies with the indexes of our inventory it does not have.
        let missing_items: proto::InventoryIndexes = self.read_message().await?;
        debug!(
            target: LOG_TARGET,
            "Received {} missing transaction index(es) from peer `{}`",
            missing_items.indexes.len(),
            self.peer_node_id.short_str(),
        );
        // Out-of-range indexes are silently dropped by the `get(..)` lookup.
        let missing_txns = missing_items
            .indexes
            .iter()
            .filter_map(|idx| transactions.get(*idx as usize).cloned())
            .collect::<Vec<_>>();
        debug!(
            target: LOG_TARGET,
            "Sending {} missing transaction(s) to peer `{}`",
            missing_items.indexes.len(),
            self.peer_node_id.short_str(),
        );

        // If we don't have any transactions at the given indexes we still need to send back an empty if they requested
        // at least one index
        if !missing_items.indexes.is_empty() {
            self.write_transactions(missing_txns).await?;
        }

        // Close the stream after writing
        self.framed.close().await?;

        Ok(())
    }
445

446
    pub async fn start_responder(&mut self) -> Result<(), MempoolProtocolError> {
5✔
447
        match self.start_responder_inner().await {
5✔
448
            Ok(_) => {
449
                debug!(target: LOG_TARGET, "Responder protocol complete");
3✔
450
                Ok(())
3✔
451
            },
452
            Err(err) => {
×
453
                if let Err(err) = self.framed.flush().await {
×
454
                    debug!(target: LOG_TARGET, "IO error when flushing stream: {err}");
×
455
                }
×
456
                if let Err(err) = self.framed.close().await {
×
457
                    debug!(target: LOG_TARGET, "IO error when closing stream: {err}");
×
458
                }
×
459
                Err(err)
×
460
            },
461
        }
462
    }
3✔
463

464
    /// Responder protocol body: read the peer's inventory, stream back all local
    /// transactions the peer does not have, request the inventory indexes we are
    /// missing, and ingest the transactions the peer returns for them.
    async fn start_responder_inner(&mut self) -> Result<(), MempoolProtocolError> {
        debug!(
            target: LOG_TARGET,
            "Starting responder mempool sync for peer `{}`",
            self.peer_node_id.short_str()
        );

        let inventory: proto::TransactionInventory = self.read_message().await?;

        debug!(
            target: LOG_TARGET,
            "Received inventory from peer `{}` containing {} item(s)",
            self.peer_node_id.short_str(),
            inventory.items.len()
        );

        let transactions = self.mempool.snapshot().await?;

        // Partition the local mempool into transactions the peer does NOT have
        // (kept, streamed below) and those already in the peer's inventory.
        // As a side effect, record the inventory positions of the duplicates so
        // we can later request only the indexes we are missing.
        let mut duplicate_inventory_items = Vec::new();
        let (transactions, _) = transactions.into_iter().partition::<Vec<_>, _>(|transaction| {
            let excess_sig = transaction
                .first_kernel_excess_sig()
                .expect("transaction stored in mempool did not have any kernels");

            // Linear scan of the inventory by excess signature bytes.
            let has_item = inventory
                .items
                .iter()
                .position(|bytes| bytes.as_slice() == excess_sig.get_signature().as_bytes());

            match has_item {
                Some(pos) => {
                    duplicate_inventory_items.push(pos);
                    false
                },
                None => true,
            }
        });

        debug!(
            target: LOG_TARGET,
            "Streaming {} transaction(s) to peer `{}`",
            transactions.len(),
            self.peer_node_id.short_str()
        );

        self.write_transactions(transactions).await?;

        // Generate an index list of inventory indexes that this node does not have
        #[allow(clippy::cast_possible_truncation)]
        let missing_items = inventory
            .items
            .into_iter()
            .enumerate()
            .filter_map(|(i, _)| {
                if duplicate_inventory_items.contains(&i) {
                    None
                } else {
                    Some(i as u32)
                }
            })
            .collect::<Vec<_>>();
        debug!(
            target: LOG_TARGET,
            "Requesting {} missing transaction index(es) from peer `{}`",
            missing_items.len(),
            self.peer_node_id.short_str(),
        );

        let missing_items = proto::InventoryIndexes { indexes: missing_items };
        let num_missing_items = missing_items.indexes.len();
        self.write_message(missing_items).await?;

        // Only wait for a reply if we actually requested something; the peer
        // sends nothing further when the index list is empty.
        if num_missing_items > 0 {
            debug!(target: LOG_TARGET, "Waiting for missing transactions");
            self.read_and_insert_transactions_until_complete().await?;
        }

        Ok(())
    }
543

544
    /// Reads `TransactionItem` frames from the substream and inserts each
    /// transaction into the mempool, until an empty item (the terminator) is
    /// received or the stream ends. Updates mempool metrics afterwards when the
    /// `metrics` feature is enabled.
    async fn read_and_insert_transactions_until_complete(&mut self) -> Result<(), MempoolProtocolError> {
        let mut num_recv = 0;
        while let Some(result) = self.framed.next().await {
            let bytes = result?;
            let item = proto::TransactionItem::decode(&mut bytes.freeze()).map_err(|err| {
                MempoolProtocolError::DecodeFailed {
                    source: err,
                    peer: self.peer_node_id.clone(),
                }
            })?;

            match item.transaction {
                Some(txn) => {
                    self.validate_and_insert_transaction(txn).await?;
                    num_recv += 1;
                },
                // An item without a transaction is the end-of-stream marker.
                None => {
                    debug!(
                        target: LOG_TARGET,
                        "All transaction(s) (count={}) received from peer `{}`. ",
                        num_recv,
                        self.peer_node_id.short_str()
                    );
                    break;
                },
            }
        }

        #[allow(clippy::cast_possible_truncation)]
        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        {
            let stats = self.mempool.stats().await?;
            metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
            metrics::reorg_pool_size().set(stats.reorg_txs as i64);
        }

        Ok(())
    }
583

584
    /// Converts a protobuf transaction to the domain type and inserts it into the
    /// mempool, skipping transactions already stored.
    ///
    /// # Errors
    /// Returns an error when the protobuf conversion fails, when the transaction
    /// has no kernel excess signature, or when a mempool call fails.
    async fn validate_and_insert_transaction(
        &mut self,
        txn: shared_proto::types::Transaction,
    ) -> Result<(), MempoolProtocolError> {
        let txn = Transaction::try_from(txn).map_err(|err| MempoolProtocolError::MessageConversionFailed {
            peer: self.peer_node_id.clone(),
            message: err,
        })?;
        let excess_sig = txn
            .first_kernel_excess_sig()
            .ok_or_else(|| MempoolProtocolError::ExcessSignatureMissing(self.peer_node_id.clone()))?;
        let excess_sig_hex = excess_sig.get_signature().to_hex();

        debug!(
            target: LOG_TARGET,
            "Received transaction `{}` from peer `{}`",
            excess_sig_hex,
            self.peer_node_id.short_str()
        );
        let txn = Arc::new(txn);
        // Skip insertion if the mempool already holds this transaction.
        let store_state = self.mempool.has_transaction(txn.clone()).await?;
        if store_state.is_stored() {
            return Ok(());
        }

        let stored_result = self.mempool.insert(txn).await?;
        if stored_result.is_stored() {
            #[cfg(feature = "metrics")]
            metrics::inbound_transactions().inc();
            debug!(
                target: LOG_TARGET,
                "Inserted transaction `{}` from peer `{}`",
                excess_sig_hex,
                self.peer_node_id.short_str()
            );
        } else {
            // A rejection by the mempool is not a protocol error; log and continue.
            #[cfg(feature = "metrics")]
            metrics::rejected_inbound_transactions().inc();
            debug!(
                target: LOG_TARGET,
                "Did not store new transaction `{excess_sig_hex}` in mempool: {stored_result}"
            )
        }

        Ok(())
    }
630

631
    async fn write_transactions(&mut self, transactions: Vec<Arc<Transaction>>) -> Result<(), MempoolProtocolError> {
9✔
632
        let txns = transactions.into_iter().take(self.config.initial_sync_max_transactions)
9✔
633
            .filter_map(|txn| {
14✔
634
                match shared_proto::types::Transaction::try_from(txn) {
14✔
635
                    Ok(txn) =>   Some(proto::TransactionItem {
14✔
636
                        transaction: Some(txn),
14✔
637
                    }),
14✔
638
                    Err(e) => {
×
639
                        warn!(target: LOG_TARGET, "Could not convert transaction: {e}");
×
640
                        None
×
641
                    }
642
                }
643
            })
14✔
644
            // Write an empty `TransactionItem` to indicate we're done
645
            .chain(iter::once(proto::TransactionItem::empty()));
9✔
646

647
        self.write_messages(stream::iter(txns)).await?;
9✔
648

649
        Ok(())
9✔
650
    }
9✔
651

652
    async fn read_message<T: prost::Message + Default>(&mut self) -> Result<T, MempoolProtocolError> {
10✔
653
        let msg = time::timeout(Duration::from_secs(10), self.framed.next())
10✔
654
            .await
10✔
655
            .map_err(|_| MempoolProtocolError::RecvTimeout)?
10✔
656
            .ok_or_else(|| MempoolProtocolError::SubstreamClosed(self.peer_node_id.clone()))??;
10✔
657

658
        T::decode(&mut msg.freeze()).map_err(|err| MempoolProtocolError::DecodeFailed {
10✔
659
            source: err,
×
660
            peer: self.peer_node_id.clone(),
×
661
        })
×
662
    }
10✔
663

664
    async fn write_messages<S, T>(&mut self, stream: S) -> Result<(), MempoolProtocolError>
9✔
665
    where
9✔
666
        S: Stream<Item = T> + Unpin,
9✔
667
        T: prost::Message,
9✔
668
    {
9✔
669
        let mut s = stream.map(|m| Bytes::from(m.to_encoded_bytes())).map(Ok);
23✔
670
        self.framed.send_all(&mut s).await?;
9✔
671
        Ok(())
9✔
672
    }
9✔
673

674
    async fn write_message<T: prost::Message>(&mut self, message: T) -> Result<(), MempoolProtocolError> {
10✔
675
        time::timeout(
10✔
676
            Duration::from_secs(10),
10✔
677
            self.framed.send(message.to_encoded_bytes().into()),
10✔
678
        )
10✔
679
        .await
10✔
680
        .map_err(|_| MempoolProtocolError::SendTimeout)??;
10✔
681
        Ok(())
10✔
682
    }
10✔
683
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc