diff --git a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs index 5c5c5ff19..acc88046e 100644 --- a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs @@ -922,7 +922,15 @@ async fn db_insert_block_no_retry( is_allowed, ..tfhe_log }; - db.insert_tfhe_event(&mut tx, &tfhe_log).await?; + let inserted = db.insert_tfhe_event(&mut tx, &tfhe_log).await?; + if block_logs.catchup && inserted { + warn!( + tfhe_log = ?tfhe_log, + block = ?block_logs.summary, + nb_events = block_logs.logs.len(), + "Missed event detected by catchup", + ); + } } db.mark_block_as_valid(&mut tx, &block_logs.summary).await?; tx.commit().await diff --git a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs index e4f43a21a..60af242b3 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs @@ -200,7 +200,7 @@ impl Database { fhe_operation: FheOperation, scalar_byte: &FixedBytes<1>, log: &LogTfhe, - ) -> Result<(), SqlxError> { + ) -> Result { let bucket = self .sort_computation_into_bucket( result, @@ -236,7 +236,7 @@ impl Database { fhe_operation: FheOperation, scalar_byte: &FixedBytes<1>, log: &LogTfhe, - ) -> Result<(), SqlxError> { + ) -> Result { let bucket = self .sort_computation_into_bucket( result, @@ -270,7 +270,7 @@ impl Database { scalar_byte: &FixedBytes<1>, log: &LogTfhe, bucket: &Handle, - ) -> Result<(), SqlxError> { + ) -> Result { let is_scalar = !scalar_byte.is_zero(); let output_handle = result.to_vec(); let query = sqlx::query!( @@ -297,7 +297,10 @@ impl Database { log.transaction_hash.map(|txh| txh.to_vec()), log.is_allowed, ); - query.execute(tx.deref_mut()).await.map(|_| ()) + query + .execute(tx.deref_mut()) + .await + .map(|result| result.rows_affected() > 0) } async fn sort_computation_into_bucket( @@ -353,7 +356,7 @@ impl Database { &self, tx: &mut Transaction<'_>, log: &LogTfhe, - ) -> Result<(), SqlxError> { + ) -> Result { use TfheContract as C; use TfheContractEvents as E; const HAS_SCALAR : FixedBytes::<1> = FixedBytes([1]); // if any dependency is a scalar. @@ -437,7 +440,7 @@ impl Database { | E::Initialized(_) | E::Upgraded(_) | E::VerifyInput(_) - => Ok(()), + => Ok(false), } } @@ -482,7 +485,7 @@ impl Database { event: &Log, transaction_hash: &Option, block_number: &Option, - ) -> Result<(), SqlxError> { + ) -> Result { let data = &event.data; let transaction_hash = transaction_hash.map(|h| h.to_vec()); @@ -498,26 +501,28 @@ impl Database { self.record_transaction_begin(&transaction_hash, block_number) .await; } - + let mut inserted = false; match data { AclContractEvents::Allowed(allowed) => { let handle = allowed.handle.to_vec(); - self.insert_allowed_handle( - tx, - handle.clone(), - allowed.account.to_string(), - AllowEvents::AllowedAccount, - transaction_hash.clone(), - ) - .await?; - - self.insert_pbs_computations( - tx, - &vec![handle], - transaction_hash, - ) - .await?; + inserted |= self + .insert_allowed_handle( + tx, + handle.clone(), + allowed.account.to_string(), + AllowEvents::AllowedAccount, + transaction_hash.clone(), + ) + .await?; + + inserted |= self + .insert_pbs_computations( + tx, + &vec![handle], + transaction_hash, + ) + .await?; } AclContractEvents::AllowedForDecryption(allowed_for_decryption) => { let handles = allowed_for_decryption @@ -532,22 +537,24 @@ impl Database { "Allowed for public decryption" ); - self.insert_allowed_handle( + inserted |= self + .insert_allowed_handle( + tx, + handle, + "".to_string(), + AllowEvents::AllowedForDecryption, + transaction_hash.clone(), + ) + .await?; + } + + inserted |= self + .insert_pbs_computations( tx, - handle, - "".to_string(), - AllowEvents::AllowedForDecryption, + &handles, transaction_hash.clone(), ) .await?; - } - - self.insert_pbs_computations( - tx, - &handles, - transaction_hash.clone(), - ) - .await?; } AclContractEvents::Initialized(initialized) => { warn!(event = ?initialized, "unhandled Acl::Initialized event"); @@ -612,7 +619,7 @@ impl Database { } } self.tick.update(); - Ok(()) + Ok(inserted) } /// Adds handles to the pbs_computations table and alerts the SnS worker @@ -622,8 +629,9 @@ impl Database { tx: &mut Transaction<'_>, handles: &Vec>, transaction_id: Option>, - ) -> Result<(), SqlxError> { + ) -> Result { let tenant_id = self.tenant_id; + let mut inserted = false; for handle in handles { let query = sqlx::query!( "INSERT INTO pbs_computations(tenant_id, handle, transaction_id) VALUES($1, $2, $3) @@ -632,9 +640,10 @@ impl Database { handle, transaction_id ); - query.execute(tx.deref_mut()).await?; + inserted |= + query.execute(tx.deref_mut()).await?.rows_affected() > 0; } - Ok(()) + Ok(inserted) } /// Add the handle to the allowed_handles table @@ -645,7 +654,7 @@ impl Database { account_address: String, event_type: AllowEvents, transaction_id: Option>, - ) -> Result<(), SqlxError> { + ) -> Result { let tenant_id = self.tenant_id; let query = sqlx::query!( "INSERT INTO allowed_handles(tenant_id, handle, account_address, event_type, transaction_id) VALUES($1, $2, $3, $4, $5) @@ -656,8 +665,8 @@ impl Database { event_type as i16, transaction_id ); - query.execute(tx.deref_mut()).await?; - Ok(()) + let inserted = query.execute(tx.deref_mut()).await?.rows_affected() > 0; + Ok(inserted) } async fn record_transaction_begin( diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs index ac36e7da0..8f5b464bf 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs @@ -36,7 +36,7 @@ async fn insert_tfhe_event( tx: &mut Transaction<'_>, log: alloy::rpc::types::Log, is_allowed: bool, -) -> Result<(), sqlx::Error> { +) -> Result { let event = LogTfhe { event: log.inner, transaction_hash: log.transaction_hash, @@ -50,7 +50,7 @@ pub async fn allow_handle( db: &ListenerDatabase, tx: &mut Transaction<'_>, handle: &[u8], -) -> Result<(), sqlx::Error> { +) -> Result { let account_address = String::new(); let event_type = AllowEvents::AllowedForDecryption; db.insert_allowed_handle(tx, handle.to_owned(), account_address, event_type, None)