tari-project / tari / build 17297453740 (push, via GitHub, web-flow)

28 Aug 2025 01:33PM UTC coverage: 61.046% (+0.9%) from 60.14%
chore(ci): add a wasm build step in ci (#7448)

Description
Adds a wasm build step in CI.

Motivation and Context
Tests the wasm builds.

How Has This Been Tested?
Builds successfully in a local fork.

## Summary by CodeRabbit

* **Chores**
  * Added a CI workflow to build WebAssembly targets with optimized caching on both hosted and self-hosted runners, improving build consistency and speed.
* **Tests**
  * Expanded automated checks to include WebAssembly build verification for multiple modules, increasing coverage and early detection of build issues.

72582 of 118897 relevant lines covered (61.05%)

301536.67 hits per line

Source File: /comms/core/src/protocol/rpc/client/mod.rs (81.2% of relevant lines covered)

//  Copyright 2021, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pub mod pool;

#[cfg(test)]
mod tests;

#[cfg(feature = "metrics")]
mod metrics;

use std::{
    borrow::Cow,
    convert::TryFrom,
    fmt,
    future::Future,
    marker::PhantomData,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

use bytes::Bytes;
use futures::{
    future,
    future::{BoxFuture, Either},
    task::{Context, Poll},
    FutureExt,
    SinkExt,
    StreamExt,
};
use log::*;
use prost::Message;
use tari_shutdown::{oneshot_trigger::OneshotSignal, Shutdown, ShutdownSignal};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::{mpsc, oneshot, watch, Mutex},
    time,
};
use tower::{Service, ServiceExt};
use tracing::{span, Instrument, Level};

use super::message::RpcMethod;
use crate::{
    framing::CanonicalFraming,
    message::MessageExt,
    peer_manager::NodeId,
    proto,
    protocol::{
        rpc,
        rpc::{
            body::ClientStreaming,
            message::{BaseRequest, RpcMessageFlags},
            Handshake,
            NamedProtocolService,
            Response,
            RpcError,
            RpcServerError,
            RpcStatus,
        },
        ProtocolId,
    },
    stream_id,
    stream_id::StreamId,
};

const LOG_TARGET: &str = "comms::rpc::client";

#[derive(Clone)]
pub struct RpcClient {
    connector: ClientConnector,
}

impl RpcClient {
    pub fn builder<T>() -> RpcClientBuilder<T>
    where T: NamedProtocolService {
        RpcClientBuilder::new().with_protocol_id(T::PROTOCOL_NAME.into())
    }

    /// Create a new RpcClient using the given framed substream and perform the RPC handshake.
    pub async fn connect<TSubstream>(
        config: RpcClientConfig,
        node_id: NodeId,
        framed: CanonicalFraming<TSubstream>,
        protocol_name: ProtocolId,
        terminate_signal: Option<OneshotSignal<NodeId>>,
        session_state: Arc<AtomicBool>,
    ) -> Result<Self, RpcError>
    where
        TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId + 'static,
    {
        trace!(target: LOG_TARGET, "connect to {node_id:?} with {config:?}");
        let (request_tx, request_rx) = mpsc::channel(1);
        let shutdown = Shutdown::new();
        let shutdown_signal = shutdown.to_signal();
        let (last_request_latency_tx, last_request_latency_rx) = watch::channel(None);
        let connector = ClientConnector::new(request_tx, last_request_latency_rx, shutdown);
        let (ready_tx, ready_rx) = oneshot::channel();
        let tracing_id = tracing::Span::current().id();

        tokio::spawn({
            let span = span!(Level::TRACE, "start_rpc_worker");
            span.follows_from(tracing_id);

            RpcClientWorker::new(
                config,
                node_id,
                request_rx,
                last_request_latency_tx,
                framed,
                ready_tx,
                protocol_name,
                shutdown_signal,
                terminate_signal,
                session_state,
            )
            .run()
            .instrument(span)
        });
        ready_rx
            .await
            .expect("ready_rx oneshot is never dropped without a reply")?;
        Ok(Self { connector })
    }

    /// Perform a single request and single response
    pub async fn request_response<T, R, M>(&mut self, request: T, method: M) -> Result<R, RpcError>
    where
        T: prost::Message,
        R: prost::Message + Default + std::fmt::Debug,
        M: Into<RpcMethod>,
    {
        let req_bytes = request.to_encoded_bytes();
        let request = BaseRequest::new(method.into(), req_bytes.into());

        let mut resp = self.call_inner(request).await?;
        let resp = resp.recv().await.ok_or(RpcError::ServerClosedRequest)??;
        let resp = R::decode(resp.into_message())?;

        Ok(resp)
    }

    /// Perform a single request and streaming response
    pub async fn server_streaming<T, M, R>(&mut self, request: T, method: M) -> Result<ClientStreaming<R>, RpcError>
    where
        T: prost::Message,
        R: prost::Message + Default,
        M: Into<RpcMethod>,
    {
        let req_bytes = request.to_encoded_bytes();
        let request = BaseRequest::new(method.into(), req_bytes.into());

        let resp = self.call_inner(request).await?;

        Ok(ClientStreaming::new(resp))
    }

    /// Close the RPC session. Any subsequent calls will error.
    pub async fn close(&mut self) {
        self.connector.close().await;
    }

    pub fn is_connected(&self) -> bool {
        self.connector.is_connected()
    }

    /// Return the latency of the last request
    pub fn get_last_request_latency(&mut self) -> Option<Duration> {
        self.connector.get_last_request_latency()
    }

    /// Sends a ping and returns the latency
    pub fn ping(&mut self) -> impl Future<Output = Result<Duration, RpcError>> + '_ {
        self.connector.send_ping()
    }

    async fn call_inner(
        &mut self,
        request: BaseRequest<Bytes>,
    ) -> Result<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>, RpcError> {
        let svc = self.connector.ready().await?;
        let resp = svc.call(request).await?;
        Ok(resp)
    }
}
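
A brief usage sketch of the public API above (an editor's illustration, not part of mod.rs; `PingRequest`/`PongResponse` are hypothetical prost messages, and the numeric method id assumes `RpcMethod` converts from a `u32`, as the `Into<RpcMethod>` bounds suggest):

```rust
// Hypothetical sketch: driving the RpcClient API above.
async fn ping_example(mut client: RpcClient) -> Result<(), RpcError> {
    // Unary call: encodes the request, tags it with the next request id, and
    // decodes exactly one reply message.
    let _pong: PongResponse = client
        .request_response(PingRequest::default(), RpcMethod::from(1))
        .await?;

    // Built-in latency probe, served by do_ping_pong() in the worker below.
    let latency = client.ping().await?;
    println!("ping latency: {latency:.2?}");

    // Close the session; any subsequent calls will error.
    client.close().await;
    Ok(())
}
```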

impl fmt::Debug for RpcClient {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "RpcClient {{ inner: ... }}")
    }
}

#[derive(Debug, Clone)]
pub struct RpcClientBuilder<TClient> {
    config: RpcClientConfig,
    protocol_id: Option<ProtocolId>,
    node_id: Option<NodeId>,
    terminate_signal: Option<OneshotSignal<NodeId>>,
    session_state: Option<Arc<AtomicBool>>,
    _client: PhantomData<TClient>,
}

impl<TClient> Default for RpcClientBuilder<TClient> {
    fn default() -> Self {
        Self {
            config: Default::default(),
            protocol_id: None,
            node_id: None,
            terminate_signal: None,
            session_state: None,
            _client: PhantomData,
        }
    }
}

impl<TClient> RpcClientBuilder<TClient> {
    pub fn new() -> Self {
        Default::default()
    }

    /// The deadline to send to the peer when performing a request.
    /// If this deadline is exceeded, the server SHOULD abandon the request.
    /// The client will return a timeout error if the deadline plus the grace period is exceeded.
    ///
    /// _Note: If the deadline is set too low, the responding peer MAY immediately reject the request._
    ///
    /// Default: 120 seconds
    pub fn with_deadline(mut self, timeout: Duration) -> Self {
        self.config.deadline = Some(timeout);
        self
    }

    /// Sets the grace period to allow after the configured deadline before giving up and timing out.
    /// This configuration should be set to comfortably account for the latency experienced during requests.
    ///
    /// Default: 60 seconds
    pub fn with_deadline_grace_period(mut self, timeout: Duration) -> Self {
        self.config.deadline_grace_period = timeout;
        self
    }

    /// Set the length of time that the client will wait for a response in the RPC handshake before returning a timeout
    /// error.
    ///
    /// Default: 90 seconds
    pub fn with_handshake_timeout(mut self, timeout: Duration) -> Self {
        self.config.handshake_timeout = timeout;
        self
    }

    /// Set the protocol ID associated with this client. This is used for logging purposes only.
    pub fn with_protocol_id(mut self, protocol_id: ProtocolId) -> Self {
        self.protocol_id = Some(protocol_id);
        self
    }

    /// Set the node_id for logging/metrics purposes
    pub fn with_node_id(mut self, node_id: NodeId) -> Self {
        self.node_id = Some(node_id);
        self
    }

    /// Set a signal that indicates if this client should be immediately closed
    pub fn with_terminate_signal(mut self, terminate_signal: OneshotSignal<NodeId>) -> Self {
        self.terminate_signal = Some(terminate_signal);
        self
    }

    /// Set a bool that is set to false when this client terminates
    pub fn with_session_state(mut self, session_state: Arc<AtomicBool>) -> Self {
        self.session_state = Some(session_state);
        self
    }
}

impl<TClient> RpcClientBuilder<TClient>
where TClient: From<RpcClient> + NamedProtocolService
{
    /// Negotiates and establishes a session to the peer's RPC service
    pub async fn connect<TSubstream>(self, framed: CanonicalFraming<TSubstream>) -> Result<TClient, RpcError>
    where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId + 'static {
        RpcClient::connect(
            self.config,
            self.node_id.unwrap_or_default(),
            framed,
            self.protocol_id
                .as_ref()
                .cloned()
                .unwrap_or_else(|| ProtocolId::from_static(TClient::PROTOCOL_NAME)),
            self.terminate_signal,
            self.session_state.unwrap_or(Arc::new(AtomicBool::new(true))),
        )
        .await
        .map(Into::into)
    }
}

#[derive(Debug, Clone, Copy)]
pub struct RpcClientConfig {
    pub deadline: Option<Duration>,
    pub deadline_grace_period: Duration,
    pub handshake_timeout: Duration,
}

impl RpcClientConfig {
    /// Returns the timeout including the configured grace period
    pub fn timeout_with_grace_period(&self) -> Option<Duration> {
        self.deadline.map(|d| d + self.deadline_grace_period)
    }

    /// Returns the handshake timeout
    pub fn handshake_timeout(&self) -> Duration {
        self.handshake_timeout
    }
}

impl Default for RpcClientConfig {
    fn default() -> Self {
        Self {
            deadline: Some(Duration::from_secs(120)),
            deadline_grace_period: Duration::from_secs(60),
            handshake_timeout: Duration::from_secs(90),
        }
    }
}
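
A quick check of the arithmetic above (an editor's sketch, not part of the file): `timeout_with_grace_period()` is simply the deadline plus the grace period, so the defaults yield a 180 second reply timeout.

```rust
use std::time::Duration;

fn main() {
    let config = RpcClientConfig::default();
    // 120s deadline + 60s grace period = 180s effective reply timeout.
    assert_eq!(config.timeout_with_grace_period(), Some(Duration::from_secs(180)));

    // With no deadline configured, the response reader waits indefinitely.
    let no_deadline = RpcClientConfig { deadline: None, ..config };
    assert_eq!(no_deadline.timeout_with_grace_period(), None);
}
```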

#[derive(Clone)]
pub struct ClientConnector {
    inner: mpsc::Sender<ClientRequest>,
    last_request_latency_rx: watch::Receiver<Option<Duration>>,
    shutdown: Arc<Mutex<Shutdown>>,
}

impl ClientConnector {
    pub(self) fn new(
        sender: mpsc::Sender<ClientRequest>,
        last_request_latency_rx: watch::Receiver<Option<Duration>>,
        shutdown: Shutdown,
    ) -> Self {
        Self {
            inner: sender,
            last_request_latency_rx,
            shutdown: Arc::new(Mutex::new(shutdown)),
        }
    }

    pub async fn close(&mut self) {
        let mut lock = self.shutdown.lock().await;
        lock.trigger();
    }

    pub fn get_last_request_latency(&mut self) -> Option<Duration> {
        *self.last_request_latency_rx.borrow()
    }

    pub async fn send_ping(&mut self) -> Result<Duration, RpcError> {
        let (reply, reply_rx) = oneshot::channel();
        self.inner
            .send(ClientRequest::SendPing(reply))
            .await
            .map_err(|_| RpcError::ClientClosed)?;

        let latency = reply_rx.await.map_err(|_| RpcError::RequestCancelled)??;
        Ok(latency)
    }

    pub fn is_connected(&self) -> bool {
        !self.inner.is_closed()
    }
}

impl fmt::Debug for ClientConnector {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ClientConnector {{ inner: ... }}")
    }
}

impl Service<BaseRequest<Bytes>> for ClientConnector {
    type Error = RpcError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
    type Response = mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: BaseRequest<Bytes>) -> Self::Future {
        let (reply, reply_rx) = oneshot::channel();
        let inner = self.inner.clone();
        async move {
            inner
                .send(ClientRequest::SendRequest { request, reply })
                .await
                .map_err(|_| RpcError::ClientClosed)?;

            reply_rx.await.map_err(|_| RpcError::RequestCancelled)
        }
        .boxed()
    }
}
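
`ClientConnector` is a thin handle over a channel to the worker: `call()` packages the request together with a oneshot reply sender and awaits the answer. A minimal, self-contained sketch of this request/reply-over-channel pattern (plain tokio, nothing tari-specific):

```rust
// Generic sketch of the mpsc + oneshot pattern used by ClientConnector::call
// and RpcClientWorker::handle_request.
use tokio::sync::{mpsc, oneshot};

struct Request {
    body: String,
    reply: oneshot::Sender<String>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(1);

    // Worker task: receive requests and answer via the embedded oneshot.
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            let _ = req.reply.send(format!("echo: {}", req.body));
        }
    });

    // Caller side: send the request, then await the reply.
    let (reply, reply_rx) = oneshot::channel();
    tx.send(Request { body: "hello".into(), reply }).await.unwrap();
    println!("{}", reply_rx.await.unwrap());
}
```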

struct RpcClientWorker<TSubstream> {
    config: RpcClientConfig,
    node_id: NodeId,
    request_rx: mpsc::Receiver<ClientRequest>,
    last_request_latency_tx: watch::Sender<Option<Duration>>,
    framed: CanonicalFraming<TSubstream>,
    // Request ids are limited to u16::MAX because varint encoding is used over the wire and the magnitude of the value
    // sent determines the byte size. A u16 will be more than enough for the purpose.
    next_request_id: u16,
    ready_tx: Option<oneshot::Sender<Result<(), RpcError>>>,
    protocol_id: ProtocolId,
    shutdown_signal: ShutdownSignal,
    terminate_signal: Option<OneshotSignal<NodeId>>,
    session_state: Arc<AtomicBool>,
}

impl<TSubstream> RpcClientWorker<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
{
    pub(self) fn new(
        config: RpcClientConfig,
        node_id: NodeId,
        request_rx: mpsc::Receiver<ClientRequest>,
        last_request_latency_tx: watch::Sender<Option<Duration>>,
        framed: CanonicalFraming<TSubstream>,
        ready_tx: oneshot::Sender<Result<(), RpcError>>,
        protocol_id: ProtocolId,
        shutdown_signal: ShutdownSignal,
        terminate_signal: Option<OneshotSignal<NodeId>>,
        session_state: Arc<AtomicBool>,
    ) -> Self {
        Self {
            config,
            node_id,
            request_rx,
            framed,
            next_request_id: 0,
            ready_tx: Some(ready_tx),
            last_request_latency_tx,
            protocol_id,
            shutdown_signal,
            terminate_signal,
            session_state,
        }
    }

    fn protocol_name(&self) -> Cow<'_, str> {
        String::from_utf8_lossy(&self.protocol_id)
    }

    fn stream_id(&self) -> stream_id::Id {
        self.framed.stream_id()
    }

    #[allow(clippy::too_many_lines)]
    async fn run(mut self) {
        debug!(
            target: LOG_TARGET,
            "(stream={}) Performing client handshake for '{}'",
            self.stream_id(),
            self.protocol_name()
        );
        let start = Instant::now();
        let mut handshake = Handshake::new(&mut self.framed).with_timeout(self.config.handshake_timeout());
        match handshake.perform_client_handshake().await {
            Ok(_) => {
                let latency = start.elapsed();
                debug!(
                    target: LOG_TARGET,
                    "(stream={}) RPC Session ({}) negotiation completed. Latency: {:.0?}",
                    self.stream_id(),
                    self.protocol_name(),
                    latency
                );
                let _ = self.last_request_latency_tx.send(Some(latency));
                if let Some(r) = self.ready_tx.take() {
                    let _result = r.send(Ok(()));
                }
                #[cfg(feature = "metrics")]
                metrics::handshake_counter(&self.protocol_id).inc();
            },
            Err(err) => {
                #[cfg(feature = "metrics")]
                metrics::handshake_errors(&self.protocol_id).inc();
                if let Some(r) = self.ready_tx.take() {
                    let _result = r.send(Err(err.into()));
                }

                return;
            },
        }

        let mut terminate_signal = self
            .terminate_signal
            .take()
            .map(|f| f.boxed())
            .unwrap_or_else(|| future::pending::<Option<NodeId>>().boxed());

        #[cfg(feature = "metrics")]
        metrics::num_sessions(&self.protocol_id).inc();
        loop {
            tokio::select! {
                // Check the futures in the order they are listed
                biased;
                _ = &mut self.shutdown_signal => {
                    break;
                }
                node_id = &mut terminate_signal => {
                    debug!(
                        target: LOG_TARGET, "(stream={}) Peer '{}' connection has dropped. Worker is terminating.",
                        self.stream_id(), node_id.unwrap_or_default()
                    );
                    break;
                }
                req = self.request_rx.recv() => {
                    match req {
                        Some(req) => {
                            if let Err(err) = self.handle_request(req).await {
                                #[cfg(feature = "metrics")]
                                metrics::client_errors(&self.protocol_id).inc();
                                info!(
                                    target: LOG_TARGET,
                                    "(stream={}) Unexpected error: {}. Worker is terminating.",
                                    self.stream_id(), err
                                );
                                break;
                            }
                        }
                        None => {
                            debug!(
                                target: LOG_TARGET,
                                "(stream={}) Request channel closed. Worker is terminating.",
                                self.stream_id()
                            );
                            break
                        },
                    }
                }
            }
        }
        #[cfg(feature = "metrics")]
        metrics::num_sessions(&self.protocol_id).dec();

        let session_state = self.session_state.as_ref();
        session_state.store(false, Ordering::Relaxed);

        if let Err(err) = self.framed.close().await {
            debug!(
                target: LOG_TARGET,
                "(stream: {}, peer: {}) IO Error when closing substream: {}",
                self.stream_id(),
                self.node_id,
                err
            );
        }

        debug!(
            target: LOG_TARGET,
            "(stream: {}, peer: {}) RpcClientWorker ({}) terminated.",
            self.stream_id(),
            self.node_id,
            self.protocol_name()
        );
    }

    async fn handle_request(&mut self, req: ClientRequest) -> Result<(), RpcError> {
        use ClientRequest::{SendPing, SendRequest};
        match req {
            SendRequest { request, reply } => {
                self.do_request_response(request, reply).await?;
            },
            SendPing(reply) => {
                self.do_ping_pong(reply).await?;
            },
        }
        Ok(())
    }

    async fn do_ping_pong(&mut self, reply: oneshot::Sender<Result<Duration, RpcStatus>>) -> Result<(), RpcError> {
        let ack = proto::rpc::RpcRequest {
            flags: u32::from(RpcMessageFlags::ACK.bits()),
            deadline: self.config.deadline.map(|t| t.as_secs()).unwrap_or(0),
            ..Default::default()
        };

        let start = Instant::now();
        self.framed.send(ack.to_encoded_bytes().into()).await?;

        trace!(
            target: LOG_TARGET,
            "(stream={}) Ping (protocol {}) sent in {:.2?}",
            self.stream_id(),
            self.protocol_name(),
            start.elapsed()
        );
        let mut reader = RpcResponseReader::new(&mut self.framed, self.config, 0);
        let resp = match reader.read_ack().await {
            Ok(resp) => resp,
            Err(RpcError::ReplyTimeout) => {
                debug!(
                    target: LOG_TARGET,
                    "(stream={}) Ping timed out after {:.0?}",
                    self.stream_id(),
                    start.elapsed()
                );
                #[cfg(feature = "metrics")]
                metrics::client_timeouts(&self.protocol_id).inc();
                let _result = reply.send(Err(RpcStatus::timed_out("Response timed out")));
                return Ok(());
            },
            Err(err) => return Err(err),
        };

        let status = RpcStatus::from(&resp);
        if !status.is_ok() {
            let _result = reply.send(Err(status.clone()));
            return Err(status.into());
        }

        let resp_flags =
            RpcMessageFlags::from_bits(u8::try_from(resp.flags).map_err(|_| {
                RpcStatus::protocol_error(&format!("invalid message flag: must be less than {}", u8::MAX))
            })?)
            .ok_or(RpcStatus::protocol_error(&format!(
                "invalid message flag, does not match any flags ({})",
                resp.flags
            )))?;
        if !resp_flags.contains(RpcMessageFlags::ACK) {
            warn!(
                target: LOG_TARGET,
                "(stream={}) Invalid ping response {:?}",
                self.stream_id(),
                resp
            );
            let _result = reply.send(Err(RpcStatus::protocol_error(&format!(
                "Received invalid ping response on protocol '{}'",
                self.protocol_name()
            ))));
            return Err(RpcError::InvalidPingResponse);
        }

        let _result = reply.send(Ok(start.elapsed()));
        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    async fn do_request_response(
        &mut self,
        request: BaseRequest<Bytes>,
        reply: oneshot::Sender<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>>,
    ) -> Result<(), RpcError> {
        #[cfg(feature = "metrics")]
        metrics::outbound_request_bytes(&self.protocol_id).observe(request.get_ref().len() as f64);

        let request_id = self.next_request_id();
        let method = request.method.into();
        let req = proto::rpc::RpcRequest {
            request_id: u32::from(request_id),
            method,
            deadline: self.config.deadline.map(|t| t.as_secs()).unwrap_or(0),
            flags: 0,
            payload: request.message.to_vec(),
        };

        trace!(target: LOG_TARGET, "Sending request: {req}");

        if reply.is_closed() {
            warn!(
                target: LOG_TARGET,
                "Client request was cancelled before request was sent"
            );
        }

        let (response_tx, response_rx) = mpsc::channel(5);
        if let Err(mut rx) = reply.send(response_rx) {
            warn!(
                target: LOG_TARGET,
                "Client request was cancelled after request was sent. This means that you are making an RPC request \
                 and then immediately dropping the response! (protocol = {})",
                self.protocol_name(),
            );
            rx.close();
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        let latency = metrics::request_response_latency(&self.protocol_id);
        #[cfg(feature = "metrics")]
        let mut metrics_timer = Some(latency.start_timer());

        let timer = Instant::now();
        if let Err(err) = self.send_request(req).await {
            warn!(target: LOG_TARGET, "{err}");
            #[cfg(feature = "metrics")]
            metrics::client_errors(&self.protocol_id).inc();
            let _result = response_tx.send(Err(err.into())).await;
            return Ok(());
        }
        let partial_latency = timer.elapsed();

        loop {
            if self.shutdown_signal.is_triggered() {
                debug!(
                    target: LOG_TARGET,
                    "[peer: {}, protocol: {}, stream_id: {}, req_id: {}] Client connector closed. Quitting stream \
                     early",
                    self.node_id,
                    self.protocol_name(),
                    self.stream_id(),
                    request_id
                );
                break;
            }

            // Check if the response receiver has been dropped while receiving messages
            let resp_result = {
                let resp_fut = self.read_response(request_id);
                tokio::pin!(resp_fut);
                let closed_fut = response_tx.closed();
                tokio::pin!(closed_fut);
                match future::select(resp_fut, closed_fut).await {
                    Either::Left((r, _)) => Some(r),
                    Either::Right(_) => None,
                }
            };
            let resp_result = match resp_result {
                Some(r) => r,
                None => {
                    self.premature_close(request_id, method).await?;
                    break;
                },
            };

            let resp = match resp_result {
                Ok((resp, time_to_first_msg)) => {
                    if let Some(t) = time_to_first_msg {
                        let _ = self.last_request_latency_tx.send(Some(partial_latency + t));
                    }
                    trace!(
                        target: LOG_TARGET,
                        "Received response ({} byte(s)) from request #{} (protocol = {}, method={})",
                        resp.payload.len(),
                        request_id,
                        self.protocol_name(),
                        method,
                    );

                    #[cfg(feature = "metrics")]
                    if let Some(t) = metrics_timer.take() {
                        t.observe_duration();
                    }
                    resp
                },
                Err(RpcError::ReplyTimeout) => {
                    debug!(
                        target: LOG_TARGET,
                        "Request {request_id} (method={method}) timed out"
                    );
                    #[cfg(feature = "metrics")]
                    metrics::client_timeouts(&self.protocol_id).inc();
                    if response_tx.is_closed() {
                        self.premature_close(request_id, method).await?;
                    } else {
                        let _result = response_tx.send(Err(RpcStatus::timed_out("Response timed out"))).await;
                    }
                    break;
                },
                Err(RpcError::ClientClosed) => {
                    debug!(
                        target: LOG_TARGET,
                        "Request {request_id} (method={method}) was closed (read_reply)"
                    );
                    self.request_rx.close();
                    break;
                },
                Err(err) => {
                    return Err(err);
                },
            };

            match Self::convert_to_result(resp) {
                Ok(Ok(resp)) => {
                    let is_finished = resp.is_finished();
                    // The consumer may drop the receiver before all responses are received.
                    // We handle this by sending a 'FIN' message to the server.
                    if response_tx.is_closed() {
                        self.premature_close(request_id, method).await?;
                        break;
                    } else {
                        let _result = response_tx.send(Ok(resp)).await;
                    }
                    if is_finished {
                        break;
                    }
                },
                Ok(Err(err)) => {
                    debug!(target: LOG_TARGET, "Remote service returned error: {err}");
                    if !response_tx.is_closed() {
                        let _result = response_tx.send(Err(err)).await;
                    }
                    break;
                },
                Err(err @ RpcError::ResponseIdDidNotMatchRequest { .. }) |
                Err(err @ RpcError::UnexpectedAckResponse) => {
                    warn!(target: LOG_TARGET, "{err}");
                    // Ignore the response; this can happen when there is excessive latency: the server sends back a
                    // reply before the deadline, but it is only received after the client has timed out.
                    continue;
                },
                Err(err) => return Err(err),
            }
        }

        Ok(())
    }

    async fn premature_close(&mut self, request_id: u16, method: u32) -> Result<(), RpcError> {
        info!(
            target: LOG_TARGET,
            "(stream={}) Response receiver was dropped before the response/stream could complete for protocol {}, \
             interrupting the stream. ",
            self.stream_id(),
            self.protocol_name()
        );
        let req = proto::rpc::RpcRequest {
            request_id: u32::from(request_id),
            method,
            flags: RpcMessageFlags::FIN.bits().into(),
            deadline: self.config.deadline.map(|d| d.as_secs()).unwrap_or(0),
            ..Default::default()
        };

        // If we cannot send the FIN quickly, just exit
        if let Ok(res) = time::timeout(Duration::from_secs(2), self.send_request(req)).await {
            res?;
        }
        Ok(())
    }

    async fn send_request(&mut self, req: proto::rpc::RpcRequest) -> Result<(), RpcError> {
        let payload = req.to_encoded_bytes();
        if payload.len() > rpc::max_request_size() {
            return Err(RpcError::MaxRequestSizeExceeded {
                got: payload.len(),
                expected: rpc::max_request_size(),
            });
        }
        self.framed.send(payload.into()).await?;
        Ok(())
    }

    async fn read_response(
        &mut self,
        request_id: u16,
    ) -> Result<(proto::rpc::RpcResponse, Option<Duration>), RpcError> {
        let stream_id = self.stream_id();
        let protocol_name = self.protocol_name().to_string();

        let mut reader = RpcResponseReader::new(&mut self.framed, self.config, request_id);
        let mut num_ignored = 0;
        let resp = loop {
            match reader.read_response().await {
                Ok(resp) => {
                    trace!(
                        target: LOG_TARGET,
                        "(stream: {}, {}) Received body len = {}",
                        stream_id,
                        protocol_name,
                        reader.bytes_read()
                    );
                    #[cfg(feature = "metrics")]
                    metrics::inbound_response_bytes(&self.protocol_id).observe(reader.bytes_read() as f64);
                    let time_to_first_msg = reader.time_to_first_msg();
                    break (resp, time_to_first_msg);
                },
                Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected })
                    if actual.wrapping_add(1) == request_id =>
                {
                    warn!(
                        target: LOG_TARGET,
                        "Possible delayed response received for previous request {actual}"
                    );
                    num_ignored += 1;

                    // Be lenient for a number of messages that may have been buffered to come through for the previous
                    // request.
                    const MAX_ALLOWED_IGNORED: usize = 20;
                    if num_ignored > MAX_ALLOWED_IGNORED {
                        return Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected });
                    }
                    continue;
                },
                Err(err) => return Err(err),
            }
        };
        Ok(resp)
    }

    fn next_request_id(&mut self) -> u16 {
        let mut next_id = self.next_request_id;
        // request_id is allowed to wrap around back to 0
        self.next_request_id = self.next_request_id.wrapping_add(1);
        // We don't want a request id of zero because that is the default for varint on protobuf, so it is possible for
        // the entire message to be zero bytes (WriteZero IO error)
        if next_id == 0 {
            next_id += 1;
            self.next_request_id += 1;
        }
        next_id
    }
933

934
    fn convert_to_result(resp: proto::rpc::RpcResponse) -> Result<Result<Response<Bytes>, RpcStatus>, RpcError> {
103,079✔
935
        let status = RpcStatus::from(&resp);
103,079✔
936
        if !status.is_ok() {
103,079✔
937
            return Ok(Err(status));
5✔
938
        }
103,074✔
939
        let flags = match resp.flags() {
103,074✔
940
            Ok(flags) => flags,
103,074✔
941
            Err(e) => return Ok(Err(RpcError::ServerError(RpcServerError::ProtocolError(e)).into())),
×
942
        };
943
        let resp = Response {
103,074✔
944
            flags,
103,074✔
945
            payload: resp.payload.into(),
103,074✔
946
        };
103,074✔
947

103,074✔
948
        Ok(Ok(resp))
103,074✔
949
    }
103,079✔
950
}
951

952
pub enum ClientRequest {
953
    SendRequest {
954
        request: BaseRequest<Bytes>,
955
        reply: oneshot::Sender<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>>,
956
    },
957
    SendPing(oneshot::Sender<Result<Duration, RpcStatus>>),
958
}
959

960
struct RpcResponseReader<'a, TSubstream> {
961
    framed: &'a mut CanonicalFraming<TSubstream>,
962
    config: RpcClientConfig,
963
    request_id: u16,
964
    bytes_read: usize,
965
    time_to_first_msg: Option<Duration>,
966
}
967

968
impl<'a, TSubstream> RpcResponseReader<'a, TSubstream>
969
where TSubstream: AsyncRead + AsyncWrite + Unpin
970
{
971
    pub fn new(framed: &'a mut CanonicalFraming<TSubstream>, config: RpcClientConfig, request_id: u16) -> Self {
103,089✔
972
        Self {
103,089✔
973
            framed,
103,089✔
974
            config,
103,089✔
975
            request_id,
103,089✔
976
            bytes_read: 0,
103,089✔
977
            time_to_first_msg: None,
103,089✔
978
        }
103,089✔
979
    }
103,089✔
980

981
    pub fn bytes_read(&self) -> usize {
103,079✔
982
        self.bytes_read
103,079✔
983
    }
103,079✔
984

985
    pub fn time_to_first_msg(&self) -> Option<Duration> {
103,079✔
986
        self.time_to_first_msg
103,079✔
987
    }
103,079✔
988

989
    pub async fn read_response(&mut self) -> Result<proto::rpc::RpcResponse, RpcError> {
103,090✔
990
        let timer = Instant::now();
103,090✔
991
        let resp = self.next().await?;
103,090✔
992
        self.time_to_first_msg = Some(timer.elapsed());
103,081✔
993
        self.check_response(&resp)?;
103,081✔
994
        self.bytes_read = resp.payload.len();
103,079✔
995
        trace!(
103,079✔
996
            target: LOG_TARGET,
×
997
            "Received {} bytes in {:.2?}",
×
998
            resp.payload.len(),
×
999
            self.time_to_first_msg.unwrap_or_default()
×
1000
        );
1001
        Ok(resp)
103,079✔
1002
    }
103,086✔
1003

1004
    pub async fn read_ack(&mut self) -> Result<proto::rpc::RpcResponse, RpcError> {
1✔
1005
        let resp = self.next().await?;
1✔
1006
        Ok(resp)
1✔
1007
    }
1✔
1008

1009
    fn check_response(&self, resp: &proto::rpc::RpcResponse) -> Result<(), RpcError> {
103,081✔
1010
        let resp_id = u16::try_from(resp.request_id)
103,081✔
1011
            .map_err(|_| RpcStatus::protocol_error(&format!("invalid request_id: must be less than {}", u16::MAX)))?;
103,081✔
1012

1013
        let flags =
103,081✔
1014
            RpcMessageFlags::from_bits(u8::try_from(resp.flags).map_err(|_| {
103,081✔
1015
                RpcStatus::protocol_error(&format!("invalid message flag: must be less than {}", u8::MAX))
×
1016
            })?)
103,081✔
1017
            .ok_or(RpcStatus::protocol_error(&format!(
103,081✔
1018
                "invalid message flag, does not match any flags ({})",
103,081✔
1019
                resp.flags
103,081✔
1020
            )))?;
103,081✔
1021
        if flags.contains(RpcMessageFlags::ACK) {
103,081✔
1022
            return Err(RpcError::UnexpectedAckResponse);
×
1023
        }
103,081✔
1024

103,081✔
1025
        if resp_id != self.request_id {
103,081✔
1026
            return Err(RpcError::ResponseIdDidNotMatchRequest {
1027
                expected: self.request_id,
2✔
1028
                actual: u16::try_from(resp.request_id).map_err(|_| {
2✔
1029
                    RpcStatus::protocol_error(&format!("invalid request_id: must be less than {}", u16::MAX))
×
1030
                })?,
2✔
1031
            });
1032
        }
103,079✔
1033

103,079✔
1034
        Ok(())
103,079✔
1035
    }
103,081✔
1036

1037
    async fn next(&mut self) -> Result<proto::rpc::RpcResponse, RpcError> {
103,091✔
1038
        // Wait until the timeout, allowing an extra grace period to account for latency
1039
        let next_msg_fut = match self.config.timeout_with_grace_period() {
103,091✔
1040
            Some(timeout) => Either::Left(time::timeout(timeout, self.framed.next())),
103,091✔
1041
            None => Either::Right(self.framed.next().map(Ok)),
×
1042
        };
1043

1044
        match next_msg_fut.await {
103,091✔
1045
            Ok(Some(Ok(resp))) => Ok(proto::rpc::RpcResponse::decode(resp)?),
103,082✔
1046
            Ok(Some(Err(err))) => Err(err.into()),
×
1047
            Ok(None) => Err(RpcError::ServerClosedRequest),
4✔
1048
            Err(_) => Err(RpcError::ReplyTimeout),
1✔
1049
        }
1050
    }
103,087✔
1051
}
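
`next()` above applies the configured timeout only when a deadline exists, using `futures::future::Either` to unify the two cases into one awaitable future. A generic sketch of that optional-timeout pattern (an editor's illustration, not tari-specific):

```rust
use std::time::Duration;

use futures::{future::Either, stream, FutureExt, StreamExt};
use tokio::time;

#[tokio::main]
async fn main() {
    let mut frames = stream::iter(vec![1u8, 2, 3]);
    let timeout = Some(Duration::from_secs(180)); // e.g. deadline + grace period

    // Either::Left applies a timeout; Either::Right waits indefinitely. Both
    // arms produce the same Output type, so the match result is awaitable.
    let next_msg_fut = match timeout {
        Some(t) => Either::Left(time::timeout(t, frames.next())),
        None => Either::Right(frames.next().map(Ok)),
    };

    match next_msg_fut.await {
        Ok(Some(frame)) => println!("got frame: {frame}"),
        Ok(None) => println!("stream closed"),
        Err(_) => println!("timed out"),
    }
}
```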