• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16467678859

23 Jul 2025 10:05AM UTC coverage: 54.21% (+0.004%) from 54.206%
16467678859

push

github

web-flow
docs: update ffi interface spec (#7367)

Description
---
Updated the FFI interface specification for `fn wallet_start_recovery`

Motivation and Context
---
The function interface was changed recently.

How Has This Been Tested?
---
The specification text was compared against the code in `pub async fn
recovery_event_monitoring(..)`

What process can a PR reviewer use to test or verify this change?
---
Code review

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Documentation**
* Updated the documentation for the wallet recovery progress callback to
clarify the event types and their arguments, simplifying descriptions
and removing outdated event information. No changes were made to
functionality or public interfaces.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

75384 of 139060 relevant lines covered (54.21%)

195873.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.2
/comms/core/src/protocol/rpc/client/mod.rs
1
//  Copyright 2021, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
pub mod pool;
24

25
#[cfg(test)]
26
mod tests;
27

28
#[cfg(feature = "metrics")]
29
mod metrics;
30

31
use std::{
32
    borrow::Cow,
33
    convert::TryFrom,
34
    fmt,
35
    future::Future,
36
    marker::PhantomData,
37
    sync::{
38
        atomic::{AtomicBool, Ordering},
39
        Arc,
40
    },
41
    time::{Duration, Instant},
42
};
43

44
use bytes::Bytes;
45
use futures::{
46
    future,
47
    future::{BoxFuture, Either},
48
    task::{Context, Poll},
49
    FutureExt,
50
    SinkExt,
51
    StreamExt,
52
};
53
use log::*;
54
use prost::Message;
55
use tari_shutdown::{oneshot_trigger::OneshotSignal, Shutdown, ShutdownSignal};
56
use tokio::{
57
    io::{AsyncRead, AsyncWrite},
58
    sync::{mpsc, oneshot, watch, Mutex},
59
    time,
60
};
61
use tower::{Service, ServiceExt};
62
use tracing::{span, Instrument, Level};
63

64
use super::message::RpcMethod;
65
use crate::{
66
    framing::CanonicalFraming,
67
    message::MessageExt,
68
    peer_manager::NodeId,
69
    proto,
70
    protocol::{
71
        rpc,
72
        rpc::{
73
            body::ClientStreaming,
74
            message::{BaseRequest, RpcMessageFlags},
75
            Handshake,
76
            NamedProtocolService,
77
            Response,
78
            RpcError,
79
            RpcServerError,
80
            RpcStatus,
81
        },
82
        ProtocolId,
83
    },
84
    stream_id,
85
    stream_id::StreamId,
86
};
87

88
const LOG_TARGET: &str = "comms::rpc::client";
89

90
#[derive(Clone)]
91
pub struct RpcClient {
92
    connector: ClientConnector,
93
}
94

95
impl RpcClient {
96
    pub fn builder<T>() -> RpcClientBuilder<T>
5✔
97
    where T: NamedProtocolService {
5✔
98
        RpcClientBuilder::new().with_protocol_id(T::PROTOCOL_NAME.into())
5✔
99
    }
5✔
100

101
    /// Create a new RpcClient using the given framed substream and perform the RPC handshake.
102
    pub async fn connect<TSubstream>(
92✔
103
        config: RpcClientConfig,
92✔
104
        node_id: NodeId,
92✔
105
        framed: CanonicalFraming<TSubstream>,
92✔
106
        protocol_name: ProtocolId,
92✔
107
        terminate_signal: Option<OneshotSignal<NodeId>>,
92✔
108
        session_state: Arc<AtomicBool>,
92✔
109
    ) -> Result<Self, RpcError>
92✔
110
    where
92✔
111
        TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId + 'static,
92✔
112
    {
92✔
113
        trace!(target: LOG_TARGET,"connect to {:?} with {:?}", node_id, config);
92✔
114
        let (request_tx, request_rx) = mpsc::channel(1);
92✔
115
        let shutdown = Shutdown::new();
92✔
116
        let shutdown_signal = shutdown.to_signal();
92✔
117
        let (last_request_latency_tx, last_request_latency_rx) = watch::channel(None);
92✔
118
        let connector = ClientConnector::new(request_tx, last_request_latency_rx, shutdown);
92✔
119
        let (ready_tx, ready_rx) = oneshot::channel();
92✔
120
        let tracing_id = tracing::Span::current().id();
92✔
121

122
        tokio::spawn({
123
            let span = span!(Level::TRACE, "start_rpc_worker");
92✔
124
            span.follows_from(tracing_id);
92✔
125

92✔
126
            RpcClientWorker::new(
92✔
127
                config,
92✔
128
                node_id,
92✔
129
                request_rx,
92✔
130
                last_request_latency_tx,
92✔
131
                framed,
92✔
132
                ready_tx,
92✔
133
                protocol_name,
92✔
134
                shutdown_signal,
92✔
135
                terminate_signal,
92✔
136
                session_state,
92✔
137
            )
92✔
138
            .run()
92✔
139
            .instrument(span)
92✔
140
        });
92✔
141
        ready_rx
92✔
142
            .await
92✔
143
            .expect("ready_rx oneshot is never dropped without a reply")?;
92✔
144
        Ok(Self { connector })
85✔
145
    }
92✔
146

147
    /// Perform a single request and single response
148
    pub async fn request_response<T, R, M>(&mut self, request: T, method: M) -> Result<R, RpcError>
38✔
149
    where
38✔
150
        T: prost::Message,
38✔
151
        R: prost::Message + Default + std::fmt::Debug,
38✔
152
        M: Into<RpcMethod>,
38✔
153
    {
38✔
154
        let req_bytes = request.to_encoded_bytes();
38✔
155
        let request = BaseRequest::new(method.into(), req_bytes.into());
38✔
156

157
        let mut resp = self.call_inner(request).await?;
38✔
158
        let resp = resp.recv().await.ok_or(RpcError::ServerClosedRequest)??;
34✔
159
        let resp = R::decode(resp.into_message())?;
23✔
160

161
        Ok(resp)
23✔
162
    }
38✔
163

164
    /// Perform a single request and streaming response
165
    pub async fn server_streaming<T, M, R>(&mut self, request: T, method: M) -> Result<ClientStreaming<R>, RpcError>
100✔
166
    where
100✔
167
        T: prost::Message,
100✔
168
        R: prost::Message + Default,
100✔
169
        M: Into<RpcMethod>,
100✔
170
    {
100✔
171
        let req_bytes = request.to_encoded_bytes();
100✔
172
        let request = BaseRequest::new(method.into(), req_bytes.into());
100✔
173

174
        let resp = self.call_inner(request).await?;
100✔
175

176
        Ok(ClientStreaming::new(resp))
100✔
177
    }
100✔
178

179
    /// Close the RPC session. Any subsequent calls will error.
180
    pub async fn close(&mut self) {
3✔
181
        self.connector.close().await;
3✔
182
    }
3✔
183

184
    pub fn is_connected(&self) -> bool {
113✔
185
        self.connector.is_connected()
113✔
186
    }
113✔
187

188
    /// Return the latency of the last request
189
    pub fn get_last_request_latency(&mut self) -> Option<Duration> {
2✔
190
        self.connector.get_last_request_latency()
2✔
191
    }
2✔
192

193
    /// Sends a ping and returns the latency
194
    pub fn ping(&mut self) -> impl Future<Output = Result<Duration, RpcError>> + '_ {
1✔
195
        self.connector.send_ping()
1✔
196
    }
1✔
197

198
    async fn call_inner(
138✔
199
        &mut self,
138✔
200
        request: BaseRequest<Bytes>,
138✔
201
    ) -> Result<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>, RpcError> {
138✔
202
        let svc = self.connector.ready().await?;
138✔
203
        let resp = svc.call(request).await?;
138✔
204
        Ok(resp)
134✔
205
    }
138✔
206
}
207

208
impl fmt::Debug for RpcClient {
209
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
210
        write!(f, "RpcClient {{ inner: ... }}")
×
211
    }
×
212
}
213

214
#[derive(Debug, Clone)]
215
pub struct RpcClientBuilder<TClient> {
216
    config: RpcClientConfig,
217
    protocol_id: Option<ProtocolId>,
218
    node_id: Option<NodeId>,
219
    terminate_signal: Option<OneshotSignal<NodeId>>,
220
    session_state: Option<Arc<AtomicBool>>,
221
    _client: PhantomData<TClient>,
222
}
223

224
impl<TClient> Default for RpcClientBuilder<TClient> {
225
    fn default() -> Self {
47✔
226
        Self {
47✔
227
            config: Default::default(),
47✔
228
            protocol_id: None,
47✔
229
            node_id: None,
47✔
230
            terminate_signal: None,
47✔
231
            session_state: None,
47✔
232
            _client: PhantomData,
47✔
233
        }
47✔
234
    }
47✔
235
}
236

237
impl<TClient> RpcClientBuilder<TClient> {
238
    pub fn new() -> Self {
19✔
239
        Default::default()
19✔
240
    }
19✔
241

242
    /// The deadline to send to the peer when performing a request.
243
    /// If this deadline is exceeded, the server SHOULD abandon the request.
244
    /// The client will return a timeout error if the deadline plus the grace period is exceeded.
245
    ///
246
    /// _Note: That is the deadline is set too low, the responding peer MAY immediately reject the request.
247
    ///
248
    /// Default: 100s
249
    pub fn with_deadline(mut self, timeout: Duration) -> Self {
17✔
250
        self.config.deadline = Some(timeout);
17✔
251
        self
17✔
252
    }
17✔
253

254
    /// Sets the grace period to allow after the configured deadline before giving up and timing out.
255
    /// This configuration should be set to comfortably account for the latency experienced during requests.
256
    ///
257
    /// Default: 10 seconds
258
    pub fn with_deadline_grace_period(mut self, timeout: Duration) -> Self {
2✔
259
        self.config.deadline_grace_period = timeout;
2✔
260
        self
2✔
261
    }
2✔
262

263
    /// Set the length of time that the client will wait for a response in the RPC handshake before returning a timeout
264
    /// error.
265
    /// Default: 15 seconds
266
    pub fn with_handshake_timeout(mut self, timeout: Duration) -> Self {
1✔
267
        self.config.handshake_timeout = timeout;
1✔
268
        self
1✔
269
    }
1✔
270

271
    /// Set the protocol ID associated with this client. This is used for logging purposes only.
272
    pub fn with_protocol_id(mut self, protocol_id: ProtocolId) -> Self {
94✔
273
        self.protocol_id = Some(protocol_id);
94✔
274
        self
94✔
275
    }
94✔
276

277
    /// Set the node_id for logging/metrics purposes
278
    pub fn with_node_id(mut self, node_id: NodeId) -> Self {
75✔
279
        self.node_id = Some(node_id);
75✔
280
        self
75✔
281
    }
75✔
282

283
    /// Set a signal that indicates if this client should be immediately closed
284
    pub fn with_terminate_signal(mut self, terminate_signal: OneshotSignal<NodeId>) -> Self {
75✔
285
        self.terminate_signal = Some(terminate_signal);
75✔
286
        self
75✔
287
    }
75✔
288

289
    /// Set a bool that can be set to false when this client terminates
290
    pub fn with_session_state(mut self, session_state: Arc<AtomicBool>) -> Self {
75✔
291
        self.session_state = Some(session_state);
75✔
292
        self
75✔
293
    }
75✔
294
}
295

296
impl<TClient> RpcClientBuilder<TClient>
297
where TClient: From<RpcClient> + NamedProtocolService
298
{
299
    /// Negotiates and establishes a session to the peer's RPC service
300
    pub async fn connect<TSubstream>(self, framed: CanonicalFraming<TSubstream>) -> Result<TClient, RpcError>
89✔
301
    where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId + 'static {
89✔
302
        RpcClient::connect(
89✔
303
            self.config,
89✔
304
            self.node_id.unwrap_or_default(),
89✔
305
            framed,
89✔
306
            self.protocol_id
89✔
307
                .as_ref()
89✔
308
                .cloned()
89✔
309
                .unwrap_or_else(|| ProtocolId::from_static(TClient::PROTOCOL_NAME)),
89✔
310
            self.terminate_signal,
89✔
311
            self.session_state.unwrap_or(Arc::new(AtomicBool::new(true))),
89✔
312
        )
89✔
313
        .await
89✔
314
        .map(Into::into)
89✔
315
    }
89✔
316
}
317

318
/// Timing configuration for an RPC client session.
#[derive(Debug, Clone, Copy)]
pub struct RpcClientConfig {
    /// Deadline sent to the peer with each request; `None` sends a deadline of 0 and disables the client timeout.
    pub deadline: Option<Duration>,
    /// Extra time allowed on top of `deadline` before the client gives up.
    pub deadline_grace_period: Duration,
    /// Maximum time to wait for the RPC handshake.
    pub handshake_timeout: Duration,
}

impl RpcClientConfig {
    /// Returns the timeout including the configured grace period, or `None` when no deadline is set.
    ///
    /// The sum saturates at `Duration::MAX` instead of panicking when very large values are configured.
    pub fn timeout_with_grace_period(&self) -> Option<Duration> {
        self.deadline.map(|d| d.saturating_add(self.deadline_grace_period))
    }

    /// Returns the handshake timeout
    pub fn handshake_timeout(&self) -> Duration {
        self.handshake_timeout
    }
}

impl Default for RpcClientConfig {
    /// Defaults: 120s deadline, 60s grace period, 90s handshake timeout.
    fn default() -> Self {
        Self {
            deadline: Some(Duration::from_secs(120)),
            deadline_grace_period: Duration::from_secs(60),
            handshake_timeout: Duration::from_secs(90),
        }
    }
}
346

347
#[derive(Clone)]
348
pub struct ClientConnector {
349
    inner: mpsc::Sender<ClientRequest>,
350
    last_request_latency_rx: watch::Receiver<Option<Duration>>,
351
    shutdown: Arc<Mutex<Shutdown>>,
352
}
353

354
impl ClientConnector {
355
    pub(self) fn new(
92✔
356
        sender: mpsc::Sender<ClientRequest>,
92✔
357
        last_request_latency_rx: watch::Receiver<Option<Duration>>,
92✔
358
        shutdown: Shutdown,
92✔
359
    ) -> Self {
92✔
360
        Self {
92✔
361
            inner: sender,
92✔
362
            last_request_latency_rx,
92✔
363
            shutdown: Arc::new(Mutex::new(shutdown)),
92✔
364
        }
92✔
365
    }
92✔
366

367
    pub async fn close(&mut self) {
3✔
368
        let mut lock = self.shutdown.lock().await;
3✔
369
        lock.trigger();
3✔
370
    }
3✔
371

372
    pub fn get_last_request_latency(&mut self) -> Option<Duration> {
2✔
373
        *self.last_request_latency_rx.borrow()
2✔
374
    }
2✔
375

376
    pub async fn send_ping(&mut self) -> Result<Duration, RpcError> {
1✔
377
        let (reply, reply_rx) = oneshot::channel();
1✔
378
        self.inner
1✔
379
            .send(ClientRequest::SendPing(reply))
1✔
380
            .await
1✔
381
            .map_err(|_| RpcError::ClientClosed)?;
1✔
382

383
        let latency = reply_rx.await.map_err(|_| RpcError::RequestCancelled)??;
1✔
384
        Ok(latency)
1✔
385
    }
1✔
386

387
    pub fn is_connected(&self) -> bool {
113✔
388
        !self.inner.is_closed()
113✔
389
    }
113✔
390
}
391

392
impl fmt::Debug for ClientConnector {
393
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
394
        write!(f, "ClientConnector {{ inner: ... }}")
×
395
    }
×
396
}
397

398
impl Service<BaseRequest<Bytes>> for ClientConnector {
399
    type Error = RpcError;
400
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
401
    type Response = mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>;
402

403
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138✔
404
        Poll::Ready(Ok(()))
138✔
405
    }
138✔
406

407
    fn call(&mut self, request: BaseRequest<Bytes>) -> Self::Future {
138✔
408
        let (reply, reply_rx) = oneshot::channel();
138✔
409
        let inner = self.inner.clone();
138✔
410
        async move {
138✔
411
            inner
138✔
412
                .send(ClientRequest::SendRequest { request, reply })
138✔
413
                .await
138✔
414
                .map_err(|_| RpcError::ClientClosed)?;
138✔
415

416
            reply_rx.await.map_err(|_| RpcError::RequestCancelled)
135✔
417
        }
138✔
418
        .boxed()
138✔
419
    }
138✔
420
}
421

422
/// State of the background task that owns the framed substream for one RPC session.
///
/// An instance is created and spawned by `RpcClient::connect`; it receives `ClientRequest`s over `request_rx`
/// and publishes latency measurements on `last_request_latency_tx`.
struct RpcClientWorker<TSubstream> {
423
    config: RpcClientConfig,
424
    node_id: NodeId,
425
    request_rx: mpsc::Receiver<ClientRequest>,
426
    last_request_latency_tx: watch::Sender<Option<Duration>>,
427
    framed: CanonicalFraming<TSubstream>,
428
    // Request ids are limited to u16::MAX because varint encoding is used over the wire and the magnitude of the value
429
    // sent determines the byte size. A u16 will be more than enough for the purpose
430
    next_request_id: u16,
431
    // Taken (set to `None`) once the handshake outcome has been reported back to `RpcClient::connect`.
    ready_tx: Option<oneshot::Sender<Result<(), RpcError>>>,
432
    protocol_id: ProtocolId,
433
    shutdown_signal: ShutdownSignal,
434
    terminate_signal: Option<OneshotSignal<NodeId>>,
435
    // Set to `false` by `run` when the session loop ends.
    session_state: Arc<AtomicBool>,
436
}
437

438
impl<TSubstream> RpcClientWorker<TSubstream>
439
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
440
{
441
    pub(self) fn new(
92✔
442
        config: RpcClientConfig,
92✔
443
        node_id: NodeId,
92✔
444
        request_rx: mpsc::Receiver<ClientRequest>,
92✔
445
        last_request_latency_tx: watch::Sender<Option<Duration>>,
92✔
446
        framed: CanonicalFraming<TSubstream>,
92✔
447
        ready_tx: oneshot::Sender<Result<(), RpcError>>,
92✔
448
        protocol_id: ProtocolId,
92✔
449
        shutdown_signal: ShutdownSignal,
92✔
450
        terminate_signal: Option<OneshotSignal<NodeId>>,
92✔
451
        session_state: Arc<AtomicBool>,
92✔
452
    ) -> Self {
92✔
453
        Self {
92✔
454
            config,
92✔
455
            node_id,
92✔
456
            request_rx,
92✔
457
            framed,
92✔
458
            next_request_id: 0,
92✔
459
            ready_tx: Some(ready_tx),
92✔
460
            last_request_latency_tx,
92✔
461
            protocol_id,
92✔
462
            shutdown_signal,
92✔
463
            terminate_signal,
92✔
464
            session_state,
92✔
465
        }
92✔
466
    }
92✔
467

468
    /// Returns the protocol id as text for log messages; bytes that are not valid UTF-8 are replaced lossily.
    fn protocol_name(&self) -> Cow<'_, str> {
103,088✔
469
        String::from_utf8_lossy(&self.protocol_id)
103,088✔
470
    }
103,088✔
471

472
    /// Returns the stream id reported by the framed substream; used to tag log messages.
    fn stream_id(&self) -> stream_id::Id {
103,088✔
473
        self.framed.stream_id()
103,088✔
474
    }
103,088✔
475

476
    #[allow(clippy::too_many_lines)]
477
    async fn run(mut self) {
92✔
478
        debug!(
92✔
479
            target: LOG_TARGET,
×
480
            "(stream={}) Performing client handshake for '{}'",
×
481
            self.stream_id(),
×
482
            self.protocol_name()
×
483
        );
484
        let start = Instant::now();
92✔
485
        let mut handshake = Handshake::new(&mut self.framed).with_timeout(self.config.handshake_timeout());
92✔
486
        match handshake.perform_client_handshake().await {
92✔
487
            Ok(_) => {
488
                let latency = start.elapsed();
85✔
489
                debug!(
85✔
490
                    target: LOG_TARGET,
×
491
                    "(stream={}) RPC Session ({}) negotiation completed. Latency: {:.0?}",
×
492
                    self.stream_id(),
×
493
                    self.protocol_name(),
×
494
                    latency
495
                );
496
                let _ = self.last_request_latency_tx.send(Some(latency));
85✔
497
                if let Some(r) = self.ready_tx.take() {
85✔
498
                    let _result = r.send(Ok(()));
85✔
499
                }
85✔
500
                #[cfg(feature = "metrics")]
501
                metrics::handshake_counter(&self.protocol_id).inc();
85✔
502
            },
503
            Err(err) => {
7✔
504
                #[cfg(feature = "metrics")]
7✔
505
                metrics::handshake_errors(&self.protocol_id).inc();
7✔
506
                if let Some(r) = self.ready_tx.take() {
7✔
507
                    let _result = r.send(Err(err.into()));
7✔
508
                }
7✔
509

510
                return;
7✔
511
            },
512
        }
513

514
        let mut terminate_signal = self
85✔
515
            .terminate_signal
85✔
516
            .take()
85✔
517
            .map(|f| f.boxed())
85✔
518
            .unwrap_or_else(|| future::pending::<Option<NodeId>>().boxed());
85✔
519

85✔
520
        #[cfg(feature = "metrics")]
85✔
521
        metrics::num_sessions(&self.protocol_id).inc();
85✔
522
        loop {
523
            tokio::select! {
215✔
524
                // Check the futures in the order they are listed
525
                biased;
526
                _ = &mut self.shutdown_signal => {
215✔
527
                    break;
45✔
528
                }
529
                node_id = &mut terminate_signal => {
215✔
530
                    debug!(
3✔
531
                        target: LOG_TARGET, "(stream={}) Peer '{}' connection has dropped. Worker is terminating.",
×
532
                        self.stream_id(), node_id.unwrap_or_default()
×
533
                    );
534
                    break;
3✔
535
                }
536
                req = self.request_rx.recv() => {
215✔
537
                    match req {
135✔
538
                        Some(req) => {
135✔
539
                            if let Err(err) = self.handle_request(req).await {
135✔
540
                                #[cfg(feature = "metrics")]
541
                                metrics::client_errors(&self.protocol_id).inc();
4✔
542
                                info!(
4✔
543
                                    target: LOG_TARGET,
×
544
                                    "(stream={}) Unexpected error: {}. Worker is terminating.",
×
545
                                    self.stream_id(), err
×
546
                                );
547
                                break;
4✔
548
                            }
130✔
549
                        }
550
                        None => {
551
                            debug!(
×
552
                                target: LOG_TARGET,
×
553
                                "(stream={}) Request channel closed. Worker is terminating.",
×
554
                                self.stream_id()
×
555
                            );
556
                            break
×
557
                        },
558
                    }
559
                }
560
            }
561
        }
562
        #[cfg(feature = "metrics")]
563
        metrics::num_sessions(&self.protocol_id).dec();
52✔
564

52✔
565
        let session_state = self.session_state.as_ref();
52✔
566
        session_state.store(false, Ordering::Relaxed);
52✔
567

568
        if let Err(err) = self.framed.close().await {
52✔
569
            debug!(
3✔
570
                target: LOG_TARGET,
×
571
                "(stream: {}, peer: {}) IO Error when closing substream: {}",
×
572
                self.stream_id(),
×
573
                self.node_id,
574
                err
575
            );
576
        }
49✔
577

578
        debug!(
52✔
579
            target: LOG_TARGET,
×
580
            "(stream: {}, peer: {}) RpcClientWorker ({}) terminated.",
×
581
            self.stream_id(),
×
582
            self.node_id,
×
583
            self.protocol_name()
×
584
        );
585
    }
59✔
586

587
    async fn handle_request(&mut self, req: ClientRequest) -> Result<(), RpcError> {
135✔
588
        use ClientRequest::{SendPing, SendRequest};
589
        match req {
135✔
590
            SendRequest { request, reply } => {
134✔
591
                self.do_request_response(request, reply).await?;
134✔
592
            },
593
            SendPing(reply) => {
1✔
594
                self.do_ping_pong(reply).await?;
1✔
595
            },
596
        }
597
        Ok(())
130✔
598
    }
134✔
599

600
    async fn do_ping_pong(&mut self, reply: oneshot::Sender<Result<Duration, RpcStatus>>) -> Result<(), RpcError> {
1✔
601
        let ack = proto::rpc::RpcRequest {
1✔
602
            flags: u32::from(RpcMessageFlags::ACK.bits()),
1✔
603
            deadline: self.config.deadline.map(|t| t.as_secs()).unwrap_or(0),
1✔
604
            ..Default::default()
1✔
605
        };
1✔
606

1✔
607
        let start = Instant::now();
1✔
608
        self.framed.send(ack.to_encoded_bytes().into()).await?;
1✔
609

610
        trace!(
1✔
611
            target: LOG_TARGET,
×
612
            "(stream={}) Ping (protocol {}) sent in {:.2?}",
×
613
            self.stream_id(),
×
614
            self.protocol_name(),
×
615
            start.elapsed()
×
616
        );
617
        let mut reader = RpcResponseReader::new(&mut self.framed, self.config, 0);
1✔
618
        let resp = match reader.read_ack().await {
1✔
619
            Ok(resp) => resp,
1✔
620
            Err(RpcError::ReplyTimeout) => {
621
                debug!(
×
622
                    target: LOG_TARGET,
×
623
                    "(stream={}) Ping timed out after {:.0?}",
×
624
                    self.stream_id(),
×
625
                    start.elapsed()
×
626
                );
627
                #[cfg(feature = "metrics")]
628
                metrics::client_timeouts(&self.protocol_id).inc();
×
629
                let _result = reply.send(Err(RpcStatus::timed_out("Response timed out")));
×
630
                return Ok(());
×
631
            },
632
            Err(err) => return Err(err),
×
633
        };
634

635
        let status = RpcStatus::from(&resp);
1✔
636
        if !status.is_ok() {
1✔
637
            let _result = reply.send(Err(status.clone()));
×
638
            return Err(status.into());
×
639
        }
1✔
640

641
        let resp_flags =
1✔
642
            RpcMessageFlags::from_bits(u8::try_from(resp.flags).map_err(|_| {
1✔
643
                RpcStatus::protocol_error(&format!("invalid message flag: must be less than {}", u8::MAX))
×
644
            })?)
1✔
645
            .ok_or(RpcStatus::protocol_error(&format!(
1✔
646
                "invalid message flag, does not match any flags ({})",
1✔
647
                resp.flags
1✔
648
            )))?;
1✔
649
        if !resp_flags.contains(RpcMessageFlags::ACK) {
1✔
650
            warn!(
×
651
                target: LOG_TARGET,
×
652
                "(stream={}) Invalid ping response {:?}",
×
653
                self.stream_id(),
×
654
                resp
655
            );
656
            let _result = reply.send(Err(RpcStatus::protocol_error(&format!(
×
657
                "Received invalid ping response on protocol '{}'",
×
658
                self.protocol_name()
×
659
            ))));
×
660
            return Err(RpcError::InvalidPingResponse);
×
661
        }
1✔
662

1✔
663
        let _result = reply.send(Ok(start.elapsed()));
1✔
664
        Ok(())
1✔
665
    }
1✔
666

667
    #[allow(clippy::too_many_lines)]
668
    async fn do_request_response(
134✔
669
        &mut self,
134✔
670
        request: BaseRequest<Bytes>,
134✔
671
        reply: oneshot::Sender<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>>,
134✔
672
    ) -> Result<(), RpcError> {
134✔
673
        #[cfg(feature = "metrics")]
134✔
674
        metrics::outbound_request_bytes(&self.protocol_id).observe(request.get_ref().len() as f64);
134✔
675

134✔
676
        let request_id = self.next_request_id();
134✔
677
        let method = request.method.into();
134✔
678
        let req = proto::rpc::RpcRequest {
134✔
679
            request_id: u32::from(request_id),
134✔
680
            method,
134✔
681
            deadline: self.config.deadline.map(|t| t.as_secs()).unwrap_or(0),
134✔
682
            flags: 0,
134✔
683
            payload: request.message.to_vec(),
134✔
684
        };
134✔
685

134✔
686
        trace!(target: LOG_TARGET, "Sending request: {}", req);
134✔
687

688
        if reply.is_closed() {
134✔
689
            warn!(
×
690
                target: LOG_TARGET,
×
691
                "Client request was cancelled before request was sent"
×
692
            );
693
        }
134✔
694

695
        let (response_tx, response_rx) = mpsc::channel(5);
134✔
696
        if let Err(mut rx) = reply.send(response_rx) {
134✔
697
            warn!(
×
698
                target: LOG_TARGET,
×
699
                "Client request was cancelled after request was sent. This means that you are making an RPC request \
×
700
                 and then immediately dropping the response! (protocol = {})",
×
701
                self.protocol_name(),
×
702
            );
703
            rx.close();
×
704
            return Ok(());
×
705
        }
134✔
706

134✔
707
        #[cfg(feature = "metrics")]
134✔
708
        let latency = metrics::request_response_latency(&self.protocol_id);
134✔
709
        #[cfg(feature = "metrics")]
134✔
710
        let mut metrics_timer = Some(latency.start_timer());
134✔
711

134✔
712
        let timer = Instant::now();
134✔
713
        if let Err(err) = self.send_request(req).await {
134✔
714
            warn!(target: LOG_TARGET, "{}", err);
3✔
715
            #[cfg(feature = "metrics")]
716
            metrics::client_errors(&self.protocol_id).inc();
3✔
717
            let _result = response_tx.send(Err(err.into())).await;
3✔
718
            return Ok(());
3✔
719
        }
131✔
720
        let partial_latency = timer.elapsed();
131✔
721

722
        loop {
723
            if self.shutdown_signal.is_triggered() {
103,088✔
724
                debug!(
×
725
                    target: LOG_TARGET,
×
726
                    "[peer: {}, protocol: {}, stream_id: {}, req_id: {}] Client connector closed. Quitting stream \
×
727
                     early",
×
728
                    self.node_id,
×
729
                    self.protocol_name(),
×
730
                    self.stream_id(),
×
731
                    request_id
732
                );
733
                break;
×
734
            }
103,088✔
735

736
            // Check if the response receiver has been dropped while receiving messages
737
            let resp_result = {
103,087✔
738
                let resp_fut = self.read_response(request_id);
103,088✔
739
                tokio::pin!(resp_fut);
103,088✔
740
                let closed_fut = response_tx.closed();
103,088✔
741
                tokio::pin!(closed_fut);
103,088✔
742
                match future::select(resp_fut, closed_fut).await {
103,088✔
743
                    Either::Left((r, _)) => Some(r),
103,084✔
744
                    Either::Right(_) => None,
3✔
745
                }
746
            };
747
            let resp_result = match resp_result {
103,087✔
748
                Some(r) => r,
103,084✔
749
                None => {
750
                    self.premature_close(request_id, method).await?;
3✔
751
                    break;
3✔
752
                },
753
            };
754

755
            let resp = match resp_result {
103,079✔
756
                Ok((resp, time_to_first_msg)) => {
103,079✔
757
                    if let Some(t) = time_to_first_msg {
103,079✔
758
                        let _ = self.last_request_latency_tx.send(Some(partial_latency + t));
103,079✔
759
                    }
103,079✔
760
                    trace!(
103,079✔
761
                        target: LOG_TARGET,
×
762
                        "Received response ({} byte(s)) from request #{} (protocol = {}, method={})",
×
763
                        resp.payload.len(),
×
764
                        request_id,
×
765
                        self.protocol_name(),
×
766
                        method,
767
                    );
768

769
                    #[cfg(feature = "metrics")]
770
                    if let Some(t) = metrics_timer.take() {
103,079✔
771
                        t.observe_duration();
125✔
772
                    }
102,954✔
773
                    resp
103,079✔
774
                },
775
                Err(RpcError::ReplyTimeout) => {
776
                    debug!(
1✔
777
                        target: LOG_TARGET,
×
778
                        "Request {} (method={}) timed out", request_id, method,
×
779
                    );
780
                    #[cfg(feature = "metrics")]
781
                    metrics::client_timeouts(&self.protocol_id).inc();
1✔
782
                    if response_tx.is_closed() {
1✔
783
                        self.premature_close(request_id, method).await?;
×
784
                    } else {
785
                        let _result = response_tx.send(Err(RpcStatus::timed_out("Response timed out"))).await;
1✔
786
                    }
787
                    break;
1✔
788
                },
789
                Err(RpcError::ClientClosed) => {
790
                    debug!(
×
791
                        target: LOG_TARGET,
×
792
                        "Request {} (method={}) was closed (read_reply)", request_id, method,
×
793
                    );
794
                    self.request_rx.close();
×
795
                    break;
×
796
                },
797
                Err(err) => {
4✔
798
                    return Err(err);
4✔
799
                },
800
            };
801

802
            match Self::convert_to_result(resp) {
103,079✔
803
                Ok(Ok(resp)) => {
103,074✔
804
                    let is_finished = resp.is_finished();
103,074✔
805
                    // The consumer may drop the receiver before all responses are received.
103,074✔
806
                    // We handle this by sending a 'FIN' message to the server.
103,074✔
807
                    if response_tx.is_closed() {
103,074✔
808
                        self.premature_close(request_id, method).await?;
×
809
                        break;
×
810
                    } else {
811
                        let _result = response_tx.send(Ok(resp)).await;
103,074✔
812
                    }
813
                    if is_finished {
103,074✔
814
                        break;
117✔
815
                    }
102,957✔
816
                },
817
                Ok(Err(err)) => {
5✔
818
                    debug!(target: LOG_TARGET, "Remote service returned error: {}", err);
5✔
819
                    if !response_tx.is_closed() {
5✔
820
                        let _result = response_tx.send(Err(err)).await;
5✔
821
                    }
×
822
                    break;
5✔
823
                },
824
                Err(err @ RpcError::ResponseIdDidNotMatchRequest { .. }) |
×
825
                Err(err @ RpcError::UnexpectedAckResponse) => {
×
826
                    warn!(target: LOG_TARGET, "{}", err);
×
827
                    // Ignore the response, this can happen when there is excessive latency. The server sends back a
828
                    // reply before the deadline but it is only received after the client has timed
829
                    // out
830
                    continue;
×
831
                },
832
                Err(err) => return Err(err),
×
833
            }
834
        }
835

836
        Ok(())
126✔
837
    }
133✔
838

839
    async fn premature_close(&mut self, request_id: u16, method: u32) -> Result<(), RpcError> {
3✔
840
        info!(
3✔
841
            target: LOG_TARGET,
×
842
            "(stream={}) Response receiver was dropped before the response/stream could complete for protocol {}, \
×
843
             interrupting the stream. ",
×
844
            self.stream_id(),
×
845
            self.protocol_name()
×
846
        );
847
        let req = proto::rpc::RpcRequest {
3✔
848
            request_id: u32::from(request_id),
3✔
849
            method,
3✔
850
            flags: RpcMessageFlags::FIN.bits().into(),
3✔
851
            deadline: self.config.deadline.map(|d| d.as_secs()).unwrap_or(0),
3✔
852
            ..Default::default()
3✔
853
        };
3✔
854

855
        // If we cannot set FIN quickly, just exit
856
        if let Ok(res) = time::timeout(Duration::from_secs(2), self.send_request(req)).await {
3✔
857
            res?;
3✔
858
        }
×
859
        Ok(())
3✔
860
    }
3✔
861

862
    async fn send_request(&mut self, req: proto::rpc::RpcRequest) -> Result<(), RpcError> {
137✔
863
        let payload = req.to_encoded_bytes();
137✔
864
        if payload.len() > rpc::max_request_size() {
137✔
865
            return Err(RpcError::MaxRequestSizeExceeded {
×
866
                got: payload.len(),
×
867
                expected: rpc::max_request_size(),
×
868
            });
×
869
        }
137✔
870
        self.framed.send(payload.into()).await?;
137✔
871
        Ok(())
134✔
872
    }
137✔
873

874
    async fn read_response(
103,088✔
875
        &mut self,
103,088✔
876
        request_id: u16,
103,088✔
877
    ) -> Result<(proto::rpc::RpcResponse, Option<Duration>), RpcError> {
103,088✔
878
        let stream_id = self.stream_id();
103,088✔
879
        let protocol_name = self.protocol_name().to_string();
103,088✔
880

103,088✔
881
        let mut reader = RpcResponseReader::new(&mut self.framed, self.config, request_id);
103,088✔
882
        let mut num_ignored = 0;
103,088✔
883
        let resp = loop {
103,079✔
884
            match reader.read_response().await {
103,090✔
885
                Ok(resp) => {
103,079✔
886
                    trace!(
103,079✔
887
                        target: LOG_TARGET,
×
888
                        "(stream: {}, {}) Received body len = {}",
×
889
                        stream_id,
×
890
                        protocol_name,
×
891
                        reader.bytes_read()
×
892
                    );
893
                    #[cfg(feature = "metrics")]
894
                    metrics::inbound_response_bytes(&self.protocol_id).observe(reader.bytes_read() as f64);
103,079✔
895
                    let time_to_first_msg = reader.time_to_first_msg();
103,079✔
896
                    break (resp, time_to_first_msg);
103,079✔
897
                },
898
                Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected })
2✔
899
                    if actual.wrapping_add(1) == request_id =>
2✔
900
                {
2✔
901
                    warn!(
2✔
902
                        target: LOG_TARGET,
×
903
                        "Possible delayed response received for previous request {}", actual
×
904
                    );
905
                    num_ignored += 1;
2✔
906

907
                    // Be lenient for a number of messages that may have been buffered to come through for the previous
908
                    // request.
909
                    const MAX_ALLOWED_IGNORED: usize = 20;
910
                    if num_ignored > MAX_ALLOWED_IGNORED {
2✔
911
                        return Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected });
×
912
                    }
2✔
913
                    continue;
2✔
914
                },
915
                Err(err) => return Err(err),
5✔
916
            }
917
        };
918
        Ok(resp)
103,079✔
919
    }
103,084✔
920

921
    fn next_request_id(&mut self) -> u16 {
134✔
922
        let mut next_id = self.next_request_id;
134✔
923
        // request_id is allowed to wrap around back to 0
134✔
924
        self.next_request_id = self.next_request_id.wrapping_add(1);
134✔
925
        // We dont want request id of zero because that is the default for varint on protobuf, so it is possible for the
134✔
926
        // entire message to be zero bytes (WriteZero IO error)
134✔
927
        if next_id == 0 {
134✔
928
            next_id += 1;
67✔
929
            self.next_request_id += 1;
67✔
930
        }
67✔
931
        next_id
134✔
932
    }
134✔
933

934
    fn convert_to_result(resp: proto::rpc::RpcResponse) -> Result<Result<Response<Bytes>, RpcStatus>, RpcError> {
103,079✔
935
        let status = RpcStatus::from(&resp);
103,079✔
936
        if !status.is_ok() {
103,079✔
937
            return Ok(Err(status));
5✔
938
        }
103,074✔
939
        let flags = match resp.flags() {
103,074✔
940
            Ok(flags) => flags,
103,074✔
941
            Err(e) => return Ok(Err(RpcError::ServerError(RpcServerError::ProtocolError(e)).into())),
×
942
        };
943
        let resp = Response {
103,074✔
944
            flags,
103,074✔
945
            payload: resp.payload.into(),
103,074✔
946
        };
103,074✔
947

103,074✔
948
        Ok(Ok(resp))
103,074✔
949
    }
103,079✔
950
}
951

952
/// Requests that can be submitted to the RPC client worker.
pub enum ClientRequest {
    /// Send `request` to the remote peer. The receiving end of the response channel is delivered through
    /// `reply`; each item on that channel is either a response or an `RpcStatus` error.
    SendRequest {
        request: BaseRequest<Bytes>,
        reply: oneshot::Sender<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>>,
    },
    /// Ping the remote peer. The sender receives a `Duration` on success or an `RpcStatus` on failure.
    // NOTE(review): the duration is presumably the measured round-trip latency - confirm against the handler.
    SendPing(oneshot::Sender<Result<Duration, RpcStatus>>),
}
959

960
struct RpcResponseReader<'a, TSubstream> {
961
    framed: &'a mut CanonicalFraming<TSubstream>,
962
    config: RpcClientConfig,
963
    request_id: u16,
964
    bytes_read: usize,
965
    time_to_first_msg: Option<Duration>,
966
}
967

968
impl<'a, TSubstream> RpcResponseReader<'a, TSubstream>
969
where TSubstream: AsyncRead + AsyncWrite + Unpin
970
{
971
    pub fn new(framed: &'a mut CanonicalFraming<TSubstream>, config: RpcClientConfig, request_id: u16) -> Self {
103,089✔
972
        Self {
103,089✔
973
            framed,
103,089✔
974
            config,
103,089✔
975
            request_id,
103,089✔
976
            bytes_read: 0,
103,089✔
977
            time_to_first_msg: None,
103,089✔
978
        }
103,089✔
979
    }
103,089✔
980

981
    pub fn bytes_read(&self) -> usize {
103,079✔
982
        self.bytes_read
103,079✔
983
    }
103,079✔
984

985
    pub fn time_to_first_msg(&self) -> Option<Duration> {
103,079✔
986
        self.time_to_first_msg
103,079✔
987
    }
103,079✔
988

989
    pub async fn read_response(&mut self) -> Result<proto::rpc::RpcResponse, RpcError> {
103,090✔
990
        let timer = Instant::now();
103,090✔
991
        let resp = self.next().await?;
103,090✔
992
        self.time_to_first_msg = Some(timer.elapsed());
103,081✔
993
        self.check_response(&resp)?;
103,081✔
994
        self.bytes_read = resp.payload.len();
103,079✔
995
        trace!(
103,079✔
996
            target: LOG_TARGET,
×
997
            "Received {} bytes in {:.2?}",
×
998
            resp.payload.len(),
×
999
            self.time_to_first_msg.unwrap_or_default()
×
1000
        );
1001
        Ok(resp)
103,079✔
1002
    }
103,086✔
1003

1004
    pub async fn read_ack(&mut self) -> Result<proto::rpc::RpcResponse, RpcError> {
1✔
1005
        let resp = self.next().await?;
1✔
1006
        Ok(resp)
1✔
1007
    }
1✔
1008

1009
    fn check_response(&self, resp: &proto::rpc::RpcResponse) -> Result<(), RpcError> {
103,081✔
1010
        let resp_id = u16::try_from(resp.request_id)
103,081✔
1011
            .map_err(|_| RpcStatus::protocol_error(&format!("invalid request_id: must be less than {}", u16::MAX)))?;
103,081✔
1012

1013
        let flags =
103,081✔
1014
            RpcMessageFlags::from_bits(u8::try_from(resp.flags).map_err(|_| {
103,081✔
1015
                RpcStatus::protocol_error(&format!("invalid message flag: must be less than {}", u8::MAX))
×
1016
            })?)
103,081✔
1017
            .ok_or(RpcStatus::protocol_error(&format!(
103,081✔
1018
                "invalid message flag, does not match any flags ({})",
103,081✔
1019
                resp.flags
103,081✔
1020
            )))?;
103,081✔
1021
        if flags.contains(RpcMessageFlags::ACK) {
103,081✔
1022
            return Err(RpcError::UnexpectedAckResponse);
×
1023
        }
103,081✔
1024

103,081✔
1025
        if resp_id != self.request_id {
103,081✔
1026
            return Err(RpcError::ResponseIdDidNotMatchRequest {
1027
                expected: self.request_id,
2✔
1028
                actual: u16::try_from(resp.request_id).map_err(|_| {
2✔
1029
                    RpcStatus::protocol_error(&format!("invalid request_id: must be less than {}", u16::MAX))
×
1030
                })?,
2✔
1031
            });
1032
        }
103,079✔
1033

103,079✔
1034
        Ok(())
103,079✔
1035
    }
103,081✔
1036

1037
    async fn next(&mut self) -> Result<proto::rpc::RpcResponse, RpcError> {
103,091✔
1038
        // Wait until the timeout, allowing an extra grace period to account for latency
1039
        let next_msg_fut = match self.config.timeout_with_grace_period() {
103,091✔
1040
            Some(timeout) => Either::Left(time::timeout(timeout, self.framed.next())),
103,091✔
1041
            None => Either::Right(self.framed.next().map(Ok)),
×
1042
        };
1043

1044
        match next_msg_fut.await {
103,091✔
1045
            Ok(Some(Ok(resp))) => Ok(proto::rpc::RpcResponse::decode(resp)?),
103,082✔
1046
            Ok(Some(Err(err))) => Err(err.into()),
×
1047
            Ok(None) => Err(RpcError::ServerClosedRequest),
4✔
1048
            Err(_) => Err(RpcError::ReplyTimeout),
1✔
1049
        }
1050
    }
103,087✔
1051
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc