• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 13947560149

19 Mar 2025 01:20PM UTC coverage: 73.397% (-0.2%) from 73.603%
13947560149

push

github

web-flow
feat: add num connections to network state (#6884)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- The network state now reports the current number of active peer
connections, giving users improved visibility into connectivity.
- A new field for the number of connections has been added to the
network state response.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

83004 of 113089 relevant lines covered (73.4%)

258041.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/chat_ffi/src/callback_handler.rs
1
// Copyright 2023, The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::ops::Deref;
24

25
use log::{debug, info, trace};
26
use tari_contacts::contacts_service::{
27
    handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceHandle},
28
    types::{Confirmation, Message, MessageDispatch},
29
};
30
use tari_shutdown::ShutdownSignal;
31

32
/// Log target passed to every `log` macro invocation in this module.
const LOG_TARGET: &str = "chat_ffi::callback_handler";
33

34
/// C callback invoked when a contact's liveness status is updated.
///
/// The argument is a raw pointer obtained from `Box::into_raw` on a freshly boxed
/// `ContactsLivenessData`; the handler in this module does not free it after the call.
pub(crate) type CallbackContactStatusChange = unsafe extern "C" fn(*mut ContactsLivenessData);
35
/// C callback invoked when a new chat message is received.
///
/// The argument is a raw pointer obtained from `Box::into_raw` on a freshly boxed
/// `Message`; the handler in this module does not free it after the call.
pub(crate) type CallbackMessageReceived = unsafe extern "C" fn(*mut Message);
36
/// C callback invoked when a delivery confirmation is received.
///
/// The argument is a raw pointer obtained from `Box::into_raw` on a freshly boxed
/// `Confirmation`; the handler in this module does not free it after the call.
pub(crate) type CallbackDeliveryConfirmationReceived = unsafe extern "C" fn(*mut Confirmation);
37
/// C callback invoked when a read confirmation is received.
///
/// The argument is a raw pointer obtained from `Box::into_raw` on a freshly boxed
/// `Confirmation`; the handler in this module does not free it after the call.
pub(crate) type CallbackReadConfirmationReceived = unsafe extern "C" fn(*mut Confirmation);
38

39
#[derive(Clone)]
40
pub struct CallbackHandler {
41
    contacts_service_handle: ContactsServiceHandle,
42
    callback_contact_status_change: CallbackContactStatusChange,
43
    callback_message_received: CallbackMessageReceived,
44
    callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived,
45
    callback_read_confirmation_received: CallbackReadConfirmationReceived,
46
    shutdown: ShutdownSignal,
47
}
48

49
impl CallbackHandler {
50
    pub fn new(
×
51
        contacts_service_handle: ContactsServiceHandle,
×
52
        shutdown: ShutdownSignal,
×
53
        callback_contact_status_change: CallbackContactStatusChange,
×
54
        callback_message_received: CallbackMessageReceived,
×
55
        callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived,
×
56
        callback_read_confirmation_received: CallbackReadConfirmationReceived,
×
57
    ) -> Self {
×
58
        Self {
×
59
            contacts_service_handle,
×
60
            shutdown,
×
61
            callback_contact_status_change,
×
62
            callback_message_received,
×
63
            callback_delivery_confirmation_received,
×
64
            callback_read_confirmation_received,
×
65
        }
×
66
    }
×
67

68
    pub(crate) async fn start(&mut self) {
×
69
        let mut liveness_events = self.contacts_service_handle.get_contacts_liveness_event_stream();
×
70
        let mut chat_messages = self.contacts_service_handle.get_messages_event_stream();
×
71

72
        loop {
73
            tokio::select! {
×
74
                rec_message = chat_messages.recv() => {
×
75
                    match rec_message {
×
76
                        Ok(message_dispatch) => {
×
77
                            match message_dispatch.deref() {
×
78
                                MessageDispatch::Message(m) => {
×
79
                                    trace!(
×
80
                                        target: LOG_TARGET,
×
81
                                        "FFI Callback monitor received a new Message ({})",
×
82
                                        m
83
                                    );
84
                                    self.trigger_message_received(*m.clone());
×
85
                                }
86
                                MessageDispatch::DeliveryConfirmation(c) => {
×
87
                                    trace!(
×
88
                                        target: LOG_TARGET,
×
89
                                        "FFI Callback monitor received a new Delivery Confirmation ({})",
×
90
                                        c
91
                                    );
92
                                    self.trigger_delivery_confirmation_received(c.clone());
×
93
                                },
94
                                MessageDispatch::ReadConfirmation(c) => {
×
95
                                    trace!(
×
96
                                        target: LOG_TARGET,
×
97
                                        "FFI Callback monitor received a new Read Confirmation ({})",
×
98
                                        c
99
                                    );
100
                                    self.trigger_read_confirmation_received(c.clone());
×
101
                                }
102
                            };
103
                        },
104
                        Err(e) => {
×
105
                            debug!(
×
106
                                target: LOG_TARGET,
×
107
                                "FFI Callback monitor had an error receiving new messages ({})",
×
108
                                e
109
                            )
110
                        }
111
                    }
112
                },
113

114
                event = liveness_events.recv() => {
×
115
                    match event {
×
116
                        Ok(liveness_event) => {
×
117
                            match liveness_event.deref() {
×
118
                                ContactsLivenessEvent::StatusUpdated(data) => {
×
119
                                    trace!(target: LOG_TARGET,
×
120
                                        "FFI Callback monitor received Contact Status Updated event ({})", data
×
121
                                    );
122
                                    self.trigger_contact_status_change(data.deref().clone());
×
123
                                }
124
                                ContactsLivenessEvent::NetworkSilence => {},
×
125
                            }
126
                        },
127
                        Err(e) => {
×
128
                            debug!(
×
129
                                target: LOG_TARGET,
×
130
                                "FFI Callback monitor had an error with contacts liveness ({})",
×
131
                                e
132
                            )
133
                        }
134
                    }
135
                },
136
                _ = self.shutdown.wait() => {
×
137
                    info!(
×
138
                        target: LOG_TARGET,
×
139
                        "ChatFFI Callback Handler shutting down because the shutdown signal was received"
×
140
                    );
141
                    break;
×
142
                },
×
143
            }
×
144
        }
×
145
    }
×
146

147
    fn trigger_contact_status_change(&mut self, data: ContactsLivenessData) {
×
148
        debug!(
×
149
            target: LOG_TARGET,
×
150
            "Calling ContactStatusChanged callback function for contact {}",
×
151
            data.address(),
×
152
        );
153

154
        unsafe {
×
155
            (self.callback_contact_status_change)(Box::into_raw(Box::new(data)));
×
156
        }
×
157
    }
×
158

159
    fn trigger_message_received(&mut self, message: Message) {
×
160
        debug!(
×
161
            target: LOG_TARGET,
×
162
            "Calling MessageReceived callback function for sender {}",
×
163
            message.sender_address,
164
        );
165

166
        unsafe {
×
167
            (self.callback_message_received)(Box::into_raw(Box::new(message)));
×
168
        }
×
169
    }
×
170

171
    fn trigger_delivery_confirmation_received(&mut self, confirmation: Confirmation) {
×
172
        debug!(
×
173
            target: LOG_TARGET,
×
174
            "Calling DeliveryConfirmationReceived callback function for message {}",
×
175
            confirmation.message_id,
176
        );
177

178
        unsafe {
×
179
            (self.callback_delivery_confirmation_received)(Box::into_raw(Box::new(confirmation)));
×
180
        }
×
181
    }
×
182

183
    fn trigger_read_confirmation_received(&mut self, confirmation: Confirmation) {
×
184
        debug!(
×
185
            target: LOG_TARGET,
×
186
            "Calling ReadConfirmationReceived callback function for message {}",
×
187
            confirmation.message_id,
188
        );
189

190
        unsafe {
×
191
            (self.callback_read_confirmation_received)(Box::into_raw(Box::new(confirmation)));
×
192
        }
×
193
    }
×
194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc