• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

oasisprotocol / oasis-core / #5315

30 Sep 2024 07:43AM UTC coverage: 47.384% (+0.2%) from 47.14%
#5315

Pull #5872

peternose
runtime/src/enclave_rpc: Support peer feedback for concurrent requests
Pull Request #5872: runtime/src/enclave_rpc: Support peer feedback for concurrent requests

69 of 84 new or added lines in 2 files covered. (82.14%)

5 existing lines in 2 files now uncovered.

4393 of 9271 relevant lines covered (47.38%)

1.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.57
/runtime/src/enclave_rpc/client.rs
1
//! Enclave RPC client.
2
use std::{collections::HashSet, mem, sync::Arc};
3

4
#[cfg(not(test))]
5
use rand::{rngs::OsRng, RngCore};
6
use thiserror::Error;
7
use tokio::sync::{mpsc, oneshot};
8

9
use crate::{
10
    common::{
11
        crypto::signature,
12
        namespace::Namespace,
13
        sgx::{EnclaveIdentity, QuotePolicy},
14
    },
15
    enclave_rpc::{
16
        session::{Builder, Session},
17
        types,
18
    },
19
    protocol::Protocol,
20
};
21

22
use super::transport::{RuntimeTransport, Transport};
23

24
/// Internal command queue backlog.
///
/// Used as the capacity of the bounded channel that carries commands from the client handle to
/// the controller task.
const CMDQ_BACKLOG: usize = 32;
/// Maximum number of retries on transport errors.
///
/// Bounds the number of delays produced by the retry strategy used when executing a call.
const MAX_TRANSPORT_ERROR_RETRIES: usize = 3;
28

29
/// RPC client error.
30
#[derive(Error, Debug)]
31
pub enum RpcClientError {
32
    #[error("call failed: {0}")]
33
    CallFailed(String),
34
    #[error("expected response message, received: {0:?}")]
35
    ExpectedResponseMessage(types::Message),
36
    #[error("expected close message, received: {0:?}")]
37
    ExpectedCloseMessage(types::Message),
38
    #[error("transport error")]
39
    Transport,
40
    #[error("unsupported RPC kind")]
41
    UnsupportedRpcKind,
42
    #[error("client dropped")]
43
    Dropped,
44
    #[error("decode error: {0}")]
45
    DecodeError(#[from] cbor::DecodeError),
46
    #[error("unknown error: {0}")]
47
    Unknown(#[from] anyhow::Error),
48
}
49

50
/// A command sent to the client controller task.
51
#[derive(Debug)]
52
enum Command {
53
    Call(
54
        types::Request,
55
        types::Kind,
56
        Vec<signature::PublicKey>,
57
        oneshot::Sender<Result<(u64, types::Response), RpcClientError>>,
58
    ),
59
    PeerFeedback(u64, types::PeerFeedback, types::Kind),
60
    UpdateEnclaves(Option<HashSet<EnclaveIdentity>>),
61
    UpdateQuotePolicy(QuotePolicy),
62
    UpdateRuntimeID(Option<Namespace>),
63
    #[cfg(test)]
64
    Ping(oneshot::Sender<()>),
65
}
66

67
struct MultiplexedSession {
68
    /// Session builder for resetting sessions.
69
    builder: Builder,
70
    /// Unique session identifier.
71
    id: types::SessionID,
72
    /// Current underlying protocol session.
73
    inner: Session,
74
}
75

76
impl MultiplexedSession {
77
    fn new(builder: Builder) -> Self {
1✔
78
        Self {
79
            builder: builder.clone(),
1✔
80
            id: types::SessionID::random(),
1✔
81
            inner: builder.build_initiator(),
1✔
82
        }
83
    }
84

85
    fn reset(&mut self) {
1✔
86
        self.id = types::SessionID::random();
1✔
87
        self.inner = self.builder.clone().build_initiator();
2✔
88
    }
89
}
90

91
struct Controller {
92
    /// Multiplexed session.
93
    session: MultiplexedSession,
94
    /// Used transport.
95
    transport: Box<dyn Transport>,
96
    /// Internal command queue (receiver part).
97
    cmdq: mpsc::Receiver<Command>,
98
    /// The ID of the last request.
99
    last_request_id: u64,
100
}
101

102
impl Controller {
103
    async fn run(mut self) {
5✔
104
        while let Some(cmd) = self.cmdq.recv().await {
6✔
105
            match cmd {
1✔
106
                Command::Call(request, kind, nodes, sender) => {
2✔
107
                    self.call(request, kind, nodes, sender).await
4✔
108
                }
109
                Command::PeerFeedback(request_id, peer_feedback, kind) => {
1✔
110
                    let _ = self
3✔
111
                        .transport
112
                        .submit_peer_feedback(request_id, peer_feedback)
113
                        .await; // Ignore error.
4✔
114

115
                    // In case the peer feedback is bad, reset the session so a new peer can be
116
                    // selected for a subsequent session.
117
                    if !matches!(peer_feedback, types::PeerFeedback::Success)
1✔
118
                        && kind == types::Kind::NoiseSession
×
119
                    {
120
                        self.reset().await;
×
121
                    }
122
                }
123
                Command::UpdateEnclaves(enclaves) => {
×
124
                    if self.session.builder.get_remote_enclaves() == &enclaves {
×
125
                        continue;
126
                    }
127

128
                    self.session.builder =
×
129
                        mem::take(&mut self.session.builder).remote_enclaves(enclaves);
×
130
                    self.reset().await;
×
131
                }
132
                Command::UpdateQuotePolicy(policy) => {
×
133
                    let policy = Some(Arc::new(policy));
×
134
                    if self.session.builder.get_quote_policy() == &policy {
×
135
                        continue;
136
                    }
137

138
                    self.session.builder =
×
139
                        mem::take(&mut self.session.builder).quote_policy(policy);
×
140
                    self.reset().await;
×
141
                }
142
                Command::UpdateRuntimeID(id) => {
×
143
                    if self.session.builder.get_remote_runtime_id() == &id {
×
144
                        continue;
145
                    }
146

147
                    self.session.builder =
×
148
                        mem::take(&mut self.session.builder).remote_runtime_id(id);
×
149
                    self.reset().await;
×
150
                }
151
                #[cfg(test)]
152
                Command::Ping(sender) => {
153
                    let _ = sender.send(());
154
                }
155
            }
156
        }
157

158
        // Close stream after the client is dropped.
159
        let _ = self.close().await;
×
160
    }
161

162
    async fn call(
1✔
163
        &mut self,
164
        request: types::Request,
165
        kind: types::Kind,
166
        nodes: Vec<signature::PublicKey>,
167
        sender: oneshot::Sender<Result<(u64, types::Response), RpcClientError>>,
168
    ) {
169
        let result = async {
6✔
170
            match kind {
1✔
171
                types::Kind::NoiseSession => {
172
                    // Attempt to establish a connection. This will not do anything in case the
173
                    // session has already been established.
174
                    self.connect(nodes).await?;
2✔
175

176
                    // Perform the call.
177
                    self.secure_call_raw(request).await
2✔
178
                }
179
                types::Kind::InsecureQuery => {
180
                    // Perform the call.
181
                    self.insecure_call_raw(request, nodes).await
2✔
182
                }
183
                _ => Err(RpcClientError::UnsupportedRpcKind),
×
184
            }
185
        }
186
        .await;
3✔
187

188
        if result.is_err() {
2✔
189
            // Set peer feedback immediately so retries can try new peers.
190
            let _ = self
4✔
191
                .transport
192
                .submit_peer_feedback(self.last_request_id, types::PeerFeedback::Failure)
1✔
193
                .await; // Ignore error.
3✔
194

195
            // In case there was a transport error we need to reset the session immediately as no
196
            // progress is possible.
197
            if kind == types::Kind::NoiseSession {
1✔
198
                self.reset().await;
1✔
199
            }
200
        }
201

202
        let _ = sender.send(result.map(|rsp| (self.last_request_id, rsp)));
3✔
203
    }
204

205
    async fn connect(&mut self, nodes: Vec<signature::PublicKey>) -> Result<(), RpcClientError> {
5✔
206
        // No need to create a new session if we are connected to one of the nodes.
207
        if self.session.inner.is_connected()
2✔
208
            && (nodes.is_empty() || self.session.inner.is_connected_to(&nodes))
1✔
209
        {
210
            return Ok(());
1✔
211
        }
212
        // Make sure the session is reset for a new connection.
213
        self.reset().await;
2✔
214

215
        // Handshake1 -> Handshake2
216
        let mut buffer = vec![];
1✔
217
        self.session
5✔
218
            .inner
219
            .process_data(vec![], &mut buffer)
2✔
220
            .await
3✔
221
            .expect("initiation must always succeed");
222
        let session_id = self.session.id;
1✔
223

224
        self.last_request_id += 1;
1✔
225
        let rsp = self
5✔
226
            .transport
227
            .write_noise_session(
228
                self.last_request_id,
1✔
229
                session_id,
1✔
230
                buffer,
1✔
231
                String::new(),
1✔
232
                nodes,
1✔
233
            )
234
            .await
3✔
235
            .map_err(|_| RpcClientError::Transport)?;
×
236

237
        // Update the session with the identity of the remote node. The latter still needs to be
238
        // verified using the RAK from the consensus layer.
239
        self.session.inner.set_remote_node(rsp.node)?;
2✔
240

241
        // Handshake2 -> Transport
242
        let mut buffer = vec![];
1✔
243
        self.session
5✔
244
            .inner
245
            .process_data(rsp.data, &mut buffer)
1✔
246
            .await
3✔
247
            .map_err(|_| RpcClientError::Transport)?;
1✔
248

249
        let _ = self
4✔
250
            .transport
251
            .submit_peer_feedback(self.last_request_id, types::PeerFeedback::Success)
1✔
252
            .await; // Ignore error.
3✔
253

254
        self.last_request_id += 1;
1✔
255
        self.transport
5✔
256
            .write_noise_session(
257
                self.last_request_id,
1✔
258
                session_id,
1✔
259
                buffer,
1✔
260
                String::new(),
1✔
261
                vec![rsp.node],
2✔
262
            )
263
            .await
3✔
264
            .map_err(|_| RpcClientError::Transport)?;
1✔
265

266
        // Check if the session has failed authentication. In this case, notify the other side
267
        // (returning an error here will do that in `call`).
268
        if self.session.inner.is_unauthenticated() {
1✔
269
            return Err(RpcClientError::Transport);
×
270
        }
271

272
        let _ = self
4✔
273
            .transport
274
            .submit_peer_feedback(self.last_request_id, types::PeerFeedback::Success)
1✔
275
            .await; // Ignore error.
3✔
276

277
        Ok(())
1✔
278
    }
279

280
    async fn secure_call_raw(
1✔
281
        &mut self,
282
        request: types::Request,
283
    ) -> Result<types::Response, RpcClientError> {
284
        let method = request.method.clone();
2✔
285
        let msg = types::Message::Request(request);
1✔
286

287
        // Prepare the request message.
288
        let mut buffer = vec![];
1✔
289
        self.session
3✔
290
            .inner
291
            .write_message(msg, &mut buffer)
1✔
292
            .map_err(|_| RpcClientError::Transport)?;
×
293
        let node = self.session.inner.get_node()?;
2✔
294

295
        // Send the request and receive the response.
296
        self.last_request_id += 1;
2✔
297
        let rsp = self
7✔
298
            .transport
299
            .write_noise_session(
300
                self.last_request_id,
1✔
301
                self.session.id,
1✔
302
                buffer,
1✔
303
                method,
1✔
304
                vec![node],
2✔
305
            )
306
            .await
3✔
307
            .map_err(|_| RpcClientError::Transport)?;
3✔
308

309
        // Process the response.
310
        let msg = self
6✔
311
            .session
312
            .inner
313
            .process_data(rsp.data, vec![])
2✔
314
            .await?
4✔
315
            .expect("message must be decoded if there is no error");
316

317
        match msg {
1✔
318
            types::Message::Response(rsp) => Ok(rsp),
1✔
319
            msg => Err(RpcClientError::ExpectedResponseMessage(msg)),
×
320
        }
321
    }
322

323
    async fn insecure_call_raw(
1✔
324
        &mut self,
325
        request: types::Request,
326
        nodes: Vec<signature::PublicKey>,
327
    ) -> Result<types::Response, RpcClientError> {
328
        self.last_request_id += 1;
2✔
329
        let rsp = self
6✔
330
            .transport
331
            .write_insecure_query(self.last_request_id, cbor::to_vec(request), nodes)
2✔
332
            .await
3✔
333
            .map_err(|_| RpcClientError::Transport)?;
3✔
334

335
        cbor::from_slice(&rsp.data).map_err(RpcClientError::DecodeError)
2✔
336
    }
337

338
    async fn reset(&mut self) {
5✔
339
        // Notify the other end (if any) of session closure.
340
        let _ = self.close_notify().await;
2✔
341
        // Reset the session.
342
        self.session.reset();
1✔
343
    }
344

345
    async fn close_notify(&mut self) -> Result<Vec<u8>, RpcClientError> {
5✔
346
        let node = self.session.inner.get_node()?;
4✔
347

348
        let mut buffer = vec![];
1✔
349
        self.session
4✔
350
            .inner
351
            .write_message(types::Message::Close, &mut buffer)
1✔
352
            .map_err(|_| RpcClientError::Transport)?;
3✔
353

354
        self.last_request_id += 1;
2✔
355
        let result = self
5✔
356
            .transport
357
            .write_noise_session(
358
                self.last_request_id,
1✔
359
                self.session.id,
1✔
360
                buffer,
1✔
361
                String::new(),
1✔
362
                vec![node],
2✔
363
            )
364
            .await
3✔
365
            .map_err(|_| RpcClientError::Transport)
2✔
NEW
366
            .map(|rsp| rsp.data);
×
367

368
        let pf = if result.is_err() {
2✔
369
            types::PeerFeedback::Failure
1✔
370
        } else {
NEW
371
            types::PeerFeedback::Success
×
372
        };
373

374
        let _ = self
4✔
375
            .transport
376
            .submit_peer_feedback(self.last_request_id, pf)
1✔
377
            .await; // Ignore error.
3✔
378

379
        result
1✔
380
    }
381

382
    async fn close(&mut self) -> Result<(), RpcClientError> {
×
383
        if !self.session.inner.is_connected() {
×
384
            return Ok(());
×
385
        }
386

387
        let data = self.close_notify().await?;
×
388

389
        // Close the session and check the received message.
390
        let msg = self
×
391
            .session
392
            .inner
393
            .process_data(data, vec![])
×
394
            .await?
×
395
            .expect("message must be decoded if there is no error");
396
        self.session.inner.close();
×
397

398
        match msg {
×
399
            types::Message::Close => Ok(()),
×
400
            msg => Err(RpcClientError::ExpectedCloseMessage(msg)),
×
401
        }
402
    }
403
}
404

405
/// An EnclaveRPC response that can be used to provide peer feedback.
406
pub struct Response<T> {
407
    inner: Result<T, RpcClientError>,
408
    kind: types::Kind,
409
    cmdq: mpsc::WeakSender<Command>,
410
    request_id: Option<u64>,
411
}
412

413
impl<T> Response<T> {
414
    /// Report success if result was `Ok(_)` and failure if result was `Err(_)`, then return the
415
    /// inner result consuming the response instance.
416
    pub async fn into_result_with_feedback(mut self) -> Result<T, RpcClientError> {
5✔
417
        match self.inner {
1✔
418
            Ok(_) => self.success().await,
2✔
419
            Err(_) => self.failure().await,
×
420
        }
421

422
        self.inner
1✔
423
    }
424

425
    /// Reference to inner result.
426
    pub fn result(&self) -> &Result<T, RpcClientError> {
×
427
        &self.inner
×
428
    }
429

430
    /// Consume the response instance returning the inner result.
431
    pub fn into_result(self) -> Result<T, RpcClientError> {
×
432
        self.inner
×
433
    }
434

435
    /// Report success as peer feedback.
436
    pub async fn success(&mut self) {
5✔
437
        self.send_peer_feedback(types::PeerFeedback::Success).await;
2✔
438
    }
439

440
    /// Report failure as peer feedback.
441
    pub async fn failure(&mut self) {
×
442
        self.send_peer_feedback(types::PeerFeedback::Failure).await;
×
443
    }
444

445
    /// Report bad peer as peer feedback.
446
    pub async fn bad_peer(&mut self) {
×
447
        self.send_peer_feedback(types::PeerFeedback::BadPeer).await;
×
448
    }
449

450
    /// Send peer feedback.
451
    async fn send_peer_feedback(&mut self, pf: types::PeerFeedback) {
5✔
452
        if let Some(request_id) = self.request_id.take() {
3✔
453
            // Only count feedback once.
454
            if let Some(cmdq) = self.cmdq.upgrade() {
2✔
455
                let _ = cmdq
4✔
456
                    .send(Command::PeerFeedback(request_id, pf, self.kind))
1✔
457
                    .await;
3✔
458
            }
459
        }
460
    }
461
}
462

463
/// RPC client.
464
pub struct RpcClient {
465
    /// Internal command queue (sender part).
466
    cmdq: mpsc::Sender<Command>,
467
}
468

469
impl RpcClient {
470
    fn new(transport: Box<dyn Transport>, builder: Builder) -> Self {
1✔
471
        // Create the command channel.
472
        let (tx, rx) = mpsc::channel(CMDQ_BACKLOG);
2✔
473

474
        // Ensure every client generates unique request IDs.
475
        let last_request_id = Self::generate_random_request_id();
2✔
476

477
        // Create the controller task and start it.
478
        let controller = Controller {
479
            session: MultiplexedSession::new(builder),
1✔
480
            transport,
481
            cmdq: rx,
482
            last_request_id,
483
        };
484
        tokio::spawn(controller.run());
1✔
485

486
        Self { cmdq: tx }
487
    }
488

489
    /// Construct an unconnected RPC client with runtime-internal transport.
490
    pub fn new_runtime(builder: Builder, protocol: Arc<Protocol>, endpoint: &str) -> Self {
×
491
        Self::new(Box::new(RuntimeTransport::new(protocol, endpoint)), builder)
×
492
    }
493

494
    /// Call a remote method using an encrypted and authenticated Noise session.
495
    pub async fn secure_call<C, O>(
1✔
496
        &self,
497
        method: &'static str,
498
        args: C,
499
        nodes: Vec<signature::PublicKey>,
500
    ) -> Response<O>
501
    where
502
        C: cbor::Encode,
503
        O: cbor::Decode + Send + 'static,
504
    {
505
        self.call(method, args, types::Kind::NoiseSession, nodes)
3✔
506
            .await
4✔
507
    }
508

509
    /// Call a remote method over an insecure channel where messages are sent in plain text.
510
    pub async fn insecure_call<C, O>(
1✔
511
        &self,
512
        method: &'static str,
513
        args: C,
514
        nodes: Vec<signature::PublicKey>,
515
    ) -> Response<O>
516
    where
517
        C: cbor::Encode,
518
        O: cbor::Decode + Send + 'static,
519
    {
520
        self.call(method, args, types::Kind::InsecureQuery, nodes)
3✔
521
            .await
4✔
522
    }
523

524
    async fn call<C, O>(
1✔
525
        &self,
526
        method: &'static str,
527
        args: C,
528
        kind: types::Kind,
529
        nodes: Vec<signature::PublicKey>,
530
    ) -> Response<O>
531
    where
532
        C: cbor::Encode,
533
        O: cbor::Decode + Send + 'static,
534
    {
535
        let request = types::Request {
536
            method: method.to_owned(),
1✔
537
            args: cbor::to_value(args),
1✔
538
        };
539

540
        // In case the `execute_call` method returns an outer error, this means that there was a
541
        // problem with the transport itself and we can retry.
542
        let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(2)
3✔
543
            .factor(25)
544
            .max_delay(std::time::Duration::from_millis(250))
1✔
545
            .take(MAX_TRANSPORT_ERROR_RETRIES);
×
546

547
        let result = tokio_retry::Retry::spawn(retry_strategy, || {
4✔
548
            self.execute_call(request.clone(), kind, nodes.clone())
1✔
549
        })
550
        .await;
4✔
551

552
        let (request_id, inner) = match result {
2✔
553
            Ok((request_id, response)) => match response.body {
1✔
554
                types::Body::Success(value) => (
2✔
555
                    Some(request_id),
1✔
556
                    cbor::from_value(value).map_err(Into::into),
2✔
557
                ),
NEW
558
                types::Body::Error(error) => {
×
NEW
559
                    (Some(request_id), Err(RpcClientError::CallFailed(error)))
×
560
                }
561
            },
UNCOV
562
            Err(err) => (None, Err(err)),
×
563
        };
564

565
        Response {
566
            inner,
567
            kind,
568
            cmdq: self.cmdq.downgrade(),
1✔
569
            request_id,
570
        }
571
    }
572

573
    async fn execute_call(
1✔
574
        &self,
575
        request: types::Request,
576
        kind: types::Kind,
577
        nodes: Vec<signature::PublicKey>,
578
    ) -> Result<(u64, types::Response), RpcClientError> {
579
        let (tx, rx) = oneshot::channel();
2✔
580
        self.cmdq
4✔
581
            .send(Command::Call(request, kind, nodes, tx))
1✔
582
            .await
3✔
583
            .map_err(|_| RpcClientError::Dropped)?;
×
584

585
        rx.await.map_err(|_| RpcClientError::Dropped)?
4✔
586
    }
587

588
    /// Update allowed remote enclave identities.
589
    ///
590
    /// Useful if the key manager's policy has changed.
591
    ///
592
    /// # Panics
593
    ///
594
    /// This function panics if called within an asynchronous execution context.
595
    pub fn update_enclaves(&self, enclaves: Option<HashSet<EnclaveIdentity>>) {
×
596
        self.cmdq
×
597
            .blocking_send(Command::UpdateEnclaves(enclaves))
×
598
            .unwrap();
599
    }
600

601
    /// Update key manager's quote policy.
602
    ///
603
    /// # Panics
604
    ///
605
    /// This function panics if called within an asynchronous execution context.
606
    pub fn update_quote_policy(&self, policy: QuotePolicy) {
×
607
        self.cmdq
×
608
            .blocking_send(Command::UpdateQuotePolicy(policy))
×
609
            .unwrap();
610
    }
611

612
    /// Update remote runtime id.
613
    ///
614
    /// # Panics
615
    ///
616
    /// This function panics if called within an asynchronous execution context.
617
    pub fn update_runtime_id(&self, id: Option<Namespace>) {
×
618
        self.cmdq
×
619
            .blocking_send(Command::UpdateRuntimeID(id))
×
620
            .unwrap();
621
    }
622

623
    /// Generate a random request ID.
624
    #[cfg(not(test))]
NEW
625
    fn generate_random_request_id() -> u64 {
×
626
        let mut rng = OsRng {};
NEW
627
        rng.next_u64() & (!(1 << 63))
×
628
    }
629

630
    /// Generate a random request ID.
    ///
    /// Test builds always start from zero so that request IDs are predictable.
    #[cfg(test)]
    fn generate_random_request_id() -> u64 {
        0
    }
635

636
    /// Wait for the controller to process all queued messages.
    ///
    /// Queues a ping and waits for its acknowledgement; since commands are handled in order,
    /// everything queued before the ping has been handled by then.
    ///
    /// # Errors
    ///
    /// Returns [`RpcClientError::Dropped`] when the ping cannot be queued or is never
    /// acknowledged.
    #[cfg(test)]
    async fn flush_cmd_queue(&self) -> Result<(), RpcClientError> {
        let (tx, rx) = oneshot::channel();
        self.cmdq
            .send(Command::Ping(tx))
            .await
            .map_err(|_| RpcClientError::Dropped)?;

        rx.await.map_err(|_| RpcClientError::Dropped)
    }
647
}
648

649
#[cfg(test)]
650
mod test {
651
    use std::sync::{
652
        atomic::{AtomicBool, Ordering},
653
        Arc, Mutex,
654
    };
655

656
    use anyhow::anyhow;
657
    use async_trait::async_trait;
658

659
    use crate::{
660
        common::crypto::signature,
661
        enclave_rpc::{demux::Demux, session, transport::EnclaveResponse, types},
662
    };
663

664
    use super::{super::transport::Transport, RpcClient};
665

666
    #[derive(Clone)]
667
    struct MockTransport {
668
        demux: Arc<Demux>,
669
        next_error: Arc<AtomicBool>,
670
        peer_feedback_history: Arc<Mutex<Vec<(u64, types::PeerFeedback)>>>,
671
    }
672

673
    impl MockTransport {
674
        fn new() -> Self {
675
            Self {
676
                demux: Arc::new(Demux::new(session::Builder::default(), 4, 4, 60)),
677
                next_error: Arc::new(AtomicBool::new(false)),
678
                peer_feedback_history: Arc::new(Mutex::new(Vec::new())),
679
            }
680
        }
681

682
        fn reset(&self) {
683
            self.demux.reset();
684
        }
685

686
        fn induce_transport_error(&self) {
687
            self.next_error.store(true, Ordering::SeqCst);
688
        }
689

690
        fn take_peer_feedback_history(&self) -> Vec<(u64, types::PeerFeedback)> {
691
            let mut pfh = self.peer_feedback_history.lock().unwrap();
692
            pfh.drain(..).collect()
693
        }
694
    }
695

696
    #[async_trait]
697
    impl Transport for MockTransport {
698
        async fn write_message_impl(
699
            &self,
700
            _request_id: u64,
701
            request: Vec<u8>,
702
            kind: types::Kind,
703
            _nodes: Vec<signature::PublicKey>,
704
        ) -> Result<EnclaveResponse, anyhow::Error> {
705
            // Induce error when configured to do so.
706
            if self
707
                .next_error
708
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
709
                .is_ok()
710
            {
711
                return Err(anyhow!("transport error"));
712
            }
713

714
            match kind {
715
                types::Kind::NoiseSession => {
716
                    // Deliver directly to the multiplexer.
717
                    let mut buffer = Vec::new();
718
                    let (mut session, message) = self
719
                        .demux
720
                        .process_frame(vec![], request, &mut buffer)
721
                        .await?;
722

723
                    match message {
724
                        Some(message) => {
725
                            // Message, process and write reply.
726
                            let body = match message {
727
                                types::Message::Request(rq) => {
728
                                    // Just echo back what was given.
729
                                    types::Body::Success(rq.args)
730
                                }
731
                                _ => panic!("unhandled message type"),
732
                            };
733
                            let response = types::Message::Response(types::Response { body });
734

735
                            let mut buffer = Vec::new();
736
                            session.write_message(response, &mut buffer)?;
737

738
                            let rsp = EnclaveResponse {
739
                                data: buffer,
740
                                node: Default::default(),
741
                            };
742
                            Ok(rsp)
743
                        }
744
                        None => {
745
                            // Handshake.
746
                            let rsp = EnclaveResponse {
747
                                data: buffer,
748
                                node: Default::default(),
749
                            };
750
                            Ok(rsp)
751
                        }
752
                    }
753
                }
754
                types::Kind::InsecureQuery => {
755
                    // Just echo back what was given.
756
                    let rq: types::Request = cbor::from_slice(&request).unwrap();
757
                    let body = types::Body::Success(rq.args);
758
                    let response = types::Response { body };
759
                    let rsp = EnclaveResponse {
760
                        data: cbor::to_vec(response),
761
                        node: Default::default(),
762
                    };
763
                    return Ok(rsp);
764
                }
765
                types::Kind::LocalQuery => {
766
                    panic!("unhandled RPC kind")
767
                }
768
            }
769
        }
770

771
        async fn submit_peer_feedback(
772
            &self,
773
            request_id: u64,
774
            peer_feedback: types::PeerFeedback,
775
        ) -> Result<(), anyhow::Error> {
776
            self.peer_feedback_history
777
                .lock()
778
                .unwrap()
779
                .push((request_id, peer_feedback));
780

781
            Ok(())
782
        }
783
    }
784

785
    /// Exercises secure and insecure calls against the mock transport, including recovery from
    /// a server-side session reset and from induced transport errors, and checks the exact
    /// sequence of peer feedback submitted for each step.
    #[test]
    fn test_rpc_client() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter(); // Ensure Tokio runtime is available.
        let transport = MockTransport::new();
        let builder = session::Builder::default();
        let client = RpcClient::new(Box::new(transport.clone()), builder);

        // Basic secure call.
        let result: u64 = rt
            .block_on(async {
                client
                    .secure_call("test", 42, vec![])
                    .await
                    .into_result_with_feedback()
                    .await
            })
            .unwrap();
        rt.block_on(client.flush_cmd_queue()).unwrap(); // Flush cmd queue to get peer feedback.
        assert_eq!(result, 42, "secure call should work");
        assert_eq!(
            transport.take_peer_feedback_history(),
            vec![
                (1, types::PeerFeedback::Success), // Handshake.
                (2, types::PeerFeedback::Success), // Handshake.
                (3, types::PeerFeedback::Success), // Handled call.
            ]
        );

        // Reset all sessions on the server and make sure that we can still get a response.
        transport.reset();

        let result: u64 = rt
            .block_on(async {
                client
                    .secure_call("test", 43, vec![])
                    .await
                    .into_result_with_feedback()
                    .await
            })
            .unwrap();
        rt.block_on(client.flush_cmd_queue()).unwrap(); // Flush cmd queue to get peer feedback.
        assert_eq!(result, 43, "secure call should work");
        assert_eq!(
            transport.take_peer_feedback_history(),
            vec![
                (4, types::PeerFeedback::Failure), // Failed call due to session reset.
                (5, types::PeerFeedback::Success), // New handshake.
                (6, types::PeerFeedback::Success), // New handshake.
                (7, types::PeerFeedback::Success), // Handled call.
            ]
        );

        // Induce a single transport error without resetting the server sessions and make sure we
        // can still get a response.
        transport.induce_transport_error();

        let result: u64 = rt
            .block_on(async {
                client
                    .secure_call("test", 44, vec![])
                    .await
                    .into_result_with_feedback()
                    .await
            })
            .unwrap();
        rt.block_on(client.flush_cmd_queue()).unwrap(); // Flush cmd queue to get peer feedback.
        assert_eq!(result, 44, "secure call should work");
        assert_eq!(
            transport.take_peer_feedback_history(),
            vec![
                (8, types::PeerFeedback::Failure), // Handshake failed due to induced error.
                (9, types::PeerFeedback::Failure), // Session close failed due to decrypt error (handshake not completed).
                (10, types::PeerFeedback::Success), // New handshake.
                (11, types::PeerFeedback::Success), // New handshake.
                (12, types::PeerFeedback::Success), // Handled call.
            ]
        );

        // Basic insecure call.
        let result: u64 = rt
            .block_on(async {
                client
                    .insecure_call("test", 45, vec![])
                    .await
                    .into_result_with_feedback()
                    .await
            })
            .unwrap();
        rt.block_on(client.flush_cmd_queue()).unwrap(); // Flush cmd queue to get peer feedback.
        assert_eq!(result, 45, "insecure call should work");
        assert_eq!(
            transport.take_peer_feedback_history(),
            vec![
                (13, types::PeerFeedback::Success), // Handled call.
            ]
        );

        // Induce a single transport error and make sure we can still get a response.
        transport.induce_transport_error();

        let result: u64 = rt
            .block_on(async {
                client
                    .insecure_call("test", 46, vec![])
                    .await
                    .into_result_with_feedback()
                    .await
            })
            .unwrap();
        rt.block_on(client.flush_cmd_queue()).unwrap(); // Flush cmd queue to get peer feedback.
        assert_eq!(result, 46, "insecure call should work");
        assert_eq!(
            transport.take_peer_feedback_history(),
            vec![
                (14, types::PeerFeedback::Failure), // Failed call due to induced error.
                (15, types::PeerFeedback::Success), // Handled call.
            ]
        );
    }
905
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc