
tari-project / tari / 19468834672

18 Nov 2025 02:01PM UTC coverage: 51.294% (-0.3%) from 51.544%
push

github

web-flow
feat: add coin selection and spending via bins or buckets (#7584)

Description
---

Add range limit coin-join:
- Added an unspent output coin distribution gRPC method
('CoinHistogramRequest'), whereby the wallet returns the number of coins
and their total value in each of a pre-set series of buckets.
- Added a range limit coin-join gRPC method ('RangeLimitCoinJoin') to
the wallet, whereby the user can specify the minimum target amount,
maximum number of inputs, dust lower bound (inclusive), dust upper bound
(exclusive) and fee. Transaction size will be limited to the specified
maximum number of inputs, and multiple outputs will be created according
to the minimum target amount. All the inputs in the range will be spent,
unless the total available amount does not meet the minimum target
amount.
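
The selection rule described above can be sketched as follows. This is only an illustration under stated assumptions, not the wallet's implementation: `RangeLimit` and `plan_coin_join` are hypothetical names, coins are reduced to bare `u64` values, and the fee and the number of outputs per transaction are not modelled.

```rust
/// Hypothetical request parameters, named after the fields listed in the description.
struct RangeLimit {
    min_target_amount: u64,
    max_inputs: usize,
    dust_lower_bound: u64, // inclusive
    dust_upper_bound: u64, // exclusive
}

/// Returns the input values grouped per transaction, or an empty plan when the
/// total of the in-range coins does not reach the minimum target amount.
fn plan_coin_join(coins: &[u64], limit: &RangeLimit) -> Vec<Vec<u64>> {
    // `chunks` panics on a chunk size of zero, so reject that up front.
    if limit.max_inputs == 0 {
        return Vec::new();
    }

    // Keep only the coins inside the half-open range [lower, upper).
    let in_range: Vec<u64> = coins
        .iter()
        .copied()
        .filter(|v| *v >= limit.dust_lower_bound && *v < limit.dust_upper_bound)
        .collect();

    // Sum in u128 so that a large number of coins cannot overflow.
    let total: u128 = in_range.iter().map(|v| u128::from(*v)).sum();
    if total < u128::from(limit.min_target_amount) {
        return Vec::new();
    }

    // Spend every in-range coin, with at most `max_inputs` inputs per transaction.
    in_range.chunks(limit.max_inputs).map(|c| c.to_vec()).collect()
}
```

With a limit of 350 inputs, any in-range set of 351 to 700 coins would be planned as two transactions under this sketch.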

Closes #7582.

Motivation and Context
---
See #7582.

How Has This Been Tested?
---
System-level testing

gRPC **CoinHistogram** method
```
{
  "buckets": [
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "0",
      "upper_bound": "1000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "1000",
      "upper_bound": "100000"
    },
    {
      "count": "2",
      "total_amount": "1165548",
      "lower_bound": "100000",
      "upper_bound": "1000000"
    },
    {
      "count": "158",
      "total_amount": "1455989209",
      "lower_bound": "1000000",
      "upper_bound": "1000000000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "1000000000",
      "upper_bound": "100000000000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "100000000000",
      "upper_bound": "21000000000000000"
    }
  ]
}
```
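
A minimal sketch of how such a histogram could be computed from a list of coin values. The names `Bucket` and `coin_histogram` are hypothetical, and the sketch assumes each bucket covers the half-open range `[lower_bound, upper_bound)`, which the output above does not state explicitly.

```rust
/// One histogram bucket; assumed to cover [lower_bound, upper_bound).
#[derive(Debug, Clone, PartialEq)]
struct Bucket {
    lower_bound: u64,
    upper_bound: u64,
    count: u64,
    total_amount: u64,
}

/// Builds one bucket per adjacent pair in `bounds` (which must be ascending)
/// and tallies each value into the bucket whose range contains it.
fn coin_histogram(values: &[u64], bounds: &[u64]) -> Vec<Bucket> {
    let mut buckets: Vec<Bucket> = bounds
        .windows(2)
        .map(|w| Bucket {
            lower_bound: w[0],
            upper_bound: w[1],
            count: 0,
            total_amount: 0,
        })
        .collect();

    for &value in values {
        // Values outside every bucket are ignored in this sketch.
        if let Some(bucket) = buckets
            .iter_mut()
            .find(|b| value >= b.lower_bound && value < b.upper_bound)
        {
            bucket.count += 1;
            bucket.total_amount += value;
        }
    }
    buckets
}
```

Passing the bounds `0, 1000, 100000, 1000000, 1000000000, 100000000000, 21000000000000000` yields the six buckets shown in the response above.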

gRPC **RangeLimitCoinJoin** method

In this example, two transactions were created, bounded by the 350 input
size limit. gRPC client view:

<img width="897" height="396" alt="image"
src="https://github.com/user-attachments/assets/6c5ae857-8a01-4c90-9c55-1eee2fbd... (continued)

0 of 636 new or added lines in 11 files covered. (0.0%)

17 existing lines in 8 files now uncovered.

59180 of 115373 relevant lines covered (51.29%)

7948.46 hits per line

Source File

63.32
/base_layer/core/src/mempool/sync_protocol/mod.rs
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # Mempool Sync Protocol
//!
//! The protocol handler for the mempool is responsible for the initial sync of transactions from peers.
//! In order to prevent duplicate transactions being received from multiple peers, syncing occurs one peer at a time.
//! This node will initiate this protocol up to a configurable (`MempoolServiceConfig::initial_sync_num_peers`) number
//! of times. After that, it will only respond to sync requests from remote peers.
//!
//! ## Protocol Flow
//!
//! Alice initiates (initiator) the connection to Bob (responder).
//! As the initiator, Alice MUST send a transaction inventory.
//! Bob SHOULD respond with any transactions known to him, excluding the transactions in the inventory.
//! Bob MUST send a complete message (an empty `TransactionItem` or 1 byte in protobuf).
//! Bob MUST send indexes of inventory items that are not known to him.
//! Alice SHOULD return the Transactions relating to those indexes.
//! Alice SHOULD close the stream immediately after sending.
//!
//!
//! ```text
//!  +-------+                    +-----+
//!  | Alice |                    | Bob |
//!  +-------+                    +-----+
//!  |                                |
//!  | Txn Inventory                  |
//!  |------------------------------->|
//!  |                                |
//!  |      TransactionItem(tx_b1)    |
//!  |<-------------------------------|
//!  |             ...streaming...    |
//!  |      TransactionItem(empty)    |
//!  |<-------------------------------|
//!  |  Inventory missing txn indexes |
//!  |<-------------------------------|
//!  |                                |
//!  | TransactionItem(tx_a1)         |
//!  |------------------------------->|
//!  |             ...streaming...    |
//!  | TransactionItem(empty)         |
//!  |------------------------------->|
//!  |                                |
//!  |             END                |
//! ```

use std::{
    convert::TryFrom,
    iter,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

use error::MempoolProtocolError;
use futures::{stream, SinkExt, Stream, StreamExt};
pub use initializer::MempoolSyncInitializer;
use log::*;
use prost::Message;
use tari_comms::{
    connectivity::{ConnectivityEvent, ConnectivityRequester, ConnectivitySelection},
    framing,
    framing::CanonicalFraming,
    message::MessageExt,
    peer_manager::{NodeId, PeerFeatures},
    protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationRx},
    Bytes,
    PeerConnection,
};
use tari_transaction_components::transaction_components::Transaction;
use tari_utilities::{hex::Hex, ByteArray};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::Semaphore,
    task,
    time,
};

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
    base_node::comms_interface::{BlockEvent, BlockEventReceiver},
    chain_storage::BlockAddResult,
    mempool::{proto, Mempool, MempoolServiceConfig},
    proto as shared_proto,
};

#[cfg(test)]
mod test;

mod error;
mod initializer;

const MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // 3 MiB
const LOG_TARGET: &str = "c::mempool::sync_protocol";

pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1");

pub struct MempoolSyncProtocol<TSubstream> {
    config: MempoolServiceConfig,
    protocol_notifier: ProtocolNotificationRx<TSubstream>,
    mempool: Mempool,
    num_synched: Arc<AtomicUsize>,
    permits: Arc<Semaphore>,
    connectivity: ConnectivityRequester,
    block_event_stream: BlockEventReceiver,
}

impl<TSubstream> MempoolSyncProtocol<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
{
    pub fn new(
        config: MempoolServiceConfig,
        protocol_notifier: ProtocolNotificationRx<TSubstream>,
        mempool: Mempool,
        connectivity: ConnectivityRequester,
        block_event_stream: BlockEventReceiver,
    ) -> Self {
        Self {
            config,
            protocol_notifier,
            mempool,
            num_synched: Arc::new(AtomicUsize::new(0)),
            permits: Arc::new(Semaphore::new(1)),
            connectivity,
            block_event_stream,
        }
    }

    pub async fn run(mut self) {
        info!(target: LOG_TARGET, "Mempool protocol handler has started");

        let mut connectivity_events = self.connectivity.get_event_subscription();
        loop {
            tokio::select! {
                Ok(block_event) = self.block_event_stream.recv() => {
                    self.handle_block_event(&block_event).await;
                },
                Ok(event) = connectivity_events.recv() => {
                    self.handle_connectivity_event(event).await;
                },

                Some(notif) = self.protocol_notifier.recv() => {
                    self.handle_protocol_notification(notif);
                }
            }
        }
    }

    async fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
        match event {
            // If this node is connecting to a peer
            ConnectivityEvent::PeerConnected(conn) if conn.direction().is_outbound() => {
                // This protocol is only spoken between base nodes
                if !conn.peer_features().contains(PeerFeatures::COMMUNICATION_NODE) {
                    return;
                }

                if !self.is_synched() {
                    self.spawn_initiator_protocol(*conn.clone()).await;
                }
            },
            _ => {},
        }
    }

    async fn handle_block_event(&mut self, block_event: &BlockEvent) {
        use BlockEvent::{BlockSyncComplete, ValidBlockAdded};
        match block_event {
            ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => {
                if added.len() < self.config.block_sync_trigger {
                    return;
                }
            },
            BlockSyncComplete(tip, starting_sync_height) => {
                let added = tip.height() - starting_sync_height;
                if added < self.config.block_sync_trigger as u64 {
                    return;
                }
            },
            _ => {
                return;
            },
        }
        // We want to sync with at least `initial_sync_num_peers` peers, so `num_synched` is reset to 0 to let the
        // initiator protocol run up to `initial_sync_num_peers` times again. This is a best-effort attempt to sync
        // with that many peers.
        self.num_synched.store(0, Ordering::SeqCst);
        let connections = match self
            .connectivity
            .select_connections(ConnectivitySelection::random_nodes(
                self.config.initial_sync_num_peers,
                vec![],
            ))
            .await
        {
            Ok(v) => {
                if v.is_empty() {
                    error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to");
                    return;
                };
                v
            },
            Err(e) => {
                error!(
                    target: LOG_TARGET,
                    "Mempool sync could not get a peer to sync to: {e}"
                );
                return;
            },
        };
        for connection in connections {
            self.spawn_initiator_protocol(connection).await;
        }
    }

    fn is_synched(&self) -> bool {
        self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers
    }

    fn handle_protocol_notification(&mut self, notification: ProtocolNotification<TSubstream>) {
        match notification.event {
            ProtocolEvent::NewInboundSubstream(node_id, substream) => {
                self.spawn_inbound_handler(node_id, substream);
            },
        }
    }

    async fn spawn_initiator_protocol(&mut self, mut conn: PeerConnection) {
        let mempool = self.mempool.clone();
        let permits = self.permits.clone();
        let num_synched = self.num_synched.clone();
        let config = self.config.clone();
        task::spawn(async move {
            // Only initiate this protocol with a single peer at a time
            let _permit = permits.acquire().await;
            if num_synched.load(Ordering::SeqCst) >= config.initial_sync_num_peers {
                return;
            }
            match conn.open_framed_substream(&MEMPOOL_SYNC_PROTOCOL, MAX_FRAME_SIZE).await {
                Ok(framed) => {
                    let protocol = MempoolPeerProtocol::new(config, framed, conn.peer_node_id().clone(), mempool);
                    match protocol.start_initiator().await {
                        Ok(_) => {
                            debug!(
                                target: LOG_TARGET,
                                "Mempool initiator protocol completed successfully for peer `{}`",
                                conn.peer_node_id().short_str(),
                            );
                            num_synched.fetch_add(1, Ordering::SeqCst);
                        },
                        Err(err) => {
                            debug!(
                                target: LOG_TARGET,
                                "Mempool initiator protocol failed for peer `{}`: {}",
                                conn.peer_node_id().short_str(),
                                err
                            );
                        },
                    }
                },
                Err(err) => error!(
                    target: LOG_TARGET,
                    "Unable to establish mempool protocol substream to peer `{}`: {}",
                    conn.peer_node_id().short_str(),
                    err
                ),
            }
        });
    }

    fn spawn_inbound_handler(&self, node_id: NodeId, substream: TSubstream) {
        let mempool = self.mempool.clone();
        let config = self.config.clone();
        task::spawn(async move {
            let framed = framing::canonical(substream, MAX_FRAME_SIZE);
            let mut protocol = MempoolPeerProtocol::new(config, framed, node_id.clone(), mempool);
            match protocol.start_responder().await {
                Ok(_) => {
                    debug!(
                        target: LOG_TARGET,
                        "Mempool responder protocol succeeded for peer `{}`",
                        node_id.short_str()
                    );
                },
                Err(err) => {
                    debug!(
                        target: LOG_TARGET,
                        "Mempool responder protocol failed for peer `{}`: {}",
                        node_id.short_str(),
                        err
                    );
                },
            }
        });
    }
}

struct MempoolPeerProtocol<TSubstream> {
    config: MempoolServiceConfig,
    framed: CanonicalFraming<TSubstream>,
    mempool: Mempool,
    peer_node_id: NodeId,
}

impl<TSubstream> MempoolPeerProtocol<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin
{
    pub fn new(
        config: MempoolServiceConfig,
        framed: CanonicalFraming<TSubstream>,
        peer_node_id: NodeId,
        mempool: Mempool,
    ) -> Self {
        Self {
            config,
            framed,
            mempool,
            peer_node_id,
        }
    }

    pub async fn start_initiator(mut self) -> Result<(), MempoolProtocolError> {
        match self.start_initiator_inner().await {
            Ok(_) => {
                debug!(target: LOG_TARGET, "Initiator protocol complete");
                Ok(())
            },
            Err(err) => {
                if let Err(err) = self.framed.flush().await {
                    debug!(target: LOG_TARGET, "IO error when flushing stream: {err}");
                }
                if let Err(err) = self.framed.close().await {
                    debug!(target: LOG_TARGET, "IO error when closing stream: {err}");
                }
                Err(err)
            },
        }
    }

    async fn start_initiator_inner(&mut self) -> Result<(), MempoolProtocolError> {
        debug!(
            target: LOG_TARGET,
            "Starting initiator mempool sync for peer `{}`",
            self.peer_node_id.short_str()
        );

        let transactions = self.mempool.snapshot().await?;
        let items = transactions
            .iter()
            .take(self.config.initial_sync_max_transactions)
            .filter_map(|txn| txn.first_kernel_excess_sig())
            .map(|excess| excess.get_signature().to_vec())
            .collect();
        let inventory = proto::TransactionInventory { items };

        // Send an inventory of items currently in this node's mempool
        debug!(
            target: LOG_TARGET,
            "Sending transaction inventory containing {} item(s) to peer `{}`",
            inventory.items.len(),
            self.peer_node_id.short_str()
        );

        self.write_message(inventory).await?;

        self.read_and_insert_transactions_until_complete().await?;

        let missing_items: proto::InventoryIndexes = self.read_message().await?;
        debug!(
            target: LOG_TARGET,
            "Received {} missing transaction index(es) from peer `{}`",
            missing_items.indexes.len(),
            self.peer_node_id.short_str(),
        );
        let missing_txns = missing_items
            .indexes
            .iter()
            .filter_map(|idx| transactions.get(*idx as usize).cloned())
            .collect::<Vec<_>>();
        // Log the number of transactions actually found, which can be lower than the number of requested indexes
        debug!(
            target: LOG_TARGET,
            "Sending {} missing transaction(s) to peer `{}`",
            missing_txns.len(),
            self.peer_node_id.short_str(),
        );

        // If we don't have any transactions at the given indexes, we still need to send back an empty
        // `TransactionItem` if the peer requested at least one index
        if !missing_items.indexes.is_empty() {
            self.write_transactions(missing_txns).await?;
        }

        // Close the stream after writing
        self.framed.close().await?;

        Ok(())
    }

    pub async fn start_responder(&mut self) -> Result<(), MempoolProtocolError> {
        match self.start_responder_inner().await {
            Ok(_) => {
                debug!(target: LOG_TARGET, "Responder protocol complete");
                Ok(())
            },
            Err(err) => {
                if let Err(err) = self.framed.flush().await {
                    debug!(target: LOG_TARGET, "IO error when flushing stream: {err}");
                }
                if let Err(err) = self.framed.close().await {
                    debug!(target: LOG_TARGET, "IO error when closing stream: {err}");
                }
                Err(err)
            },
        }
    }

    async fn start_responder_inner(&mut self) -> Result<(), MempoolProtocolError> {
        debug!(
            target: LOG_TARGET,
            "Starting responder mempool sync for peer `{}`",
            self.peer_node_id.short_str()
        );

        let inventory: proto::TransactionInventory = self.read_message().await?;

        debug!(
            target: LOG_TARGET,
            "Received inventory from peer `{}` containing {} item(s)",
            self.peer_node_id.short_str(),
            inventory.items.len()
        );

        let transactions = self.mempool.snapshot().await?;

        let mut duplicate_inventory_items = Vec::new();
        let (transactions, _) = transactions.into_iter().partition::<Vec<_>, _>(|transaction| {
            let excess_sig = transaction
                .first_kernel_excess_sig()
                .expect("transaction stored in mempool did not have any kernels");

            let has_item = inventory
                .items
                .iter()
                .position(|bytes| bytes.as_slice() == excess_sig.get_signature().as_bytes());

            match has_item {
                Some(pos) => {
                    duplicate_inventory_items.push(pos);
                    false
                },
                None => true,
            }
        });

        debug!(
            target: LOG_TARGET,
            "Streaming {} transaction(s) to peer `{}`",
            transactions.len(),
            self.peer_node_id.short_str()
        );

        self.write_transactions(transactions).await?;

        // Generate an index list of inventory indexes that this node does not have
        #[allow(clippy::cast_possible_truncation)]
        let missing_items = inventory
            .items
            .into_iter()
            .enumerate()
            .filter_map(|(i, _)| {
                if duplicate_inventory_items.contains(&i) {
                    None
                } else {
                    Some(i as u32)
                }
            })
            .collect::<Vec<_>>();
        debug!(
            target: LOG_TARGET,
            "Requesting {} missing transaction index(es) from peer `{}`",
            missing_items.len(),
            self.peer_node_id.short_str(),
        );

        let missing_items = proto::InventoryIndexes { indexes: missing_items };
        let num_missing_items = missing_items.indexes.len();
        self.write_message(missing_items).await?;

        if num_missing_items > 0 {
            debug!(target: LOG_TARGET, "Waiting for missing transactions");
            self.read_and_insert_transactions_until_complete().await?;
        }

        Ok(())
    }

    async fn read_and_insert_transactions_until_complete(&mut self) -> Result<(), MempoolProtocolError> {
        let mut num_recv = 0;
        while let Some(result) = self.framed.next().await {
            let bytes = result?;
            let item = proto::TransactionItem::decode(&mut bytes.freeze()).map_err(|err| {
                MempoolProtocolError::DecodeFailed {
                    source: err,
                    peer: self.peer_node_id.clone(),
                }
            })?;

            match item.transaction {
                Some(txn) => {
                    self.validate_and_insert_transaction(txn).await?;
                    num_recv += 1;
                },
                None => {
                    debug!(
                        target: LOG_TARGET,
                        "All transaction(s) (count={}) received from peer `{}`. ",
                        num_recv,
                        self.peer_node_id.short_str()
                    );
                    break;
                },
            }
        }

        #[allow(clippy::cast_possible_truncation)]
        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        {
            let stats = self.mempool.stats().await?;
            metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
            metrics::reorg_pool_size().set(stats.reorg_txs as i64);
        }

        Ok(())
    }

    async fn validate_and_insert_transaction(
        &mut self,
        txn: shared_proto::types::Transaction,
    ) -> Result<(), MempoolProtocolError> {
        let txn = Transaction::try_from(txn).map_err(|err| MempoolProtocolError::MessageConversionFailed {
            peer: self.peer_node_id.clone(),
            message: err,
        })?;
        let excess_sig = txn
            .first_kernel_excess_sig()
            .ok_or_else(|| MempoolProtocolError::ExcessSignatureMissing(self.peer_node_id.clone()))?;
        let excess_sig_hex = excess_sig.get_signature().to_hex();

        debug!(
            target: LOG_TARGET,
            "Received transaction `{}` from peer `{}`",
            excess_sig_hex,
            self.peer_node_id.short_str()
        );
        let txn = Arc::new(txn);
        let store_state = self.mempool.has_transaction(txn.clone()).await?;
        if store_state.is_stored() {
            return Ok(());
        }

        let stored_result = self.mempool.insert(txn).await?;
        if stored_result.is_stored() {
            #[cfg(feature = "metrics")]
            metrics::inbound_transactions().inc();
            debug!(
                target: LOG_TARGET,
                "Inserted transaction `{}` from peer `{}`",
                excess_sig_hex,
                self.peer_node_id.short_str()
            );
        } else {
            #[cfg(feature = "metrics")]
            metrics::rejected_inbound_transactions().inc();
            debug!(
                target: LOG_TARGET,
                "Did not store new transaction `{excess_sig_hex}` in mempool: {stored_result}"
            )
        }

        Ok(())
    }

    async fn write_transactions(&mut self, transactions: Vec<Arc<Transaction>>) -> Result<(), MempoolProtocolError> {
        let txns = transactions
            .into_iter()
            .take(self.config.initial_sync_max_transactions)
            .filter_map(|txn| match shared_proto::types::Transaction::try_from(txn) {
                Ok(txn) => Some(proto::TransactionItem { transaction: Some(txn) }),
                Err(e) => {
                    warn!(target: LOG_TARGET, "Could not convert transaction: {e}");
                    None
                },
            })
            // Write an empty `TransactionItem` to indicate we're done
            .chain(iter::once(proto::TransactionItem::empty()));

        self.write_messages(stream::iter(txns)).await?;

        Ok(())
    }

    async fn read_message<T: prost::Message + Default>(&mut self) -> Result<T, MempoolProtocolError> {
        let msg = time::timeout(Duration::from_secs(10), self.framed.next())
            .await
            .map_err(|_| MempoolProtocolError::RecvTimeout)?
            .ok_or_else(|| MempoolProtocolError::SubstreamClosed(self.peer_node_id.clone()))??;

        T::decode(&mut msg.freeze()).map_err(|err| MempoolProtocolError::DecodeFailed {
            source: err,
            peer: self.peer_node_id.clone(),
        })
    }

    async fn write_messages<S, T>(&mut self, stream: S) -> Result<(), MempoolProtocolError>
    where
        S: Stream<Item = T> + Unpin,
        T: prost::Message,
    {
        let mut s = stream.map(|m| Bytes::from(m.to_encoded_bytes())).map(Ok);
        self.framed.send_all(&mut s).await?;
        Ok(())
    }

    async fn write_message<T: prost::Message>(&mut self, message: T) -> Result<(), MempoolProtocolError> {
        time::timeout(
            Duration::from_secs(10),
            self.framed.send(message.to_encoded_bytes().into()),
        )
        .await
        .map_err(|_| MempoolProtocolError::SendTimeout)??;
        Ok(())
    }
}
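
The responder's inventory comparison in `start_responder_inner` can be illustrated in isolation. The sketch below is a simplified stand-in, not the code above: `diff_inventory` is a hypothetical helper, signatures are plain byte vectors instead of transactions, and a `HashMap` lookup replaces the linear `position` scan used in the source.

```rust
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;

/// Given the initiator's inventory and the signatures held locally, returns
/// (signatures to stream to the initiator, inventory indexes to request back).
fn diff_inventory(inventory: &[Vec<u8>], local: &[Vec<u8>]) -> (Vec<Vec<u8>>, Vec<u32>) {
    // Map each signature to its first index in the inventory, mirroring
    // the first-match behaviour of a `position` scan.
    let mut first_index: HashMap<&[u8], usize> = HashMap::new();
    for (i, sig) in inventory.iter().enumerate() {
        first_index.entry(sig.as_slice()).or_insert(i);
    }

    let mut known = HashSet::new();
    let mut to_send = Vec::new();
    for sig in local {
        match first_index.get(sig.as_slice()) {
            // The initiator already has this one, so remember its inventory index.
            Some(&i) => {
                known.insert(i);
            },
            // The initiator did not list it, so it should be streamed.
            None => to_send.push(sig.clone()),
        }
    }

    // Every inventory index without a local match must be requested.
    let missing: Vec<u32> = (0..inventory.len())
        .filter(|i| !known.contains(i))
        .filter_map(|i| u32::try_from(i).ok())
        .collect();

    (to_send, missing)
}
```

For an inventory of three signatures where only the second is held locally, the helper returns indexes 0 and 2 as missing, which corresponds to the `InventoryIndexes` message the responder writes after streaming its own transactions.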