Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,15 @@ async fn db_insert_block_no_retry(
is_allowed,
..tfhe_log
};
db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
let inserted = db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
if block_logs.catchup && inserted {
Copy link
Collaborator

@obatirou obatirou Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be done only once outside the loop tfhe_log in tfhe_event_log?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends if you want a log for each event (but maybe change the field and message to be event specific)

warn!(
tfhe_log = ?tfhe_log,
block = ?block_logs.summary,
nb_events = block_logs.logs.len(),
"Missed event detected by catchup",
);
}
}
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
tx.commit().await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl Database {
fhe_operation: FheOperation,
scalar_byte: &FixedBytes<1>,
log: &LogTfhe,
) -> Result<(), SqlxError> {
) -> Result<bool, SqlxError> {
let bucket = self
.sort_computation_into_bucket(
result,
Expand Down Expand Up @@ -236,7 +236,7 @@ impl Database {
fhe_operation: FheOperation,
scalar_byte: &FixedBytes<1>,
log: &LogTfhe,
) -> Result<(), SqlxError> {
) -> Result<bool, SqlxError> {
let bucket = self
.sort_computation_into_bucket(
result,
Expand Down Expand Up @@ -270,7 +270,7 @@ impl Database {
scalar_byte: &FixedBytes<1>,
log: &LogTfhe,
bucket: &Handle,
) -> Result<(), SqlxError> {
) -> Result<bool, SqlxError> {
let is_scalar = !scalar_byte.is_zero();
let output_handle = result.to_vec();
let query = sqlx::query!(
Expand All @@ -297,7 +297,10 @@ impl Database {
log.transaction_hash.map(|txh| txh.to_vec()),
log.is_allowed,
);
query.execute(tx.deref_mut()).await.map(|_| ())
query
.execute(tx.deref_mut())
.await
.map(|result| result.rows_affected() > 0)
}

async fn sort_computation_into_bucket(
Expand Down Expand Up @@ -353,7 +356,7 @@ impl Database {
&self,
tx: &mut Transaction<'_>,
log: &LogTfhe,
) -> Result<(), SqlxError> {
) -> Result<bool, SqlxError> {
use TfheContract as C;
use TfheContractEvents as E;
const HAS_SCALAR : FixedBytes::<1> = FixedBytes([1]); // if any dependency is a scalar.
Expand Down Expand Up @@ -437,7 +440,7 @@ impl Database {
| E::Initialized(_)
| E::Upgraded(_)
| E::VerifyInput(_)
=> Ok(()),
=> Ok(false),
}
}

Expand Down Expand Up @@ -482,7 +485,7 @@ impl Database {
event: &Log<AclContractEvents>,
transaction_hash: &Option<Handle>,
block_number: &Option<u64>,
) -> Result<(), SqlxError> {
) -> Result<bool, SqlxError> {
let data = &event.data;

let transaction_hash = transaction_hash.map(|h| h.to_vec());
Expand All @@ -498,26 +501,28 @@ impl Database {
self.record_transaction_begin(&transaction_hash, block_number)
.await;
}

let mut inserted = false;
match data {
AclContractEvents::Allowed(allowed) => {
let handle = allowed.handle.to_vec();

self.insert_allowed_handle(
tx,
handle.clone(),
allowed.account.to_string(),
AllowEvents::AllowedAccount,
transaction_hash.clone(),
)
.await?;

self.insert_pbs_computations(
tx,
&vec![handle],
transaction_hash,
)
.await?;
inserted |= self
.insert_allowed_handle(
tx,
handle.clone(),
allowed.account.to_string(),
AllowEvents::AllowedAccount,
transaction_hash.clone(),
)
.await?;

inserted |= self
.insert_pbs_computations(
tx,
&vec![handle],
transaction_hash,
)
.await?;
}
AclContractEvents::AllowedForDecryption(allowed_for_decryption) => {
let handles = allowed_for_decryption
Expand All @@ -532,22 +537,24 @@ impl Database {
"Allowed for public decryption"
);

self.insert_allowed_handle(
inserted |= self
.insert_allowed_handle(
tx,
handle,
"".to_string(),
AllowEvents::AllowedForDecryption,
transaction_hash.clone(),
)
.await?;
}

inserted |= self
.insert_pbs_computations(
tx,
handle,
"".to_string(),
AllowEvents::AllowedForDecryption,
&handles,
transaction_hash.clone(),
)
.await?;
}

self.insert_pbs_computations(
tx,
&handles,
transaction_hash.clone(),
)
.await?;
}
AclContractEvents::Initialized(initialized) => {
warn!(event = ?initialized, "unhandled Acl::Initialized event");
Expand Down Expand Up @@ -612,7 +619,7 @@ impl Database {
}
}
self.tick.update();
Ok(())
Ok(inserted)
}

/// Adds handles to the pbs_computations table and alerts the SnS worker
Expand All @@ -622,8 +629,9 @@ impl Database {
tx: &mut Transaction<'_>,
handles: &Vec<Vec<u8>>,
transaction_id: Option<Vec<u8>>,
) -> Result<(), SqlxError> {
) -> Result<bool, SqlxError> {
let tenant_id = self.tenant_id;
let mut inserted = false;
for handle in handles {
let query = sqlx::query!(
"INSERT INTO pbs_computations(tenant_id, handle, transaction_id) VALUES($1, $2, $3)
Expand All @@ -632,9 +640,10 @@ impl Database {
handle,
transaction_id
);
query.execute(tx.deref_mut()).await?;
inserted |=
query.execute(tx.deref_mut()).await?.rows_affected() > 0;
}
Ok(())
Ok(inserted)
}

/// Add the handle to the allowed_handles table
Expand All @@ -645,7 +654,7 @@ impl Database {
account_address: String,
event_type: AllowEvents,
transaction_id: Option<Vec<u8>>,
) -> Result<(), SqlxError> {
) -> Result<bool, SqlxError> {
let tenant_id = self.tenant_id;
let query = sqlx::query!(
"INSERT INTO allowed_handles(tenant_id, handle, account_address, event_type, transaction_id) VALUES($1, $2, $3, $4, $5)
Expand All @@ -656,8 +665,8 @@ impl Database {
event_type as i16,
transaction_id
);
query.execute(tx.deref_mut()).await?;
Ok(())
let inserted = query.execute(tx.deref_mut()).await?.rows_affected() > 0;
Ok(inserted)
}

async fn record_transaction_begin(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn insert_tfhe_event(
tx: &mut Transaction<'_>,
log: alloy::rpc::types::Log<TfheContractEvents>,
is_allowed: bool,
) -> Result<(), sqlx::Error> {
) -> Result<bool, sqlx::Error> {
let event = LogTfhe {
event: log.inner,
transaction_hash: log.transaction_hash,
Expand All @@ -50,7 +50,7 @@ pub async fn allow_handle(
db: &ListenerDatabase,
tx: &mut Transaction<'_>,
handle: &[u8],
) -> Result<(), sqlx::Error> {
) -> Result<bool, sqlx::Error> {
let account_address = String::new();
let event_type = AllowEvents::AllowedForDecryption;
db.insert_allowed_handle(tx, handle.to_owned(), account_address, event_type, None)
Expand Down
Loading