oasisprotocol / oasis-core / build #4460

13 Feb 2024 08:53AM UTC, coverage: 45.523% (+0.04%) from 45.481%

Pull Request #5554: keymanager/runtime: Use insecure RPC requests for ephemeral public keys
Author: peternose
Commit: go/p2p/peermgmt: Debugging storage-early-state-sync test

0 of 4 new or added lines in 3 files covered. (0.0%)
5 existing lines in 3 files now uncovered.
2969 of 6522 relevant lines covered (45.52%)
0.98 hits per line

Source file: /runtime/src/dispatcher.rs (file coverage: 0.0%)

//! Runtime call dispatcher.
use std::{
    convert::TryInto,
    sync::{Arc, Condvar, Mutex},
    thread,
};

use anyhow::Result as AnyResult;
use rustc_hex::ToHex;
use slog::{debug, error, info, warn, Logger};
use tokio::sync::mpsc;

use crate::{
    attestation, cache,
    common::{
        crypto::{hash::Hash, signature::Signer},
        logger::get_logger,
        process,
        sgx::QuotePolicy,
    },
    consensus::{
        beacon::EpochTime,
        roothash::{self, ComputeResultsHeader, Header, COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT},
        state::keymanager::Status as KeyManagerStatus,
        verifier::Verifier,
        LightBlock,
    },
    enclave_rpc::{
        demux::Demux as RpcDemux,
        dispatcher::Dispatcher as RpcDispatcher,
        session::{self, SessionInfo},
        types::{
            Kind as RpcKind, Message as RpcMessage, Request as RpcRequest, Response as RpcResponse,
        },
        Context as RpcContext,
    },
    future::block_on,
    identity::Identity,
    policy::PolicyVerifier,
    protocol::{Protocol, ProtocolUntrustedLocalStorage},
    storage::mkvs::{sync::NoopReadSyncer, OverlayTree, Root, RootType},
    transaction::{
        dispatcher::{Dispatcher as TxnDispatcher, NoopDispatcher as TxnNoopDispatcher},
        tree::Tree as TxnTree,
        types::TxnBatch,
        Context as TxnContext,
    },
    types::{Body, ComputedBatch, Error, ExecutionMode},
};

/// Maximum amount of requests that can be in the dispatcher queue.
const BACKLOG_SIZE: usize = 1000;

/// Maximum total number of EnclaveRPC sessions.
const RPC_MAX_SESSIONS: usize = 1024;
/// Maximum concurrent EnclaveRPC sessions per peer. In case more sessions are open, old sessions
/// will be closed to make room for new sessions.
const RPC_MAX_SESSIONS_PER_PEER: usize = 8;
/// EnclaveRPC sessions without any processed frame for more than RPC_STALE_SESSION_TIMEOUT_SECS
/// seconds can be closed to make room for new sessions.
const RPC_STALE_SESSION_TIMEOUT_SECS: i64 = 10;

/// Interface for dispatcher initializers.
pub trait Initializer: Send + Sync {
    /// Initializes the dispatcher(s).
    fn init(&self, state: PreInitState<'_>) -> PostInitState;
}

impl<F> Initializer for F
where
    F: Fn(PreInitState<'_>) -> PostInitState + Send + Sync,
{
    fn init(&self, state: PreInitState<'_>) -> PostInitState {
        (*self)(state)
    }
}

/// State available before initialization.
pub struct PreInitState<'a> {
    /// Protocol instance.
    pub protocol: &'a Arc<Protocol>,
    /// Runtime Attestation Key instance.
    pub identity: &'a Arc<Identity>,
    /// RPC demultiplexer instance.
    pub rpc_demux: &'a mut RpcDemux,
    /// RPC dispatcher instance.
    pub rpc_dispatcher: &'a mut RpcDispatcher,
    /// Consensus verifier instance.
    pub consensus_verifier: &'a Arc<dyn Verifier>,
}

/// State returned by the initializer.
#[derive(Default)]
pub struct PostInitState {
    /// Optional transaction dispatcher that should be used.
    pub txn_dispatcher: Option<Box<dyn TxnDispatcher>>,
}
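
// Illustrative sketch (not part of the upstream file): thanks to the blanket
// `impl<F> Initializer for F` above, a plain closure can serve as the
// initializer. A runtime that keeps the default no-op transaction dispatcher
// could be wired up roughly as follows, assuming `tokio_handle` is a
// `tokio::runtime::Handle` and `identity` an `Arc<Identity>` obtained
// elsewhere:
//
//     let initializer = |_state: PreInitState<'_>| PostInitState::default();
//     let dispatcher = Dispatcher::new(tokio_handle, Box::new(initializer), identity);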

/// A guard that will abort the process if dropped while panicking.
///
/// This is to ensure that the runtime will terminate in case there is
/// a panic encountered during dispatch and the runtime is built with
/// a non-abort panic handler.
struct AbortOnPanic;

impl Drop for AbortOnPanic {
    fn drop(&mut self) {
        if thread::panicking() {
            process::abort();
        }
    }
}
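
// Note (added for exposition, not in the upstream file): the guard is used by
// binding it at the start of a dispatch path, e.g. `let _guard = AbortOnPanic;`.
// If a panic unwinds past that binding, `drop` observes `thread::panicking()`
// and aborts the process rather than letting the runtime continue in an
// inconsistent state.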

impl From<tokio::task::JoinError> for Error {
    fn from(e: tokio::task::JoinError) -> Self {
        Error::new(
            "dispatcher",
            1,
            &format!("error while processing request: {e}"),
        )
    }
}
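
// Note (added for exposition, not in the upstream file): this conversion is
// what allows the dispatcher to use the `?` operator directly on joined
// blocking tasks, e.g.:
//
//     let response = tokio::task::spawn_blocking(move || { /* work */ }).await?;
//
// where a `JoinError` from a panicked or cancelled task becomes the runtime's
// own `Error`.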

/// State related to dispatching a runtime transaction.
struct TxDispatchState {
    mode: ExecutionMode,
    consensus_block: LightBlock,
    consensus_verifier: Arc<dyn Verifier>,
    header: Header,
    epoch: EpochTime,
    round_results: roothash::RoundResults,
    max_messages: u32,
    check_only: bool,
}

/// State provided by the protocol upon successful initialization.
struct ProtocolState {
    protocol: Arc<Protocol>,
    consensus_verifier: Arc<dyn Verifier>,
}

/// State held by the dispatcher, shared between all async tasks.
#[derive(Clone)]
struct State {
    protocol: Arc<Protocol>,
    consensus_verifier: Arc<dyn Verifier>,
    dispatcher: Arc<Dispatcher>,
    rpc_demux: Arc<RpcDemux>,
    rpc_dispatcher: Arc<RpcDispatcher>,
    txn_dispatcher: Arc<dyn TxnDispatcher>,
    #[cfg_attr(not(target_env = "sgx"), allow(unused))]
    attestation_handler: attestation::Handler,
    policy_verifier: Arc<PolicyVerifier>,
    cache_set: cache::CacheSet,
}

#[derive(Debug)]
enum Command {
    Request(u64, Body),
}

/// Runtime call dispatcher.
pub struct Dispatcher {
    logger: Logger,
    queue_tx: mpsc::Sender<Command>,
    identity: Arc<Identity>,

    state: Mutex<Option<ProtocolState>>,
    state_cond: Condvar,

    tokio_runtime: tokio::runtime::Handle,
}

impl Dispatcher {
    /// Create a new runtime call dispatcher.
    pub fn new(
        tokio_runtime: tokio::runtime::Handle,
        initializer: Box<dyn Initializer>,
        identity: Arc<Identity>,
    ) -> Arc<Self> {
        let (tx, rx) = mpsc::channel(BACKLOG_SIZE);

        let dispatcher = Arc::new(Dispatcher {
            logger: get_logger("runtime/dispatcher"),
            queue_tx: tx,
            identity,
            state: Mutex::new(None),
            state_cond: Condvar::new(),
            tokio_runtime,
        });

        // Spawn the dispatcher processing thread.
        let d = dispatcher.clone();
        thread::spawn(move || {
            let _guard = AbortOnPanic;
            d.run(initializer, rx);
        });

        dispatcher
    }

    /// Start the dispatcher.
    pub fn start(&self, protocol: Arc<Protocol>, consensus_verifier: Box<dyn Verifier>) {
        let consensus_verifier = Arc::from(consensus_verifier);
        let mut s = self.state.lock().unwrap();
        *s = Some(ProtocolState {
            protocol,
            consensus_verifier,
        });
        self.state_cond.notify_one();
    }

    /// Queue a new request to be dispatched.
    pub fn queue_request(&self, id: u64, body: Body) -> AnyResult<()> {
        self.queue_tx.blocking_send(Command::Request(id, body))?;
        Ok(())
    }
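
    // Illustrative sketch (not in the upstream file): `queue_request` is the
    // synchronous entry point used by the host protocol handler. A hypothetical
    // caller could enqueue a consensus sync request as
    //
    //     dispatcher.queue_request(request_id, Body::RuntimeConsensusSyncRequest { height })?;
    //
    // which the processing loop in `run` below receives as a `Command::Request`
    // and dispatches via `handle_request`.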

    fn run(self: &Arc<Self>, initializer: Box<dyn Initializer>, mut rx: mpsc::Receiver<Command>) {
        // Wait for the state to be available.
        let ProtocolState {
            protocol,
            consensus_verifier,
        } = {
            let mut guard = self.state.lock().unwrap();
            while guard.is_none() {
                guard = self.state_cond.wait(guard).unwrap();
            }

            guard.take().unwrap()
        };

        // Ensure Tokio runtime is available during dispatcher initialization.
        let _guard = self.tokio_runtime.enter();

        // Create actual dispatchers for RPCs and transactions.
        info!(self.logger, "Starting the runtime dispatcher");
        let mut rpc_demux = RpcDemux::new(
            session::Builder::default().local_identity(self.identity.clone()),
            RPC_MAX_SESSIONS,
            RPC_MAX_SESSIONS_PER_PEER,
            RPC_STALE_SESSION_TIMEOUT_SECS,
        );
        let mut rpc_dispatcher = RpcDispatcher::default();
        let pre_init_state = PreInitState {
            protocol: &protocol,
            identity: &self.identity,
            rpc_demux: &mut rpc_demux,
            rpc_dispatcher: &mut rpc_dispatcher,
            consensus_verifier: &consensus_verifier,
        };
        let post_init_state = initializer.init(pre_init_state);
        let txn_dispatcher = post_init_state
            .txn_dispatcher
            .unwrap_or_else(|| Box::<TxnNoopDispatcher>::default());

        let state = State {
            protocol: protocol.clone(),
            consensus_verifier: consensus_verifier.clone(),
            dispatcher: self.clone(),
            rpc_demux: Arc::new(rpc_demux),
            rpc_dispatcher: Arc::new(rpc_dispatcher),
            txn_dispatcher: Arc::from(txn_dispatcher),
            attestation_handler: attestation::Handler::new(
                self.identity.clone(),
                protocol.clone(),
                consensus_verifier.clone(),
                protocol.get_runtime_id(),
                protocol.get_config().version,
            ),
            policy_verifier: Arc::new(PolicyVerifier::new(consensus_verifier)),
            cache_set: cache::CacheSet::new(protocol.clone()),
        };

        // Start the async message processing task.
        self.tokio_runtime.block_on(async move {
            while let Some(cmd) = rx.recv().await {
                // Process received command.
                match cmd {
                    Command::Request(id, request) => {
                        // Process request in its own task.
                        let state = state.clone();

                        tokio::spawn(async move {
                            let protocol = state.protocol.clone();
                            let dispatcher = state.dispatcher.clone();
                            let result = dispatcher.handle_request(state, request).await;

                            // Send response.
                            let response = match result {
                                Ok(body) => body,
                                Err(error) => Body::Error(error),
                            };
                            protocol.send_response(id, response).unwrap();
                        });
                    }
                }
            }
        });

        info!(self.logger, "Runtime call dispatcher is terminating");
    }

    async fn handle_request(self: &Arc<Self>, state: State, request: Body) -> Result<Body, Error> {
        match request {
            // Attestation-related requests.
            #[cfg(target_env = "sgx")]
            Body::RuntimeCapabilityTEERakInitRequest { .. }
            | Body::RuntimeCapabilityTEERakReportRequest {}
            | Body::RuntimeCapabilityTEERakAvrRequest { .. }
            | Body::RuntimeCapabilityTEERakQuoteRequest { .. } => {
                Ok(state.attestation_handler.handle(request).await?)
            }

            // RPC and transaction requests.
            Body::RuntimeRPCCallRequest {
                request,
                kind,
                peer_id,
            } => {
                debug!(self.logger, "Received RPC call request";
                    "kind" => ?kind,
                    "peer_id" => peer_id.to_hex::<String>(),
                );

                match kind {
                    RpcKind::NoiseSession => {
                        self.dispatch_secure_rpc(state, request, peer_id).await
                    }
                    RpcKind::InsecureQuery => self.dispatch_insecure_rpc(state, request).await,
                    RpcKind::LocalQuery => self.dispatch_local_rpc(state, request).await,
                }
            }
            Body::RuntimeLocalRPCCallRequest { request } => {
                debug!(self.logger, "Received RPC call request";
                    "kind" => ?RpcKind::LocalQuery,
                );

                self.dispatch_local_rpc(state, request).await
            }
            Body::RuntimeExecuteTxBatchRequest {
                mode,
                consensus_block,
                round_results,
                io_root,
                inputs,
                in_msgs,
                block,
                epoch,
                max_messages,
            } => {
                // Transaction execution.
                self.dispatch_txn(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    io_root,
                    inputs.unwrap_or_default(),
                    in_msgs,
                    TxDispatchState {
                        mode,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header: block.header,
                        epoch,
                        round_results,
                        max_messages,
                        check_only: false,
                    },
                )
                .await
            }
            Body::RuntimeCheckTxBatchRequest {
                consensus_block,
                inputs,
                block,
                epoch,
                max_messages,
            } => {
                // Transaction check.
                self.dispatch_txn(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    Hash::default(),
                    inputs,
                    vec![],
                    TxDispatchState {
                        mode: ExecutionMode::Execute,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header: block.header,
                        epoch,
                        round_results: Default::default(),
                        max_messages,
                        check_only: true,
                    },
                )
                .await
            }
            Body::RuntimeQueryRequest {
                consensus_block,
                header,
                epoch,
                max_messages,
                method,
                args,
            } => {
                // Query.
                self.dispatch_query(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    method,
                    args,
                    TxDispatchState {
                        mode: ExecutionMode::Execute,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header,
                        epoch,
                        round_results: Default::default(),
                        max_messages,
                        check_only: true,
                    },
                )
                .await
            }

            // Other requests.
            Body::RuntimeKeyManagerStatusUpdateRequest { status } => {
                // Key manager status update local RPC call.
                self.handle_km_status_update(state, status).await
            }
            Body::RuntimeKeyManagerQuotePolicyUpdateRequest {
                policy: quote_policy,
            } => {
                // Key manager quote policy update local RPC call.
                self.handle_km_quote_policy_update(state, quote_policy)
                    .await
            }
            Body::RuntimeConsensusSyncRequest { height } => state
                .consensus_verifier
                .sync(height)
                .await
                .map_err(Into::into)
                .map(|_| Body::RuntimeConsensusSyncResponse {}),

            _ => {
                error!(self.logger, "Unsupported request type");
                Err(Error::new("dispatcher", 1, "Unsupported request type"))
            }
        }
    }
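
    // Note (added for exposition, not in the upstream file): every request
    // variant above is answered with its matching response variant, e.g. a
    // `Body::RuntimeLocalRPCCallRequest { request }` carrying a CBOR-encoded
    // RPC request yields a `Body::RuntimeLocalRPCCallResponse { response }`,
    // while failures are reported back to the host as `Body::Error(..)` by the
    // send path in `run`.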

    #[allow(clippy::too_many_arguments)]
    async fn dispatch_query(
        &self,
        cache_set: cache::CacheSet,
        txn_dispatcher: &Arc<dyn TxnDispatcher>,
        protocol: &Arc<Protocol>,
        method: String,
        args: Vec<u8>,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        debug!(self.logger, "Received query request";
            "method" => &method,
            "state_root" => ?state.header.state_root,
            "round" => ?state.header.round,
        );

        // Verify that the runtime ID matches the block's namespace. This is a protocol violation
        // as the compute node should never change the runtime ID.
        if state.header.namespace != protocol.get_runtime_id() {
            return Err(Error::new(
                "dispatcher",
                1,
                &format!(
                    "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
                    state.header.namespace,
                    protocol.get_runtime_id(),
                ),
            ));
        }

        let protocol = protocol.clone();
        let txn_dispatcher = txn_dispatcher.clone();

        // For queries we don't do any consensus layer integrity verification by default and it
        // is up to the runtime to decide whether this is critical on a query-by-query basis.
        let consensus_state = state
            .consensus_verifier
            .unverified_state(state.consensus_block.clone())
            .await?;

        tokio::task::spawn_blocking(move || {
            let cache = cache_set.query(Root {
                namespace: state.header.namespace,
                version: state.header.round,
                root_type: RootType::State,
                hash: state.header.state_root,
            });
            let mut cache = cache.borrow_mut();
            let mut overlay = OverlayTree::new(cache.tree_mut());

            let txn_ctx = TxnContext::new(
                protocol,
                &state.consensus_block,
                consensus_state,
                &mut overlay,
                &state.header,
                state.epoch,
                &state.round_results,
                state.max_messages,
                state.check_only,
            );

            txn_dispatcher
                .query(txn_ctx, &method, args)
                .map(|data| Body::RuntimeQueryResponse { data })
        })
        .await?
    }

    fn txn_check_batch(
        &self,
        protocol: Arc<Protocol>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &dyn TxnDispatcher,
        inputs: TxnBatch,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // For check-only we don't do any consensus layer integrity verification.
        // TODO: Make this async.
        let consensus_state = block_on(
            state
                .consensus_verifier
                .unverified_state(state.consensus_block.clone()),
        )?;

        let mut cache = cache_set.check(Root {
            namespace: state.header.namespace,
            version: state.header.round,
            root_type: RootType::State,
            hash: state.header.state_root,
        });
        let mut overlay = OverlayTree::new(cache.tree_mut());

        let txn_ctx = TxnContext::new(
            protocol.clone(),
            &state.consensus_block,
            consensus_state,
            &mut overlay,
            &state.header,
            state.epoch,
            &state.round_results,
            state.max_messages,
            state.check_only,
        );
        let results = txn_dispatcher.check_batch(txn_ctx, &inputs);

        if protocol.get_config().persist_check_tx_state {
            // Commit results to in-memory tree so they persist for subsequent batches that are
            // based on the same block.
            let _ = overlay.commit().unwrap();
        }

        debug!(self.logger, "Transaction batch check complete");

        results.map(|results| Body::RuntimeCheckTxBatchResponse { results })
    }

    #[allow(clippy::too_many_arguments)]
    fn txn_execute_batch(
        &self,
        protocol: Arc<Protocol>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &dyn TxnDispatcher,
        mut inputs: TxnBatch,
        in_msgs: Vec<roothash::IncomingMessage>,
        io_root: Hash,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // Verify consensus state and runtime state root integrity before execution.
        // TODO: Make this async.
        let consensus_state = block_on(state.consensus_verifier.verify(
            state.consensus_block.clone(),
            state.header.clone(),
            state.epoch,
        ))?;
        // Ensure the runtime is still ready to process requests.
        protocol.ensure_initialized()?;

        let header = &state.header;

        let mut cache = cache_set.execute(Root {
            namespace: state.header.namespace,
            version: state.header.round,
            root_type: RootType::State,
            hash: state.header.state_root,
        });
        let mut overlay = OverlayTree::new(cache.tree_mut());

        let txn_ctx = TxnContext::new(
            protocol,
            &state.consensus_block,
            consensus_state,
            &mut overlay,
            header,
            state.epoch,
            &state.round_results,
            state.max_messages,
            state.check_only,
        );

        // Perform execution based on the passed mode.
        let mut results = match state.mode {
            ExecutionMode::Execute => {
                // Just execute the batch.
                txn_dispatcher.execute_batch(txn_ctx, &inputs, &in_msgs)?
            }
            ExecutionMode::Schedule => {
                // Allow the runtime to arbitrarily update the batch.
                txn_dispatcher.schedule_and_execute_batch(txn_ctx, &mut inputs, &in_msgs)?
            }
        };

        // Finalize state.
        let (state_write_log, new_state_root) = overlay
            .commit_both(header.namespace, header.round + 1)
            .expect("state commit must succeed");

        txn_dispatcher.finalize(new_state_root);
        cache.commit(header.round + 1, new_state_root);

        // Generate I/O root. Since we already fetched the inputs we avoid the need
        // to fetch them again by generating the previous I/O tree (generated by the
        // transaction scheduler) from the inputs.
        let mut txn_tree = TxnTree::new(
            Box::new(NoopReadSyncer),
            Root {
                namespace: header.namespace,
                version: header.round + 1,
                root_type: RootType::IO,
                hash: Hash::empty_hash(),
            },
        );
        let mut hashes = Vec::new();
        for (batch_order, input) in inputs.drain(..).enumerate() {
            hashes.push(Hash::digest_bytes(&input));
            txn_tree
                .add_input(input, batch_order.try_into().unwrap())
                .expect("add transaction must succeed");
        }

        let (input_write_log, input_io_root) = txn_tree.commit().expect("io commit must succeed");

        assert!(
            state.mode != ExecutionMode::Execute || input_io_root == io_root,
            "dispatcher: I/O root inconsistent with inputs (expected: {:?} got: {:?})",
            io_root,
            input_io_root
        );

        for (tx_hash, result) in hashes.iter().zip(results.results.drain(..)) {
            txn_tree
                .add_output(*tx_hash, result.output, result.tags)
                .expect("add transaction must succeed");
        }

        txn_tree
            .add_block_tags(results.block_tags)
            .expect("adding block tags must succeed");

        let (io_write_log, io_root) = txn_tree.commit().expect("io commit must succeed");

        let header = ComputeResultsHeader {
            round: header.round + 1,
            previous_hash: header.encoded_hash(),
            io_root: Some(io_root),
            state_root: Some(new_state_root),
            messages_hash: Some(roothash::Message::messages_hash(&results.messages)),
            in_msgs_hash: Some(roothash::IncomingMessage::in_messages_hash(
                &in_msgs[..results.in_msgs_count],
            )),
            in_msgs_count: results.in_msgs_count.try_into().unwrap(),
        };

        debug!(self.logger, "Transaction batch execution complete";
            "previous_hash" => ?header.previous_hash,
            "io_root" => ?header.io_root,
            "state_root" => ?header.state_root,
            "messages_hash" => ?header.messages_hash,
            "in_msgs_hash" => ?header.in_msgs_hash,
        );

        let rak_sig = self
            .identity
            .sign(
                COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT,
                &cbor::to_vec(header.clone()),
            )
            .unwrap();

        Ok(Body::RuntimeExecuteTxBatchResponse {
            batch: ComputedBatch {
                header,
                io_write_log,
                state_write_log,
                rak_sig,
                messages: results.messages,
            },
            tx_hashes: hashes,
            tx_reject_hashes: results.tx_reject_hashes,
            tx_input_root: input_io_root,
            tx_input_write_log: input_write_log,
        })
    }

    #[allow(clippy::too_many_arguments)]
    async fn dispatch_txn(
        self: &Arc<Self>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &Arc<dyn TxnDispatcher>,
        protocol: &Arc<Protocol>,
        io_root: Hash,
        inputs: TxnBatch,
        in_msgs: Vec<roothash::IncomingMessage>,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // Make sure to abort the process on panic during transaction processing as that indicates
        // a serious problem and should make sure to clean up the process.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received transaction batch request";
            "state_root" => ?state.header.state_root,
            "round" => state.header.round + 1,
            "round_results" => ?state.round_results,
            "tx_count" => inputs.len(),
            "in_msg_count" => in_msgs.len(),
            "check_only" => state.check_only,
        );

        // Verify that the runtime ID matches the block's namespace. This is a protocol violation
        // as the compute node should never change the runtime ID.
        assert!(
            state.header.namespace == protocol.get_runtime_id(),
            "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
            state.header.namespace,
            protocol.get_runtime_id(),
        );

        let protocol = protocol.clone();
        let dispatcher = self.clone();
        let txn_dispatcher = txn_dispatcher.clone();

        tokio::task::spawn_blocking(move || {
            if state.check_only {
                dispatcher.txn_check_batch(protocol, cache_set, &txn_dispatcher, inputs, state)
            } else {
                dispatcher.txn_execute_batch(
                    protocol,
                    cache_set,
                    &txn_dispatcher,
                    inputs,
                    in_msgs,
                    io_root,
                    state,
                )
            }
        })
        .await
        .unwrap() // Propagate panics during transaction dispatch.
    }

    async fn dispatch_secure_rpc(
        &self,
        state: State,
        request: Vec<u8>,
        peer_id: Vec<u8>,
    ) -> Result<Body, Error> {
        // Make sure to abort the process on panic during RPC processing as that indicates a
        // serious problem and should make sure to clean up the process.
        let _guard = AbortOnPanic;

        // Process frame.
        let mut buffer = vec![];
        let (mut session, message) = state
            .rpc_demux
            .process_frame(peer_id, request, &mut buffer)
            .await?;

        if let Some(message) = message {
            // Dispatch request.
            assert!(
                buffer.is_empty(),
                "must have no handshake data in transport mode"
            );

            match message {
                RpcMessage::Request(req) => {
                    // Request, dispatch.
                    let response = self
                        .dispatch_rpc(req, RpcKind::NoiseSession, session.info(), &state)
                        .await?;
                    let response = RpcMessage::Response(response);

                    // Note: MKVS commit is omitted, this MUST be global side-effect free.

                    debug!(self.logger, "RPC call dispatch complete";
                        "kind" => ?RpcKind::NoiseSession,
                    );

                    let mut buffer = vec![];
                    session
                        .write_message(response, &mut buffer)
                        .map_err(|err| {
                            error!(self.logger, "Error while writing response"; "err" => %err);
                            Error::new("rhp/dispatcher", 1, &format!("{err}"))
                        })
                        .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
                }
                RpcMessage::Close => {
                    // Session close.
                    let mut buffer = vec![];
                    state
                        .rpc_demux
                        .close(session, &mut buffer)
                        .map_err(|err| {
                            error!(self.logger, "Error while closing session"; "err" => %err);
                            Error::new("rhp/dispatcher", 1, &format!("{err}"))
                        })
                        .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
                }
                msg => {
                    warn!(self.logger, "Ignoring invalid RPC message type"; "msg" => ?msg);
                    Err(Error::new("rhp/dispatcher", 1, "invalid RPC message type"))
                }
            }
        } else {
            // Send back any handshake frames.
            Ok(Body::RuntimeRPCCallResponse { response: buffer })
        }
    }

    async fn dispatch_insecure_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
        // Make sure to abort the process on panic during RPC processing as that indicates a
        // serious problem and should make sure to clean up the process.
        let _guard = AbortOnPanic;

        let request: RpcRequest = cbor::from_slice(&request)
            .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;

        // Request, dispatch.
        let response = self
            .dispatch_rpc(request, RpcKind::InsecureQuery, None, &state)
            .await?;
        let response = cbor::to_vec(response);

        // Note: MKVS commit is omitted, this MUST be global side-effect free.

        debug!(self.logger, "RPC call dispatch complete";
            "kind" => ?RpcKind::InsecureQuery,
        );

        Ok(Body::RuntimeRPCCallResponse { response })
    }

    async fn dispatch_local_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
        // Make sure to abort the process on panic during local RPC processing as that indicates a
        // serious problem and should make sure to clean up the process.
        let _guard = AbortOnPanic;

        let request = cbor::from_slice(&request)
            .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;

        // Request, dispatch.
        let response = self
            .dispatch_rpc(request, RpcKind::LocalQuery, None, &state)
            .await?;
        let response = RpcMessage::Response(response);
        let response = cbor::to_vec(response);

        debug!(self.logger, "RPC call dispatch complete";
            "kind" => ?RpcKind::LocalQuery,
        );

        Ok(Body::RuntimeLocalRPCCallResponse { response })
    }

    async fn dispatch_rpc(
        &self,
        request: RpcRequest,
        kind: RpcKind,
        session_info: Option<Arc<SessionInfo>>,
        state: &State,
    ) -> Result<RpcResponse, Error> {
        let identity = self.identity.clone();
        let protocol = state.protocol.clone();
        let consensus_verifier = state.consensus_verifier.clone();
        let rpc_dispatcher = state.rpc_dispatcher.clone();

        let response = tokio::task::spawn_blocking(move || {
            let untrusted_local = Arc::new(ProtocolUntrustedLocalStorage::new(protocol.clone()));
            let rpc_ctx =
                RpcContext::new(identity, session_info, consensus_verifier, &untrusted_local);

            rpc_dispatcher.dispatch(rpc_ctx, request, kind)
        })
        .await?;

        Ok(response)
    }

    async fn handle_km_status_update(
        &self,
        state: State,
        status: KeyManagerStatus,
    ) -> Result<Body, Error> {
        // Make sure to abort the process on panic during policy processing as that indicates a
        // serious problem and should make sure to clean up the process.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received km status update request");

        // Verify and decode the status.
        let runtime_id = state.protocol.get_host_info().runtime_id;

        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
            let published_status = state
                .policy_verifier
                .verify_key_manager_status(status, key_manager)?;

            // Dispatch the local RPC call.
            state
                .rpc_dispatcher
                .handle_km_status_update(published_status);

            Ok(())
        })
        .await??;

        debug!(self.logger, "KM status update request complete");

        Ok(Body::RuntimeKeyManagerStatusUpdateResponse {})
    }

    async fn handle_km_quote_policy_update(
        &self,
        state: State,
        quote_policy: QuotePolicy,
    ) -> Result<Body, Error> {
        // Make sure to abort the process on panic during quote policy processing as that indicates
        // a serious problem and should make sure to clean up the process.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received km quote policy update request");

        // Verify and decode the policy.
        let runtime_id = state.protocol.get_host_info().runtime_id;

        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
            let policy = state
                .policy_verifier
                .verify_quote_policy(quote_policy, &key_manager, None)?;

            // Dispatch the local RPC call.
            state.rpc_dispatcher.handle_km_quote_policy_update(policy);

            Ok(())
        })
        .await??;

        debug!(self.logger, "KM quote policy update request complete");

        Ok(Body::RuntimeKeyManagerQuotePolicyUpdateResponse {})
    }
}