• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16990089413

15 Aug 2025 12:36PM UTC coverage: 54.497% (+0.06%) from 54.441%
16990089413

push

github

web-flow
chore: cleanup indexes (#7411)

Description
---
Forces clean indexs

970 of 2919 new or added lines in 369 files covered. (33.23%)

60 existing lines in 33 files now uncovered.

76698 of 140739 relevant lines covered (54.5%)

193749.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/mempool/sync_protocol/initializer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::time::Duration;
24

25
use log::*;
26
use tari_comms::{
27
    connectivity::ConnectivityRequester,
28
    protocol::{ProtocolExtension, ProtocolExtensionContext, ProtocolExtensionError, ProtocolNotification},
29
    Substream,
30
};
31
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
32
use tokio::{sync::mpsc, time::sleep};
33

34
use crate::{
35
    base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle},
36
    mempool::{
37
        sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL},
38
        Mempool,
39
        MempoolServiceConfig,
40
    },
41
};
42

43
const LOG_TARGET: &str = "c::mempool::sync_protocol";
44

45
pub struct MempoolSyncInitializer {
46
    config: MempoolServiceConfig,
47
    mempool: Mempool,
48
    notif_rx: Option<mpsc::Receiver<ProtocolNotification<Substream>>>,
49
    notif_tx: mpsc::Sender<ProtocolNotification<Substream>>,
50
}
51

52
impl MempoolSyncInitializer {
53
    pub fn new(config: MempoolServiceConfig, mempool: Mempool) -> Self {
×
54
        let (notif_tx, notif_rx) = mpsc::channel(3);
×
55
        Self {
×
56
            mempool,
×
57
            config,
×
58
            notif_tx,
×
59
            notif_rx: Some(notif_rx),
×
60
        }
×
61
    }
×
62

63
    pub fn get_protocol_extension(&self) -> impl ProtocolExtension {
×
64
        let notif_tx = self.notif_tx.clone();
×
65
        move |context: &mut ProtocolExtensionContext| -> Result<(), ProtocolExtensionError> {
×
NEW
66
            context.add_protocol([MEMPOOL_SYNC_PROTOCOL.clone()], &notif_tx);
×
67
            Ok(())
×
68
        }
×
69
    }
×
70
}
71

72
#[async_trait]
73
impl ServiceInitializer for MempoolSyncInitializer {
74
    async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
×
75
        trace!(target: LOG_TARGET, "Initializing Mempool Sync Service");
×
76
        let config = self.config.clone();
×
77
        let mempool = self.mempool.clone();
×
78
        let notif_rx = self.notif_rx.take().unwrap();
×
79

×
80
        context.spawn_until_shutdown(move |handles| async move {
×
81
            let state_machine = handles.expect_handle::<StateMachineHandle>();
×
82
            let connectivity = handles.expect_handle::<ConnectivityRequester>();
×
83
            let base_node = handles.expect_handle::<LocalNodeCommsInterface>();
×
84

×
85
            let mut status_watch = state_machine.get_status_info_watch();
×
86
            if !status_watch.borrow().state_info.is_synced() {
×
87
                debug!(target: LOG_TARGET, "Waiting for node to do initial sync...");
×
88
                while status_watch.changed().await.is_ok() {
×
89
                    if status_watch.borrow().state_info.is_synced() {
×
90
                        debug!(
×
91
                            target: LOG_TARGET,
×
92
                            "Initial sync is done. Starting mempool sync protocol"
×
93
                        );
94
                        break;
×
95
                    }
×
96
                    trace!(
×
97
                        target: LOG_TARGET,
×
98
                        "Mempool sync still on hold, waiting for node to do initial sync",
×
99
                    );
100
                    sleep(Duration::from_secs(30)).await;
×
101
                }
102
            }
×
103
            let base_node_events = base_node.get_block_event_stream();
×
104

×
105
            MempoolSyncProtocol::new(config, notif_rx, mempool, connectivity, base_node_events)
×
106
                .run()
×
107
                .await;
×
108
        });
×
109

×
110
        trace!(target: LOG_TARGET, "Mempool sync service initialized");
×
111
        Ok(())
×
112
    }
×
113
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc