• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 22761905253

06 Mar 2026 11:40AM UTC coverage: 62.01% (+0.05%) from 61.96%
22761905253

push

github

web-flow
chore: upgrade core to 2024 (#7693)

Description
---
Upgrades tari core to rust editition 2024

560 of 915 new or added lines in 96 files covered. (61.2%)

26 existing lines in 15 files now uncovered.

71928 of 115994 relevant lines covered (62.01%)

225062.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.02
/base_layer/core/src/base_node/service/initializer.rs
1
// Copyright 2019. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{sync::Arc, time::Duration};
24

25
use futures::{Stream, StreamExt, future};
26
use log::*;
27
use tari_comms::connectivity::ConnectivityRequester;
28
use tari_comms_dht::Dht;
29
use tari_node_components::blocks::NewBlock;
30
use tari_p2p::{
31
    comms_connector::{PeerMessage, SubscriptionFactory},
32
    domain_message::DomainMessage,
33
    services::utils::map_decode,
34
    tari_message::TariMessageType,
35
};
36
use tari_service_framework::{
37
    ServiceInitializationError,
38
    ServiceInitializer,
39
    ServiceInitializerContext,
40
    async_trait,
41
    reply_channel,
42
};
43
use thiserror::Error;
44
use tokio::sync::{broadcast, mpsc};
45

46
use crate::{
47
    base_node::{
48
        BaseNodeStateMachineConfig,
49
        StateMachineHandle,
50
        comms_interface::{InboundNodeCommsHandlers, LocalNodeCommsInterface, OutboundNodeCommsInterface},
51
        service::service::{BaseNodeService, BaseNodeStreams},
52
    },
53
    chain_storage::{BlockchainBackend, async_db::AsyncBlockchainDb},
54
    consensus::BaseNodeConsensusManager,
55
    mempool::Mempool,
56
    proof_of_work::randomx_factory::RandomXFactory,
57
    proto as shared_protos,
58
    proto::base_node as proto,
59
};
60

61
const LOG_TARGET: &str = "c::bn::service::initializer";
62
const SUBSCRIPTION_LABEL: &str = "Base Node";
63

64
/// Initializer for the Base Node service handle and service future.
65
pub struct BaseNodeServiceInitializer<T> {
66
    inbound_message_subscription_factory: Arc<SubscriptionFactory>,
67
    blockchain_db: AsyncBlockchainDb<T>,
68
    mempool: Mempool,
69
    consensus_manager: BaseNodeConsensusManager,
70
    service_request_timeout: Duration,
71
    randomx_factory: RandomXFactory,
72
    base_node_config: BaseNodeStateMachineConfig,
73
}
74

75
impl<T> BaseNodeServiceInitializer<T>
76
where T: BlockchainBackend
77
{
78
    /// Create a new BaseNodeServiceInitializer from the inbound message subscriber.
79
    pub fn new(
47✔
80
        inbound_message_subscription_factory: Arc<SubscriptionFactory>,
47✔
81
        blockchain_db: AsyncBlockchainDb<T>,
47✔
82
        mempool: Mempool,
47✔
83
        consensus_manager: BaseNodeConsensusManager,
47✔
84
        service_request_timeout: Duration,
47✔
85
        randomx_factory: RandomXFactory,
47✔
86
        base_node_config: BaseNodeStateMachineConfig,
47✔
87
    ) -> Self {
47✔
88
        Self {
47✔
89
            inbound_message_subscription_factory,
47✔
90
            blockchain_db,
47✔
91
            mempool,
47✔
92
            consensus_manager,
47✔
93
            service_request_timeout,
47✔
94
            randomx_factory,
47✔
95
            base_node_config,
47✔
96
        }
47✔
97
    }
47✔
98

99
    /// Get a stream for inbound Base Node request messages
100
    fn inbound_request_stream(
47✔
101
        &self,
47✔
102
    ) -> impl Stream<Item = DomainMessage<Result<proto::BaseNodeServiceRequest, prost::DecodeError>>> + use<T> {
47✔
103
        self.inbound_message_subscription_factory
47✔
104
            .get_subscription(TariMessageType::BaseNodeRequest, SUBSCRIPTION_LABEL)
47✔
105
            .map(map_decode::<proto::BaseNodeServiceRequest>)
47✔
106
    }
47✔
107

108
    /// Get a stream for inbound Base Node response messages
109
    fn inbound_response_stream(
47✔
110
        &self,
47✔
111
    ) -> impl Stream<Item = DomainMessage<Result<proto::BaseNodeServiceResponse, prost::DecodeError>>> + use<T> {
47✔
112
        self.inbound_message_subscription_factory
47✔
113
            .get_subscription(TariMessageType::BaseNodeResponse, SUBSCRIPTION_LABEL)
47✔
114
            .map(map_decode::<proto::BaseNodeServiceResponse>)
47✔
115
    }
47✔
116

117
    /// Create a stream of 'New Block` messages
118
    fn inbound_block_stream(&self) -> impl Stream<Item = DomainMessage<Result<NewBlock, ExtractBlockError>>> + use<T> {
47✔
119
        self.inbound_message_subscription_factory
47✔
120
            .get_subscription(TariMessageType::NewBlock, SUBSCRIPTION_LABEL)
47✔
121
            .map(extract_block)
47✔
122
    }
47✔
123
}
124

125
#[derive(Error, Debug)]
126
pub enum ExtractBlockError {
127
    #[error("Could not decode inbound block message. {0}")]
128
    DecodeError(#[from] prost::DecodeError),
129
    #[error("Inbound block message was ill-formed. {0}")]
130
    MalformedMessage(String),
131
}
132

133
fn extract_block(msg: Arc<PeerMessage>) -> DomainMessage<Result<NewBlock, ExtractBlockError>> {
29✔
134
    let new_block = match msg.decode_message::<shared_protos::core::NewBlock>() {
29✔
135
        Ok(block) => block,
29✔
136
        Err(e) => {
×
137
            return DomainMessage {
×
138
                source_peer: msg.source_peer.clone(),
×
139
                dht_header: msg.dht_header.clone(),
×
140
                authenticated_origin: msg.authenticated_origin.clone(),
×
141
                inner: Err(e.into()),
×
NEW
142
            };
×
143
        },
144
    };
145
    let block = NewBlock::try_from(new_block).map_err(ExtractBlockError::MalformedMessage);
29✔
146
    DomainMessage {
29✔
147
        source_peer: msg.source_peer.clone(),
29✔
148
        dht_header: msg.dht_header.clone(),
29✔
149
        authenticated_origin: msg.authenticated_origin.clone(),
29✔
150
        inner: block,
29✔
151
    }
29✔
152
}
29✔
153

154
#[async_trait]
155
impl<T> ServiceInitializer for BaseNodeServiceInitializer<T>
156
where T: BlockchainBackend + 'static
157
{
158
    async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
94✔
159
        trace!(target: LOG_TARGET, "Initializing Base Node Service");
47✔
160
        // Create streams for receiving Base Node requests and response messages from comms
161
        let inbound_request_stream = self.inbound_request_stream();
47✔
162
        let inbound_response_stream = self.inbound_response_stream();
47✔
163
        let inbound_block_stream = self.inbound_block_stream();
47✔
164
        // Connect InboundNodeCommsInterface and OutboundNodeCommsInterface to BaseNodeService
165
        let (outbound_request_sender_service, outbound_request_stream) = reply_channel::unbounded();
47✔
166
        let (outbound_block_sender_service, outbound_block_stream) = mpsc::unbounded_channel();
47✔
167
        let (local_request_sender_service, local_request_stream) = reply_channel::unbounded();
47✔
168
        let (local_block_sender_service, local_block_stream) = reply_channel::unbounded();
47✔
169
        let outbound_nci =
47✔
170
            OutboundNodeCommsInterface::new(outbound_request_sender_service, outbound_block_sender_service);
47✔
171
        let (block_event_sender, _) = broadcast::channel(50);
47✔
172
        let local_nci = LocalNodeCommsInterface::new(
47✔
173
            local_request_sender_service,
47✔
174
            local_block_sender_service,
47✔
175
            block_event_sender.clone(),
47✔
176
        );
177

178
        // Register handle to OutboundNodeCommsInterface before waiting for handles to be ready
179
        context.register_handle(outbound_nci.clone());
47✔
180
        context.register_handle(local_nci);
47✔
181

182
        let service_request_timeout = self.service_request_timeout;
47✔
183
        let blockchain_db = self.blockchain_db.clone();
47✔
184
        let mempool = self.mempool.clone();
47✔
185
        let consensus_manager = self.consensus_manager.clone();
47✔
186
        let randomx_factory = self.randomx_factory.clone();
47✔
187
        let config = self.base_node_config.clone();
47✔
188

189
        context.spawn_when_ready(move |handles| async move {
47✔
190
            let dht = handles.expect_handle::<Dht>();
47✔
191
            let connectivity = handles.expect_handle::<ConnectivityRequester>();
47✔
192
            let outbound_message_service = dht.outbound_requester();
47✔
193

194
            let state_machine = handles.expect_handle::<StateMachineHandle>();
47✔
195

196
            let inbound_nch = InboundNodeCommsHandlers::new(
47✔
197
                block_event_sender,
47✔
198
                blockchain_db,
47✔
199
                mempool,
47✔
200
                consensus_manager,
47✔
201
                outbound_nci.clone(),
47✔
202
                connectivity.clone(),
47✔
203
                randomx_factory,
47✔
204
            );
205

206
            let streams = BaseNodeStreams {
47✔
207
                outbound_request_stream,
47✔
208
                outbound_block_stream,
47✔
209
                inbound_request_stream,
47✔
210
                inbound_response_stream,
47✔
211
                inbound_block_stream,
47✔
212
                local_request_stream,
47✔
213
                local_block_stream,
47✔
214
            };
47✔
215
            let service = BaseNodeService::new(
47✔
216
                outbound_message_service,
47✔
217
                inbound_nch,
47✔
218
                service_request_timeout,
47✔
219
                state_machine,
47✔
220
                connectivity,
47✔
221
                config,
47✔
222
            )
223
            .start(streams);
47✔
224
            futures::pin_mut!(service);
47✔
225
            future::select(service, handles.get_shutdown_signal()).await;
47✔
226
            info!(target: LOG_TARGET, "Base Node Service shutdown");
41✔
227
        });
88✔
228

229
        debug!(target: LOG_TARGET, "Base Node Service initialized");
47✔
230
        Ok(())
47✔
231
    }
94✔
232
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc