Skip to content

Commit

Permalink
Merge branch 'feature/move-blocks-to-importer' into feature/move-offc…
Browse files Browse the repository at this point in the history
…hain-logic-from-executor
  • Loading branch information
xgreenx authored Jan 5, 2024
2 parents 26f268d + c1ee5f8 commit 4b36869
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 49 deletions.
6 changes: 3 additions & 3 deletions crates/fuel-core/src/service/adapters/block_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ impl ImporterDatabase for Database {
}

impl ExecutorDatabase for Database {
fn store_block(
fn store_new_block(
&mut self,
chain_id: &ChainId,
block: &SealedBlock,
) -> StorageResult<Option<()>> {
) -> StorageResult<bool> {
let block_id = block.entity.id();
let mut found = self
.storage::<FuelBlocks>()
Expand All @@ -148,7 +148,7 @@ impl ExecutorDatabase for Database {
.insert(&tx.id(chain_id), tx)?
.is_some();
}
Ok(found.then_some(()))
Ok(!found)
}
}

Expand Down
29 changes: 16 additions & 13 deletions crates/fuel-core/src/service/adapters/consensus_module/poa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use fuel_core_poa::{
BlockImporter,
P2pPort,
TransactionPool,
TransactionsSource,
},
service::{
Mode,
Expand All @@ -25,10 +26,7 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_storage::transactional::StorageTransaction;
use fuel_core_types::{
fuel_asm::Word,
fuel_tx::{
Transaction,
TxId,
},
fuel_tx::TxId,
fuel_types::BlockHeight,
services::{
block_importer::{
Expand Down Expand Up @@ -104,17 +102,22 @@ impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter {
&self,
height: BlockHeight,
block_time: Tai64,
txs: Option<Vec<Transaction>>,
source: TransactionsSource,
max_gas: Word,
) -> anyhow::Result<UncommittedResult<StorageTransaction<Database>>> {
if let Some(txs) = txs {
self.block_producer
.produce_and_execute_block_transactions(height, block_time, txs, max_gas)
.await
} else {
self.block_producer
.produce_and_execute_block_txpool(height, block_time, max_gas)
.await
match source {
TransactionsSource::TxPool => {
self.block_producer
.produce_and_execute_block_txpool(height, block_time, max_gas)
.await
}
TransactionsSource::SpecificTransactions(txs) => {
self.block_producer
.produce_and_execute_block_transactions(
height, block_time, txs, max_gas,
)
.await
}
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/services/consensus_module/poa/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ pub trait TransactionPool: Send + Sync {
#[cfg(test)]
use fuel_core_storage::test_helpers::EmptyStorage;

/// The source of transactions for the block.
pub enum TransactionsSource {
/// The source of transactions for the block is the `TxPool`.
TxPool,
/// Use specific transactions for the block.
SpecificTransactions(Vec<Transaction>),
}

#[cfg_attr(test, mockall::automock(type Database=EmptyStorage;))]
#[async_trait::async_trait]
pub trait BlockProducer: Send + Sync {
Expand All @@ -52,7 +60,7 @@ pub trait BlockProducer: Send + Sync {
&self,
height: BlockHeight,
block_time: Tai64,
txs: Option<Vec<Transaction>>,
source: TransactionsSource,
max_gas: Word,
) -> anyhow::Result<UncommittedExecutionResult<StorageTransaction<Self::Database>>>;
}
Expand Down
15 changes: 8 additions & 7 deletions crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
BlockProducer,
P2pPort,
TransactionPool,
TransactionsSource,
},
sync::{
SyncState,
Expand Down Expand Up @@ -255,18 +256,18 @@ where
&self,
height: BlockHeight,
block_time: Tai64,
txs: Option<Vec<Transaction>>,
source: TransactionsSource,
) -> anyhow::Result<UncommittedExecutionResult<StorageTransaction<D>>> {
self.block_producer
.produce_and_execute_block(height, block_time, txs, self.block_gas_limit)
.produce_and_execute_block(height, block_time, source, self.block_gas_limit)
.await
}

pub(crate) async fn produce_next_block(&mut self) -> anyhow::Result<()> {
self.produce_block(
self.next_height(),
self.next_time(RequestType::Trigger)?,
None,
TransactionsSource::TxPool,
RequestType::Trigger,
)
.await
Expand All @@ -287,7 +288,7 @@ where
self.produce_block(
self.next_height(),
block_time,
None,
TransactionsSource::TxPool,
RequestType::Manual,
)
.await?;
Expand All @@ -298,7 +299,7 @@ where
self.produce_block(
self.next_height(),
block_time,
Some(txs),
TransactionsSource::SpecificTransactions(txs),
RequestType::Manual,
)
.await?;
Expand All @@ -311,7 +312,7 @@ where
&mut self,
height: BlockHeight,
block_time: Tai64,
txs: Option<Vec<Transaction>>,
source: TransactionsSource,
request_type: RequestType,
) -> anyhow::Result<()> {
let last_block_created = Instant::now();
Expand All @@ -333,7 +334,7 @@ where
},
db_transaction,
) = self
.signal_produce_block(height, block_time, txs)
.signal_produce_block(height, block_time, source)
.await?
.into();

Expand Down
6 changes: 3 additions & 3 deletions crates/services/importer/src/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ where
))
}

db_after_execution
.store_block(&self.chain_id, &result.sealed_block)?
.should_be_unique(&expected_next_height)?;
if !db_after_execution.store_new_block(&self.chain_id, &result.sealed_block)? {
return Err(Error::NotUnique(expected_next_height))
}

// Update the total tx count in chain metadata
let total_txs = db_after_execution
Expand Down
39 changes: 20 additions & 19 deletions crates/services/importer/src/importer/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ mockall::mock! {
}

impl ExecutorDatabase for Database {
fn store_block(
fn store_new_block(
&mut self,
chain_id: &ChainId,
block: &SealedBlock,
) -> StorageResult<Option<()>>;
) -> StorageResult<bool>;
}

impl TransactionTrait<MockDatabase> for Database {
Expand Down Expand Up @@ -129,15 +129,16 @@ fn executor_db<H, B>(
) -> impl Fn() -> MockDatabase
where
H: Fn() -> StorageResult<Option<u32>> + Send + Clone + 'static,
B: Fn() -> StorageResult<Option<()>> + Send + Clone + 'static,
B: Fn() -> StorageResult<bool> + Send + Clone + 'static,
{
move || {
let height = height.clone();
let store_block = store_block.clone();
let mut db = MockDatabase::default();
db.expect_latest_block_height()
.returning(move || height().map(|v| v.map(Into::into)));
db.expect_store_block().returning(move |_, _| store_block());
db.expect_store_new_block()
.returning(move |_, _| store_block());
db.expect_commit().times(commits).returning(|| Ok(()));
db.expect_increase_tx_count().returning(Ok);
db
Expand Down Expand Up @@ -221,42 +222,42 @@ where
#[test_case(
genesis(0),
underlying_db(ok(None)),
executor_db(ok(None), ok(None), 1)
executor_db(ok(None), ok(true), 1)
=> Ok(());
"successfully imports genesis block when latest block not found"
)]
#[test_case(
genesis(113),
underlying_db(ok(None)),
executor_db(ok(None), ok(None), 1)
executor_db(ok(None), ok(true), 1)
=> Ok(());
"successfully imports block at arbitrary height when executor db expects it and last block not found"
)]
#[test_case(
genesis(0),
underlying_db(storage_failure),
executor_db(ok(Some(0)), ok(None), 0)
executor_db(ok(Some(0)), ok(true), 0)
=> Err(storage_failure_error());
"fails to import genesis when underlying database fails"
)]
#[test_case(
genesis(0),
underlying_db(ok(Some(0))),
executor_db(ok(Some(0)), ok(None), 0)
executor_db(ok(Some(0)), ok(true), 0)
=> Err(Error::InvalidUnderlyingDatabaseGenesisState);
"fails to import genesis block when already exists"
)]
#[test_case(
genesis(1),
underlying_db(ok(None)),
executor_db(ok(Some(0)), ok(None), 0)
executor_db(ok(Some(0)), ok(true), 0)
=> Err(Error::InvalidDatabaseStateAfterExecution(None, Some(0u32.into())));
"fails to import genesis block when next height is not 0"
)]
#[test_case(
genesis(0),
underlying_db(ok(None)),
executor_db(ok(None), ok(Some(())), 0)
executor_db(ok(None), ok(false), 0)
=> Err(Error::NotUnique(0u32.into()));
"fails to import genesis block when block exists for height 0"
)]
Expand All @@ -273,56 +274,56 @@ async fn commit_result_genesis(
#[test_case(
poa_block(1),
underlying_db(ok(Some(0))),
executor_db(ok(Some(0)), ok(None), 1)
executor_db(ok(Some(0)), ok(true), 1)
=> Ok(());
"successfully imports block at height 1 when latest block is genesis"
)]
#[test_case(
poa_block(113),
underlying_db(ok(Some(112))),
executor_db(ok(Some(112)), ok(None), 1)
executor_db(ok(Some(112)), ok(true), 1)
=> Ok(());
"successfully imports block at arbitrary height when latest block height is one fewer and executor db expects it"
)]
#[test_case(
poa_block(0),
underlying_db(ok(Some(0))),
executor_db(ok(Some(1)), ok(None), 0)
executor_db(ok(Some(1)), ok(true), 0)
=> Err(Error::ZeroNonGenericHeight);
"fails to import PoA block with height 0"
)]
#[test_case(
poa_block(113),
underlying_db(ok(Some(111))),
executor_db(ok(Some(113)), ok(None), 0)
executor_db(ok(Some(113)), ok(true), 0)
=> Err(Error::IncorrectBlockHeight(112u32.into(), 113u32.into()));
"fails to import block at height 113 when latest block height is 111"
)]
#[test_case(
poa_block(113),
underlying_db(ok(Some(114))),
executor_db(ok(Some(113)), ok(None), 0)
executor_db(ok(Some(113)), ok(true), 0)
=> Err(Error::IncorrectBlockHeight(115u32.into(), 113u32.into()));
"fails to import block at height 113 when latest block height is 114"
)]
#[test_case(
poa_block(113),
underlying_db(ok(Some(112))),
executor_db(ok(Some(114)), ok(None), 0)
executor_db(ok(Some(114)), ok(true), 0)
=> Err(Error::InvalidDatabaseStateAfterExecution(Some(112u32.into()), Some(114u32.into())));
"fails to import block 113 when executor db expects height 114"
)]
#[test_case(
poa_block(113),
underlying_db(ok(Some(112))),
executor_db(storage_failure, ok(None), 0)
executor_db(storage_failure, ok(true), 0)
=> Err(storage_failure_error());
"fails to import block when executor db fails to find latest block"
)]
#[test_case(
poa_block(113),
underlying_db(ok(Some(112))),
executor_db(ok(Some(112)), ok(Some(())), 0)
executor_db(ok(Some(112)), ok(false), 0)
=> Err(Error::NotUnique(113u32.into()));
"fails to import block when block exists"
)]
Expand Down Expand Up @@ -520,7 +521,7 @@ where
underlying_db(ok(Some(previous_height)))(),
executor(
block_after_execution,
executor_db(ok(Some(previous_height)), ok(None), commits)(),
executor_db(ok(Some(previous_height)), ok(true), commits)(),
),
verifier(verifier_result),
)
Expand Down
8 changes: 5 additions & 3 deletions crates/services/importer/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ pub trait ImporterDatabase: Send + Sync {

/// The port for returned database from the executor.
pub trait ExecutorDatabase: ImporterDatabase {
/// Inserts the `SealedBlock` under the `block_id`.
/// Inserts the `SealedBlock`.
///
/// The method returns `true` if the block is a new, otherwise `false`.
// TODO: Remove `chain_id` from the signature, but for that transactions inside
// the block should have `cached_id`. We need to guarantee that from the Rust-type system.
fn store_block(
fn store_new_block(
&mut self,
chain_id: &ChainId,
block: &SealedBlock,
) -> StorageResult<Option<()>>;
) -> StorageResult<bool>;
}

#[cfg_attr(test, mockall::automock)]
Expand Down

0 comments on commit 4b36869

Please sign in to comment.