Skip to content

Commit 02978b5

Browse files
committed
runtime-sdk: Add support for incoming messages
1 parent e709852 commit 02978b5

24 files changed

Lines changed: 648 additions & 27 deletions

File tree

runtime-sdk/src/dispatcher.rs

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{
3131
error::{Error as _, RuntimeError},
3232
event::IntoTags,
3333
keymanager::{KeyManagerClient, KeyManagerError},
34-
module::{self, BlockHandler, MethodHandler, TransactionHandler},
34+
module::{self, BlockHandler, InMsgHandler, InMsgResult, MethodHandler, TransactionHandler},
3535
modules,
3636
modules::core::API as _,
3737
runtime::Runtime,
@@ -537,7 +537,7 @@ impl<R: Runtime> Dispatcher<R> {
537537
messages,
538538
block_tags: block_tags.into_tags(),
539539
tx_reject_hashes: vec![],
540-
in_msgs_count: 0, // TODO: Support processing incoming messages.
540+
in_msgs_count: 0,
541541
})
542542
}
543543
}
@@ -547,17 +547,63 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
547547
&self,
548548
rt_ctx: transaction::Context<'_>,
549549
batch: &TxnBatch,
550-
_in_msgs: &[roothash::IncomingMessage],
550+
in_msgs: &[roothash::IncomingMessage],
551551
) -> Result<ExecuteBatchResult, RuntimeError> {
552-
self.execute_batch_common(
552+
let mut in_msgs_count = 0;
553+
554+
let mut result = self.execute_batch_common(
553555
rt_ctx,
554556
|ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
555557
// If prefetch limit is set enable prefetch.
556558
let prefetch_enabled = R::PREFETCH_LIMIT > 0;
559+
let mut results = Vec::with_capacity(batch.len());
560+
561+
// Process incoming messages first.
562+
let mut batch_it = batch.iter();
563+
'inmsg: for in_msg in in_msgs {
564+
match R::IncomingMessagesHandler::process_in_msg(ctx, &in_msg) {
565+
InMsgResult::Skip => {
566+
// Skip, but treat as processed.
567+
in_msgs_count += 1;
568+
}
569+
InMsgResult::Execute(raw_tx, tx) => {
570+
// Verify that the transaction has been included in the batch.
571+
match batch_it.next() {
572+
None => {
573+
// Nothing in the batch when there should be an incoming message.
574+
return Err(Error::MalformedTransactionInBatch(anyhow!(
575+
"missing incoming message"
576+
))
577+
.into());
578+
}
579+
Some(batch_tx) if batch_tx != raw_tx => {
580+
// Incoming message does not match what is in the batch.
581+
return Err(Error::MalformedTransactionInBatch(anyhow!(
582+
"mismatched incoming message"
583+
))
584+
.into());
585+
}
586+
_ => {
587+
// Everything is ok.
588+
}
589+
}
590+
591+
// Further execute the inner transaction. The transaction has already
592+
// passed checks so it is ok to include in a block.
593+
let tx_size = raw_tx.len().try_into().unwrap();
594+
let index = results.len();
595+
results.push(Self::execute_tx(ctx, tx_size, tx, index)?);
557596

597+
in_msgs_count += 1;
598+
}
599+
InMsgResult::Stop => break 'inmsg,
600+
}
601+
}
602+
603+
let inmsg_txs = results.len();
558604
let mut txs = Vec::with_capacity(batch.len());
559605
let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
560-
for tx in batch.iter() {
606+
for tx in batch.iter().skip(inmsg_txs) {
561607
let tx_size = tx.len().try_into().map_err(|_| {
562608
Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
563609
})?;
@@ -580,24 +626,29 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
580626
}
581627

582628
// Execute the batch.
583-
let mut results = Vec::with_capacity(batch.len());
584-
for (index, (tx_size, tx)) in txs.into_iter().enumerate() {
629+
for (index, (tx_size, tx)) in txs.into_iter().skip(inmsg_txs).enumerate() {
585630
results.push(Self::execute_tx(ctx, tx_size, tx, index)?);
586631
}
587632

588633
Ok(results)
589634
},
590-
)
635+
)?;
636+
637+
// Include number of processed incoming messages in the final result.
638+
result.in_msgs_count = in_msgs_count;
639+
640+
Ok(result)
591641
}
592642

593643
fn schedule_and_execute_batch(
594644
&self,
595645
rt_ctx: transaction::Context<'_>,
596646
batch: &mut TxnBatch,
597-
_in_msgs: &[roothash::IncomingMessage],
647+
in_msgs: &[roothash::IncomingMessage],
598648
) -> Result<ExecuteBatchResult, RuntimeError> {
599649
let cfg = R::SCHEDULE_CONTROL;
600650
let mut tx_reject_hashes = Vec::new();
651+
let mut in_msgs_count = 0;
601652

602653
let mut result = self.execute_batch_common(
603654
rt_ctx,
@@ -607,13 +658,35 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
607658
// The idea is to keep scheduling transactions as long as we have some space
608659
// available in the block as determined by gas use.
609660
let mut new_batch = Vec::new();
610-
let mut results = Vec::with_capacity(batch.len());
661+
let mut results = Vec::with_capacity(in_msgs.len() + batch.len());
611662
let mut requested_batch_len = cfg.initial_batch_size;
663+
664+
// Process incoming messages first.
665+
'inmsg: for in_msg in in_msgs {
666+
match R::IncomingMessagesHandler::process_in_msg(ctx, &in_msg) {
667+
InMsgResult::Skip => {
668+
// Skip, but treat as processed.
669+
in_msgs_count += 1;
670+
}
671+
InMsgResult::Execute(raw_tx, tx) => {
672+
// Further execute the inner transaction. The transaction has already
673+
// passed checks so it is ok to include in a block.
674+
let tx_size = raw_tx.len().try_into().unwrap();
675+
let index = new_batch.len();
676+
new_batch.push(raw_tx.to_owned());
677+
results.push(Self::execute_tx(ctx, tx_size, tx, index)?);
678+
679+
in_msgs_count += 1;
680+
}
681+
InMsgResult::Stop => break 'inmsg,
682+
}
683+
}
684+
685+
// Process regular transactions.
612686
'batch: loop {
613687
// Remember length of last batch.
614688
let last_batch_len = batch.len();
615689
let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx));
616-
617690
for raw_tx in batch.drain(..) {
618691
// If we don't have enough gas for processing even the cheapest transaction
619692
// we are done. Same if we reached the runtime-imposed maximum tx count.
@@ -689,8 +762,10 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
689762
},
690763
)?;
691764

692-
// Include rejected transaction hashes in the final result.
765+
// Include rejected transaction hashes and number of processed incoming messages in the
766+
// final result.
693767
result.tx_reject_hashes = tx_reject_hashes;
768+
result.in_msgs_count = in_msgs_count;
694769

695770
Ok(result)
696771
}
@@ -877,6 +952,7 @@ mod test {
877952
core::Genesis {
878953
parameters: core::Parameters {
879954
max_batch_gas: u64::MAX,
955+
max_inmsg_gas: 0,
880956
max_tx_size: 32 * 1024,
881957
max_tx_signers: 1,
882958
max_multisig_signers: 8,
@@ -885,6 +961,7 @@ mod test {
885961
auth_signature: 0,
886962
auth_multisig_signer: 0,
887963
callformat_x25519_deoxysii: 0,
964+
inmsg_base: 0,
888965
},
889966
min_gas_price: BTreeMap::from([(token::Denomination::NATIVE, 0)]),
890967
},

runtime-sdk/src/error.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,24 @@ impl Error for std::convert::Infallible {
6868
}
6969
}
7070

71+
/// A standardized serialized implementation for an error.
72+
#[derive(Debug, Default, Clone, cbor::Encode, cbor::Decode)]
73+
pub struct SerializableError {
74+
pub module: String,
75+
pub code: u32,
76+
pub message: String,
77+
}
78+
79+
impl<E: Error> From<E> for SerializableError {
80+
fn from(e: E) -> Self {
81+
Self {
82+
module: e.module_name().to_owned(),
83+
code: e.code(),
84+
message: e.to_string(),
85+
}
86+
}
87+
}
88+
7189
#[cfg(test)]
7290
mod test {
7391
use super::*;

runtime-sdk/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#![deny(rust_2018_idioms, unreachable_pub)]
44
#![forbid(unsafe_code)]
55
#![feature(int_log)]
6+
#![feature(associated_type_defaults)]
67

78
pub mod callformat;
89
pub mod config;

runtime-sdk/src/module.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use impl_trait_for_tuples::impl_for_tuples;
99

1010
use crate::{
1111
context::{Context, TxContext},
12+
core::consensus::roothash,
1213
dispatcher, error,
1314
error::Error as _,
1415
event, modules,
@@ -565,6 +566,39 @@ impl ModuleInfoHandler for Tuple {
565566
}
566567
}
567568

569+
/// Incoming message handler.
570+
pub trait InMsgHandler {
571+
/// Process an incoming message.
572+
fn process_in_msg<'a, C: Context>(
573+
ctx: &mut C,
574+
in_msg: &'a roothash::IncomingMessage,
575+
) -> InMsgResult<'a>;
576+
}
577+
578+
/// Result of processing an incoming message.
579+
#[derive(Debug)]
580+
pub enum InMsgResult<'a> {
581+
/// Skip to next incoming message, but count as processed.
582+
Skip,
583+
/// Add to batch/verify inclusion and execute.
584+
Execute(&'a [u8], Transaction),
585+
/// Stop processing incoming messages.
586+
Stop,
587+
}
588+
589+
/// An incoming message handler which discards all incoming messages.
590+
pub struct InMsgDiscard;
591+
592+
impl InMsgHandler for InMsgDiscard {
593+
fn process_in_msg<'a, C: Context>(
594+
_ctx: &mut C,
595+
_in_msg: &'a roothash::IncomingMessage,
596+
) -> InMsgResult<'a> {
597+
// Just skip all messages without doing anything.
598+
InMsgResult::Skip
599+
}
600+
}
601+
568602
/// A runtime module.
569603
pub trait Module {
570604
/// Module name.
@@ -591,6 +625,11 @@ pub trait Module {
591625

592626
/// Set the module's parameters.
593627
fn set_params<S: Store>(store: S, params: Self::Parameters) {
628+
params
629+
.validate_basic()
630+
.map_err(|_| ())
631+
.expect("module parameters are invalid");
632+
594633
let store = storage::PrefixStore::new(store, &Self::NAME);
595634
let mut store = storage::TypedStore::new(store);
596635
store.insert(Self::Parameters::STORE_KEY, params);

runtime-sdk/src/modules/consensus/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
//! Consensus module.
22
//!
33
//! Low level consensus module for communicating with the consensus layer.
4-
use std::str::FromStr;
5-
64
use thiserror::Error;
75

86
use oasis_core_runtime::{
@@ -44,7 +42,7 @@ pub struct Parameters {
4442
impl Default for Parameters {
4543
fn default() -> Self {
4644
Self {
47-
consensus_denomination: token::Denomination::from_str("TEST").unwrap(),
45+
consensus_denomination: "TEST".parse().unwrap(),
4846
consensus_scaling_factor: 1,
4947
}
5048
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use crate::modules;
2+
3+
/// Incoming message handler configuration.
4+
pub trait Config: 'static {
5+
/// The accounts module to use.
6+
type Accounts: modules::accounts::API;
7+
/// The consensus module to use.
8+
type Consensus: modules::consensus::API;
9+
10+
/// Maximum number of outgoing consensus message slots that an incoming message can claim.
11+
///
12+
/// When this is configured to be greater than zero it allows incoming messages to also emit
13+
/// consensus messages as a result of executing a transaction.
14+
const MAX_CONSENSUS_MSG_SLOTS_PER_TX: u32 = 1;
15+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use super::MODULE_NAME;
2+
use crate::error;
3+
4+
/// Events emitted by the consensus incoming message handler module.
5+
#[derive(Debug, cbor::Encode, oasis_runtime_sdk_macros::Event)]
6+
#[cbor(untagged)]
7+
pub enum Event {
8+
#[sdk_event(code = 1)]
9+
Processed {
10+
id: u64,
11+
#[cbor(optional)]
12+
tag: u64,
13+
#[cbor(optional)]
14+
error: Option<error::SerializableError>,
15+
},
16+
}

0 commit comments

Comments
 (0)