tari-project / tari · build 20989403475 (github push)
14 Jan 2026 09:38AM UTC · coverage: 60.476% (remained the same)
SWvheerden · chore: new release v5.2.1-pre.0
70122 of 115951 relevant lines covered (60.48%) · 226461.52 hits per line

Source File: /base_layer/core/src/mempool/sync_protocol/mod.rs · 63.32% covered

//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # Mempool Sync Protocol
//!
//! The protocol handler for the mempool is responsible for the initial sync of transactions from peers.
//! In order to prevent duplicate transactions being received from multiple peers, syncing occurs one peer at a time.
//! This node will initiate this protocol up to a configurable (`MempoolServiceConfig::initial_sync_num_peers`) number
//! of times. After that, it will only respond to sync requests from remote peers.
//!
//! ## Protocol Flow
//!
//! Alice initiates (initiator) the connection to Bob (responder).
//! As the initiator, Alice MUST send a transaction inventory.
//! Bob SHOULD respond with any transactions known to him, excluding the transactions in the inventory.
//! Bob MUST send a complete message (an empty `TransactionItem` or 1 byte in protobuf).
//! Bob MUST send the indexes of inventory items that are not known to him.
//! Alice SHOULD return the transactions relating to those indexes.
//! Alice SHOULD close the stream immediately after sending.
//!
//!
//! ```text
//!  +-------+                    +-----+
//!  | Alice |                    | Bob |
//!  +-------+                    +-----+
//!  |                                |
//!  | Txn Inventory                  |
//!  |------------------------------->|
//!  |                                |
//!  |      TransactionItem(tx_b1)    |
//!  |<-------------------------------|
//!  |             ...streaming...    |
//!  |      TransactionItem(empty)    |
//!  |<-------------------------------|
//!  |  Inventory missing txn indexes |
//!  |<-------------------------------|
//!  |                                |
//!  | TransactionItem(tx_a1)         |
//!  |------------------------------->|
//!  |             ...streaming...    |
//!  | TransactionItem(empty)         |
//!  |------------------------------->|
//!  |                                |
//!  |             END                |
//! ```
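//!
//! ## Example (illustrative sketch)
//!
//! A minimal sketch of how each side of the exchange is driven, mirroring what
//! `spawn_initiator_protocol` and `spawn_inbound_handler` below actually do. The `conn`,
//! `substream`, `node_id`, `config` and `mempool` values are assumed to come from the surrounding
//! service setup and are not shown here.
//!
//! ```ignore
//! // Initiator (Alice): open the mempool-sync substream and start the exchange.
//! let framed = conn.open_framed_substream(&MEMPOOL_SYNC_PROTOCOL, MAX_FRAME_SIZE).await?;
//! let protocol = MempoolPeerProtocol::new(config, framed, conn.peer_node_id().clone(), mempool);
//! protocol.start_initiator().await?;
//!
//! // Responder (Bob): frame the inbound substream and answer the sync request.
//! let framed = framing::canonical(substream, MAX_FRAME_SIZE);
//! let mut protocol = MempoolPeerProtocol::new(config, framed, node_id, mempool);
//! protocol.start_responder().await?;
//! ```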

use std::{
    convert::TryFrom,
    iter,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

use error::MempoolProtocolError;
use futures::{stream, SinkExt, Stream, StreamExt};
pub use initializer::MempoolSyncInitializer;
use log::*;
use prost::Message;
use tari_comms::{
    connectivity::{ConnectivityEvent, ConnectivityRequester, ConnectivitySelection},
    framing,
    framing::CanonicalFraming,
    message::MessageExt,
    peer_manager::{NodeId, PeerFeatures},
    protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationRx},
    Bytes,
    PeerConnection,
};
use tari_transaction_components::transaction_components::Transaction;
use tari_utilities::{hex::Hex, ByteArray};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::Semaphore,
    task,
    time,
};

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
    base_node::comms_interface::{BlockEvent, BlockEventReceiver},
    chain_storage::BlockAddResult,
    mempool::{proto, Mempool, MempoolServiceConfig},
    proto as shared_proto,
};

#[cfg(test)]
mod test;

mod error;
mod initializer;

const MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // 3 MiB
const LOG_TARGET: &str = "c::mempool::sync_protocol";

pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1");
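
// Note: the tunables referenced in this module come from `MempoolServiceConfig`:
// `initial_sync_num_peers` caps how many peers this node initiates a sync with,
// `initial_sync_max_transactions` caps how many transactions are offered or streamed per sync, and
// `block_sync_trigger` is the number of blocks added (by a reorg or block sync) that prompts
// another round of syncing. See `MempoolServiceConfig` for the concrete values.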

pub struct MempoolSyncProtocol<TSubstream> {
    config: MempoolServiceConfig,
    protocol_notifier: ProtocolNotificationRx<TSubstream>,
    mempool: Mempool,
    num_synched: Arc<AtomicUsize>,
    permits: Arc<Semaphore>,
    connectivity: ConnectivityRequester,
    block_event_stream: BlockEventReceiver,
}

impl<TSubstream> MempoolSyncProtocol<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
{
    pub fn new(
        config: MempoolServiceConfig,
        protocol_notifier: ProtocolNotificationRx<TSubstream>,
        mempool: Mempool,
        connectivity: ConnectivityRequester,
        block_event_stream: BlockEventReceiver,
    ) -> Self {
        Self {
            config,
            protocol_notifier,
            mempool,
            num_synched: Arc::new(AtomicUsize::new(0)),
            permits: Arc::new(Semaphore::new(1)),
            connectivity,
            block_event_stream,
        }
    }

    pub async fn run(mut self) {
        info!(target: LOG_TARGET, "Mempool protocol handler has started");

        let mut connectivity_events = self.connectivity.get_event_subscription();
        loop {
            tokio::select! {
                Ok(block_event) = self.block_event_stream.recv() => {
                    self.handle_block_event(&block_event).await;
                },
                Ok(event) = connectivity_events.recv() => {
                    self.handle_connectivity_event(event).await;
                },

                Some(notif) = self.protocol_notifier.recv() => {
                    self.handle_protocol_notification(notif);
                }
            }
        }
    }

    async fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
        match event {
            // If this node is connecting to a peer
            ConnectivityEvent::PeerConnected(conn) if conn.direction().is_outbound() => {
                // This protocol is only spoken between base nodes
                if !conn.peer_features().contains(PeerFeatures::COMMUNICATION_NODE) {
                    return;
                }

                if !self.is_synched() {
                    self.spawn_initiator_protocol(*conn.clone()).await;
                }
            },
            _ => {},
        }
    }

    async fn handle_block_event(&mut self, block_event: &BlockEvent) {
        use BlockEvent::{BlockSyncComplete, ValidBlockAdded};
        match block_event {
            ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => {
                if added.len() < self.config.block_sync_trigger {
                    return;
                }
            },
            BlockSyncComplete(tip, starting_sync_height) => {
                let added = tip.height() - starting_sync_height;
                if added < self.config.block_sync_trigger as u64 {
                    return;
                }
            },
            _ => {
                return;
            },
        }
        // We want to sync with at least `initial_sync_num_peers` peers again after a significant chain change, so
        // reset `num_synched` to 0. This is best effort: it ensures the initiator protocol runs against up to
        // `initial_sync_num_peers` peers once more.
        self.num_synched.store(0, Ordering::SeqCst);
        let connections = match self
            .connectivity
            .select_connections(ConnectivitySelection::random_nodes(
                self.config.initial_sync_num_peers,
                vec![],
            ))
            .await
        {
            Ok(v) => {
                if v.is_empty() {
                    error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to");
                    return;
                };
                v
            },
            Err(e) => {
                error!(
                    target: LOG_TARGET,
                    "Mempool sync could not get a peer to sync to: {e}"
                );
                return;
            },
        };
        for connection in connections {
            self.spawn_initiator_protocol(connection).await;
        }
    }
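    // Illustrative note (example number assumed, not from the original source): with
    // `block_sync_trigger = 5`, a reorg that adds 7 blocks, or a block sync that advances the tip
    // by 7, resets `num_synched` above and re-runs the initiator protocol against up to
    // `initial_sync_num_peers` randomly selected peers; smaller chain movements return early and
    // leave the sync state untouched.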

    fn is_synched(&self) -> bool {
        self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers
    }

    fn handle_protocol_notification(&mut self, notification: ProtocolNotification<TSubstream>) {
        match notification.event {
            ProtocolEvent::NewInboundSubstream(node_id, substream) => {
                self.spawn_inbound_handler(node_id, substream);
            },
        }
    }

    async fn spawn_initiator_protocol(&mut self, mut conn: PeerConnection) {
        let mempool = self.mempool.clone();
        let permits = self.permits.clone();
        let num_synched = self.num_synched.clone();
        let config = self.config.clone();
        task::spawn(async move {
            // Only initiate this protocol with a single peer at a time
            let _permit = permits.acquire().await;
            if num_synched.load(Ordering::SeqCst) >= config.initial_sync_num_peers {
                return;
            }
            match conn.open_framed_substream(&MEMPOOL_SYNC_PROTOCOL, MAX_FRAME_SIZE).await {
                Ok(framed) => {
                    let protocol = MempoolPeerProtocol::new(config, framed, conn.peer_node_id().clone(), mempool);
                    match protocol.start_initiator().await {
                        Ok(_) => {
                            debug!(
                                target: LOG_TARGET,
                                "Mempool initiator protocol completed successfully for peer `{}`",
                                conn.peer_node_id().short_str(),
                            );
                            num_synched.fetch_add(1, Ordering::SeqCst);
                        },
                        Err(err) => {
                            debug!(
                                target: LOG_TARGET,
                                "Mempool initiator protocol failed for peer `{}`: {}",
                                conn.peer_node_id().short_str(),
                                err
                            );
                        },
                    }
                },
                Err(err) => error!(
                    target: LOG_TARGET,
                    "Unable to establish mempool protocol substream to peer `{}`: {}",
                    conn.peer_node_id().short_str(),
                    err
                ),
            }
        });
    }

    fn spawn_inbound_handler(&self, node_id: NodeId, substream: TSubstream) {
        let mempool = self.mempool.clone();
        let config = self.config.clone();
        task::spawn(async move {
            let framed = framing::canonical(substream, MAX_FRAME_SIZE);
            let mut protocol = MempoolPeerProtocol::new(config, framed, node_id.clone(), mempool);
            match protocol.start_responder().await {
                Ok(_) => {
                    debug!(
                        target: LOG_TARGET,
                        "Mempool responder protocol succeeded for peer `{}`",
                        node_id.short_str()
                    );
                },
                Err(err) => {
                    debug!(
                        target: LOG_TARGET,
                        "Mempool responder protocol failed for peer `{}`: {}",
                        node_id.short_str(),
                        err
                    );
                },
            }
        });
    }
}

struct MempoolPeerProtocol<TSubstream> {
    config: MempoolServiceConfig,
    framed: CanonicalFraming<TSubstream>,
    mempool: Mempool,
    peer_node_id: NodeId,
}

impl<TSubstream> MempoolPeerProtocol<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin
{
    pub fn new(
        config: MempoolServiceConfig,
        framed: CanonicalFraming<TSubstream>,
        peer_node_id: NodeId,
        mempool: Mempool,
    ) -> Self {
        Self {
            config,
            framed,
            mempool,
            peer_node_id,
        }
    }

    pub async fn start_initiator(mut self) -> Result<(), MempoolProtocolError> {
        match self.start_initiator_inner().await {
            Ok(_) => {
                debug!(target: LOG_TARGET, "Initiator protocol complete");
                Ok(())
            },
            Err(err) => {
                if let Err(err) = self.framed.flush().await {
                    debug!(target: LOG_TARGET, "IO error when flushing stream: {err}");
                }
                if let Err(err) = self.framed.close().await {
                    debug!(target: LOG_TARGET, "IO error when closing stream: {err}");
                }
                Err(err)
            },
        }
    }

    async fn start_initiator_inner(&mut self) -> Result<(), MempoolProtocolError> {
        debug!(
            target: LOG_TARGET,
            "Starting initiator mempool sync for peer `{}`",
            self.peer_node_id.short_str()
        );

        let transactions = self.mempool.snapshot().await?;
        let items = transactions
            .iter()
            .take(self.config.initial_sync_max_transactions)
            .filter_map(|txn| txn.first_kernel_excess_sig())
            .map(|excess| excess.get_signature().to_vec())
            .collect();
        let inventory = proto::TransactionInventory { items };

        // Send an inventory of items currently in this node's mempool
        debug!(
            target: LOG_TARGET,
            "Sending transaction inventory containing {} item(s) to peer `{}`",
            inventory.items.len(),
            self.peer_node_id.short_str()
        );

        self.write_message(inventory).await?;

        self.read_and_insert_transactions_until_complete().await?;

        let missing_items: proto::InventoryIndexes = self.read_message().await?;
        debug!(
            target: LOG_TARGET,
            "Received {} missing transaction index(es) from peer `{}`",
            missing_items.indexes.len(),
            self.peer_node_id.short_str(),
        );
        let missing_txns = missing_items
            .indexes
            .iter()
            .filter_map(|idx| transactions.get(*idx as usize).cloned())
            .collect::<Vec<_>>();
        debug!(
            target: LOG_TARGET,
            "Sending {} missing transaction(s) to peer `{}`",
            missing_items.indexes.len(),
            self.peer_node_id.short_str(),
        );

        // Even if we don't have any transactions at the requested indexes, we still need to send back an empty
        // `TransactionItem` if the peer requested at least one index
        if !missing_items.indexes.is_empty() {
            self.write_transactions(missing_txns).await?;
        }

        // Close the stream after writing
        self.framed.close().await?;

        Ok(())
    }

    pub async fn start_responder(&mut self) -> Result<(), MempoolProtocolError> {
        match self.start_responder_inner().await {
            Ok(_) => {
                debug!(target: LOG_TARGET, "Responder protocol complete");
                Ok(())
            },
            Err(err) => {
                if let Err(err) = self.framed.flush().await {
                    debug!(target: LOG_TARGET, "IO error when flushing stream: {err}");
                }
                if let Err(err) = self.framed.close().await {
                    debug!(target: LOG_TARGET, "IO error when closing stream: {err}");
                }
                Err(err)
            },
        }
    }

    async fn start_responder_inner(&mut self) -> Result<(), MempoolProtocolError> {
        debug!(
            target: LOG_TARGET,
            "Starting responder mempool sync for peer `{}`",
            self.peer_node_id.short_str()
        );

        let inventory: proto::TransactionInventory = self.read_message().await?;

        debug!(
            target: LOG_TARGET,
            "Received inventory from peer `{}` containing {} item(s)",
            self.peer_node_id.short_str(),
            inventory.items.len()
        );

        let transactions = self.mempool.snapshot().await?;

        let mut duplicate_inventory_items = Vec::new();
        let (transactions, _) = transactions.into_iter().partition::<Vec<_>, _>(|transaction| {
            let excess_sig = transaction
                .first_kernel_excess_sig()
                .expect("transaction stored in mempool did not have any kernels");

            let has_item = inventory
                .items
                .iter()
                .position(|bytes| bytes.as_slice() == excess_sig.get_signature().as_bytes());

            match has_item {
                Some(pos) => {
                    duplicate_inventory_items.push(pos);
                    false
                },
                None => true,
            }
        });
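
        // Illustrative note (example values assumed, not from the original source): if the
        // inventory is [sig_a, sig_b, sig_c] and this mempool holds the transaction for sig_b plus
        // one the initiator does not know about, the partition above keeps only the unknown
        // transaction for streaming, records inventory position 1 as a duplicate, and the index
        // list built below requests the missing indexes [0, 2] from the initiator.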

        debug!(
            target: LOG_TARGET,
            "Streaming {} transaction(s) to peer `{}`",
            transactions.len(),
            self.peer_node_id.short_str()
        );

        self.write_transactions(transactions).await?;

        // Generate an index list of inventory indexes that this node does not have
        #[allow(clippy::cast_possible_truncation)]
        let missing_items = inventory
            .items
            .into_iter()
            .enumerate()
            .filter_map(|(i, _)| {
                if duplicate_inventory_items.contains(&i) {
                    None
                } else {
                    Some(i as u32)
                }
            })
            .collect::<Vec<_>>();
        debug!(
            target: LOG_TARGET,
            "Requesting {} missing transaction index(es) from peer `{}`",
            missing_items.len(),
            self.peer_node_id.short_str(),
        );

        let missing_items = proto::InventoryIndexes { indexes: missing_items };
        let num_missing_items = missing_items.indexes.len();
        self.write_message(missing_items).await?;

        if num_missing_items > 0 {
            debug!(target: LOG_TARGET, "Waiting for missing transactions");
            self.read_and_insert_transactions_until_complete().await?;
        }

        Ok(())
    }

    async fn read_and_insert_transactions_until_complete(&mut self) -> Result<(), MempoolProtocolError> {
        let mut num_recv = 0;
        while let Some(result) = self.framed.next().await {
            let bytes = result?;
            let item = proto::TransactionItem::decode(&mut bytes.freeze()).map_err(|err| {
                MempoolProtocolError::DecodeFailed {
                    source: err,
                    peer: self.peer_node_id.clone(),
                }
            })?;

            match item.transaction {
                Some(txn) => {
                    self.validate_and_insert_transaction(txn).await?;
                    num_recv += 1;
                },
                None => {
                    debug!(
                        target: LOG_TARGET,
                        "All transaction(s) (count={}) received from peer `{}`. ",
                        num_recv,
                        self.peer_node_id.short_str()
                    );
                    break;
                },
            }
        }

        #[allow(clippy::cast_possible_truncation)]
        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        {
            let stats = self.mempool.stats().await?;
            metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
            metrics::reorg_pool_size().set(stats.reorg_txs as i64);
        }

        Ok(())
    }

    async fn validate_and_insert_transaction(
        &mut self,
        txn: shared_proto::types::Transaction,
    ) -> Result<(), MempoolProtocolError> {
        let txn = Transaction::try_from(txn).map_err(|err| MempoolProtocolError::MessageConversionFailed {
            peer: self.peer_node_id.clone(),
            message: err,
        })?;
        let excess_sig = txn
            .first_kernel_excess_sig()
            .ok_or_else(|| MempoolProtocolError::ExcessSignatureMissing(self.peer_node_id.clone()))?;
        let excess_sig_hex = excess_sig.get_signature().to_hex();

        debug!(
            target: LOG_TARGET,
            "Received transaction `{}` from peer `{}`",
            excess_sig_hex,
            self.peer_node_id.short_str()
        );
        let txn = Arc::new(txn);
        let store_state = self.mempool.has_transaction(txn.clone()).await?;
        if store_state.is_stored() {
            return Ok(());
        }

        let stored_result = self.mempool.insert(txn).await?;
        if stored_result.is_stored() {
            #[cfg(feature = "metrics")]
            metrics::inbound_transactions().inc();
            debug!(
                target: LOG_TARGET,
                "Inserted transaction `{}` from peer `{}`",
                excess_sig_hex,
                self.peer_node_id.short_str()
            );
        } else {
            #[cfg(feature = "metrics")]
            metrics::rejected_inbound_transactions().inc();
            debug!(
                target: LOG_TARGET,
                "Did not store new transaction `{excess_sig_hex}` in mempool: {stored_result}"
            )
        }

        Ok(())
    }

    async fn write_transactions(&mut self, transactions: Vec<Arc<Transaction>>) -> Result<(), MempoolProtocolError> {
        let txns = transactions.into_iter().take(self.config.initial_sync_max_transactions)
            .filter_map(|txn| {
                match shared_proto::types::Transaction::try_from(txn) {
                    Ok(txn) => Some(proto::TransactionItem {
                        transaction: Some(txn),
                    }),
                    Err(e) => {
                        warn!(target: LOG_TARGET, "Could not convert transaction: {e}");
                        None
                    }
                }
            })
            // Write an empty `TransactionItem` to indicate we're done
            .chain(iter::once(proto::TransactionItem::empty()));

        self.write_messages(stream::iter(txns)).await?;

        Ok(())
    }

    async fn read_message<T: prost::Message + Default>(&mut self) -> Result<T, MempoolProtocolError> {
        let msg = time::timeout(Duration::from_secs(10), self.framed.next())
            .await
            .map_err(|_| MempoolProtocolError::RecvTimeout)?
            .ok_or_else(|| MempoolProtocolError::SubstreamClosed(self.peer_node_id.clone()))??;

        T::decode(&mut msg.freeze()).map_err(|err| MempoolProtocolError::DecodeFailed {
            source: err,
            peer: self.peer_node_id.clone(),
        })
    }

    async fn write_messages<S, T>(&mut self, stream: S) -> Result<(), MempoolProtocolError>
    where
        S: Stream<Item = T> + Unpin,
        T: prost::Message,
    {
        let mut s = stream.map(|m| Bytes::from(m.to_encoded_bytes())).map(Ok);
        self.framed.send_all(&mut s).await?;
        Ok(())
    }

    async fn write_message<T: prost::Message>(&mut self, message: T) -> Result<(), MempoolProtocolError> {
        time::timeout(
            Duration::from_secs(10),
            self.framed.send(message.to_encoded_bytes().into()),
        )
        .await
        .map_err(|_| MempoolProtocolError::SendTimeout)??;
        Ok(())
    }
}
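
// Wire-format summary (derived from the code above, for orientation only): every message is a
// single frame of the canonical framing, capped at MAX_FRAME_SIZE, carrying a protobuf-encoded
// payload. `proto::TransactionInventory` carries the kernel excess-signature bytes of up to
// `initial_sync_max_transactions` mempool transactions, `proto::InventoryIndexes` carries the u32
// positions of inventory items the responder does not have, and a `proto::TransactionItem` with no
// transaction set marks the end of a transaction stream. Both `read_message` and `write_message`
// give up after a 10-second timeout.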