Skip to content

Commit

Permalink
fix(sv-publisher): Add find_next_block_to_save query
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Mar 5, 2025
1 parent 5451af3 commit d13a5ac
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
33 changes: 33 additions & 0 deletions crates/store/src/store/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,39 @@ pub async fn find_last_block_height(
Ok(record.map(|(height,)| height.into()).unwrap_or_default())
}

pub async fn find_next_block_to_save(
db: &Db,
options: QueryOptions,
) -> StoreResult<BlockHeight> {
let select = r#"
SELECT block_height + 1 as next_height
FROM blocks b1
WHERE NOT EXISTS (
SELECT 1
FROM blocks b2
WHERE b2.block_height = b1.block_height + 1
)
"#
.to_string();

let mut query_builder = sqlx::QueryBuilder::new(select);

if let Some(ns) = options.namespace {
query_builder
.push(" AND subject LIKE ")
.push_bind(format!("{}%", ns));
}

query_builder.push(" ORDER BY block_height LIMIT 1");
let query = query_builder.build_query_as::<(i64,)>();

let record: Option<(i64,)> = query
.fetch_optional(&db.pool)
.await
.map_err(StoreError::from)?;
Ok(record.map(|(height,)| height.into()).unwrap_or(1.into()))
}

pub async fn update_block_propagation_ms(
tx: &mut DbTransaction,
block_height: BlockHeight,
Expand Down
37 changes: 19 additions & 18 deletions services/publisher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use fuel_streams_core::types::*;
use fuel_streams_store::{
db::{Db, DbConnectionOpts},
record::QueryOptions,
store::find_last_block_height,
store::find_next_block_to_save,
};
use fuel_web_utils::{
server::api::build_and_spawn_web_server,
Expand Down Expand Up @@ -42,11 +42,12 @@ async fn main() -> anyhow::Result<()> {
build_and_spawn_web_server(cli.telemetry_port, server_state).await?;

let last_block_height = Arc::new(fuel_core.get_latest_block_height()?);
let last_published = Arc::new(find_last_published_height(&db).await?);
let next_block_to_process =
Arc::new(find_next_block_to_process(&db).await?);
let shutdown = Arc::new(ShutdownController::new());
shutdown.clone().spawn_signal_handler();

tracing::info!("Last published height: {}", last_published);
tracing::info!("Next block to process: {}", next_block_to_process);
tracing::info!("Last block height: {}", last_block_height);
tokio::select! {
result = async {
Expand All @@ -55,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
&message_broker,
&fuel_core,
&last_block_height,
&last_published,
&next_block_to_process,
shutdown.token().clone(),
&telemetry,
).await.map_err(|e| PublishError::Historical(e.to_string()))?;
Expand Down Expand Up @@ -92,38 +93,39 @@ async fn setup_db(db_url: &str) -> Result<Arc<Db>, PublishError> {
Ok(db)
}

async fn find_last_published_height(
async fn find_next_block_to_process(
db: &Db,
) -> Result<BlockHeight, PublishError> {
let opts = QueryOptions::default();
let height = find_last_block_height(db, opts).await?;
let height = find_next_block_to_save(db, opts).await?;
Ok(height)
}

fn get_historical_block_range(
from_height: BlockHeight,
last_published_height: BlockHeight,
next_block_to_process: BlockHeight,
last_block_height: BlockHeight,
) -> Option<Vec<u64>> {
let last_published_height = if last_published_height > from_height {
last_published_height
let start_height = if next_block_to_process > from_height {
next_block_to_process
} else {
from_height
};

let last_block_height = if last_block_height > from_height {
let end_height = if last_block_height > start_height {
*last_block_height
} else {
tracing::error!("Last block height is less than from height");
*last_block_height
tracing::error!("Last block height is less than start height");
return None;
};

let start_height = *last_published_height + 1;
let end_height = last_block_height;
let end_height = end_height.into();
let start_height = start_height.into();
if start_height > end_height {
tracing::info!("No historical blocks to process");
return None;
}

let block_count = end_height - start_height + 1;
let heights: Vec<u64> = (start_height..=end_height).collect();
tracing::info!(
Expand All @@ -137,20 +139,20 @@ fn process_historical_blocks(
message_broker: &Arc<NatsMessageBroker>,
fuel_core: &Arc<dyn FuelCoreLike>,
last_block_height: &Arc<BlockHeight>,
last_published_height: &Arc<BlockHeight>,
next_block_to_process: &Arc<BlockHeight>,
token: CancellationToken,
telemetry: &Arc<Telemetry<Metrics>>,
) -> tokio::task::JoinHandle<Result<(), PublishError>> {
let message_broker = message_broker.clone();
let fuel_core = fuel_core.clone();
tokio::spawn({
let last_published_height = *last_published_height.clone();
let next_block_to_process = *next_block_to_process.clone();
let last_block_height = *last_block_height.clone();
let telemetry = telemetry.clone();
async move {
let Some(heights) = get_historical_block_range(
from_height,
last_published_height,
next_block_to_process,
last_block_height,
) else {
return Ok(());
Expand Down Expand Up @@ -215,7 +217,6 @@ async fn process_live_blocks(
telemetry: &Arc<Telemetry<Metrics>>,
) -> Result<(), PublishError> {
let mut subscription = fuel_core.blocks_subscription();

loop {
tokio::select! {
_ = token.cancelled() => {
Expand Down

0 comments on commit d13a5ac

Please sign in to comment.