• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 22761905253

06 Mar 2026 11:40AM UTC coverage: 62.01% (+0.05%) from 61.96%
22761905253

push

github

web-flow
chore: upgrade core to 2024 (#7693)

Description
---
Upgrades tari core to rust editition 2024

560 of 915 new or added lines in 96 files covered. (61.2%)

26 existing lines in 15 files now uncovered.

71928 of 115994 relevant lines covered (62.01%)

225062.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/mempool/sync_protocol/initializer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::time::Duration;
24

25
use log::*;
26
use tari_comms::{
27
    Substream,
28
    connectivity::ConnectivityRequester,
29
    protocol::{ProtocolExtension, ProtocolExtensionContext, ProtocolExtensionError, ProtocolNotification},
30
};
31
use tari_service_framework::{ServiceInitializationError, ServiceInitializer, ServiceInitializerContext, async_trait};
32
use tokio::{sync::mpsc, time::sleep};
33

34
use crate::{
35
    base_node::{StateMachineHandle, comms_interface::LocalNodeCommsInterface},
36
    mempool::{
37
        Mempool,
38
        MempoolServiceConfig,
39
        sync_protocol::{MEMPOOL_SYNC_PROTOCOL, MempoolSyncProtocol},
40
    },
41
};
42

43
const LOG_TARGET: &str = "c::mempool::sync_protocol";
44

45
pub struct MempoolSyncInitializer {
46
    config: MempoolServiceConfig,
47
    mempool: Mempool,
48
    notif_rx: Option<mpsc::Receiver<ProtocolNotification<Substream>>>,
49
    notif_tx: mpsc::Sender<ProtocolNotification<Substream>>,
50
}
51

52
impl MempoolSyncInitializer {
53
    pub fn new(config: MempoolServiceConfig, mempool: Mempool) -> Self {
×
54
        let (notif_tx, notif_rx) = mpsc::channel(3);
×
55
        Self {
×
56
            mempool,
×
57
            config,
×
58
            notif_tx,
×
59
            notif_rx: Some(notif_rx),
×
60
        }
×
61
    }
×
62

NEW
63
    pub fn get_protocol_extension(&self) -> impl ProtocolExtension + use<> {
×
64
        let notif_tx = self.notif_tx.clone();
×
65
        move |context: &mut ProtocolExtensionContext| -> Result<(), ProtocolExtensionError> {
×
66
            context.add_protocol([MEMPOOL_SYNC_PROTOCOL.clone()], &notif_tx);
×
67
            Ok(())
×
68
        }
×
69
    }
×
70
}
71

72
#[async_trait]
73
impl ServiceInitializer for MempoolSyncInitializer {
74
    async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
×
75
        trace!(target: LOG_TARGET, "Initializing Mempool Sync Service");
×
76
        let config = self.config.clone();
×
77
        let mempool = self.mempool.clone();
×
78
        let notif_rx = self.notif_rx.take().unwrap();
×
79

80
        context.spawn_until_shutdown(move |handles| async move {
×
81
            let state_machine = handles.expect_handle::<StateMachineHandle>();
×
82
            let connectivity = handles.expect_handle::<ConnectivityRequester>();
×
83
            let base_node = handles.expect_handle::<LocalNodeCommsInterface>();
×
84

85
            let mut status_watch = state_machine.get_status_info_watch();
×
86
            if !status_watch.borrow().state_info.is_synced() {
×
87
                debug!(target: LOG_TARGET, "Waiting for node to do initial sync...");
×
88
                while status_watch.changed().await.is_ok() {
×
89
                    if status_watch.borrow().state_info.is_synced() {
×
90
                        debug!(
×
91
                            target: LOG_TARGET,
×
92
                            "Initial sync is done. Starting mempool sync protocol"
93
                        );
94
                        break;
×
95
                    }
×
96
                    trace!(
×
97
                        target: LOG_TARGET,
×
98
                        "Mempool sync still on hold, waiting for node to do initial sync",
99
                    );
100
                    sleep(Duration::from_secs(30)).await;
×
101
                }
102
            }
×
103
            let base_node_events = base_node.get_block_event_stream();
×
104

105
            MempoolSyncProtocol::new(config, notif_rx, mempool, connectivity, base_node_events)
×
106
                .run()
×
107
                .await;
×
108
        });
×
109

110
        trace!(target: LOG_TARGET, "Mempool sync service initialized");
×
111
        Ok(())
×
112
    }
×
113
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc