Skip to content

Commit

Permalink
fix(core): Rollback stream_impl
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Mar 11, 2025
1 parent 4171d99 commit 3d01b42
Showing 1 changed file with 53 additions and 52 deletions.
105 changes: 53 additions & 52 deletions crates/core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ use futures::{
};
use tokio::{sync::OnceCell, task::spawn_blocking, time::sleep};

use super::{
config::{STREAM_THROTTLE_HISTORICAL, STREAM_THROTTLE_LIVE},
StreamError,
};
use super::{config, StreamError};
use crate::{server::DeliverPolicy, types::StreamResponse};

pub type BoxedStoreItem = Result<StreamResponse, StreamError>;
Expand Down Expand Up @@ -90,7 +87,7 @@ impl<R: Record> Stream<R> {
pub async fn publish(
&self,
subject: &str,
response: &Arc<StreamResponse>,
response: &StreamResponse,
) -> Result<(), StreamError> {
let broker = self.broker.clone();
let response = response.clone();
Expand All @@ -117,33 +114,41 @@ impl<R: Record> Stream<R> {
api_key_role: &ApiKeyRole,
) -> BoxStream<'static, Result<StreamResponse, StreamError>> {
let broker = self.broker.clone();
let subject_ref = subject.clone();
let subject = subject.clone();
let stream = self.clone();
let role = api_key_role.clone();
let has_historical =
role.has_scopes(&[ApiKeyRoleScope::HistoricalData]).is_ok();
let has_live = role.has_scopes(&[ApiKeyRoleScope::LiveData]).is_ok();
let throttle_historical = *STREAM_THROTTLE_HISTORICAL as u64;
let throttle_live = *STREAM_THROTTLE_LIVE as u64;
let stream = async_stream::try_stream! {
if has_historical {
if let DeliverPolicy::FromBlock { block_height } = deliver_policy {
let mut historical = stream.historical_streaming(subject_ref.to_owned(), Some(block_height), &role);
while let Some(result) = historical.next().await {
yield result?;
sleep(Duration::from_millis(throttle_historical)).await;
match role.has_scopes(&[ApiKeyRoleScope::HistoricalData]) {
Ok(_) => {
if let DeliverPolicy::FromBlock { block_height } = deliver_policy {
let mut historical = stream.historical_streaming(subject.to_owned(), Some(block_height), &role);
while let Some(result) = historical.next().await {
yield result?;
let throttle_time = *config::STREAM_THROTTLE_HISTORICAL;
sleep(Duration::from_millis(throttle_time as u64)).await;
}
}
}
Err(e) => {
tracing::error!("Error subscribing to stream: {}", e);
Err(StreamError::from(e))?;
}
}

if has_live {
let mut live = broker.subscribe(&subject_ref.parse()).await?;
while let Some(msg) = live.next().await {
let msg = msg?;
let stream_response = spawn_blocking(move || StreamResponse::decode_json(&msg))
.await?;
yield stream_response?;
sleep(Duration::from_millis(throttle_live)).await;
match role.has_scopes(&[ApiKeyRoleScope::LiveData]) {
Ok(_) => {
let mut live = broker.subscribe(&subject.parse()).await?;
while let Some(msg) = live.next().await {
let msg = msg?;
let stream_response = spawn_blocking(move || StreamResponse::decode_json(&msg)).await??;
yield stream_response;
let throttle_time = *config::STREAM_THROTTLE_LIVE;
sleep(Duration::from_millis(throttle_time as u64)).await;
}
}
Err(e) => {
tracing::error!("Error subscribing to stream: {}", e);
Err(StreamError::from(e))?;
}
}
};
Expand All @@ -156,8 +161,8 @@ impl<R: Record> Stream<R> {
from_block: Option<BlockHeight>,
role: &ApiKeyRole,
) -> BoxStream<'static, Result<StreamResponse, StreamError>> {
let store = self.store();
let db = store.db.clone();
let store = self.store().clone();
let db = self.store().db.clone();
let role = role.clone();
let opts = if cfg!(any(test, feature = "test-helpers")) {
QueryOptions::default()
Expand All @@ -167,39 +172,35 @@ impl<R: Record> Stream<R> {

let stream = async_stream::try_stream! {
let mut current_height = from_block.unwrap_or_default();
let mut last_height = find_last_block_height(&db, opts.clone()).await?;
if let Err(e) = role.validate_historical_limit(last_height, current_height) {
tracing::error!("Historical limit validation failed: {}", e);
Err(StreamError::from(e))?;
}

let mut opts = opts.with_from_block(Some(current_height));
let mut last_height = find_last_block_height(&db, opts.clone()).await?;
while current_height <= last_height {
let items = store.find_many_by_subject(&subject, opts.clone()).await?;
if items.is_empty() {
for item in items {
let subject = item.subject_str();
let subject_id = item.subject_id();
let block_height = item.block_height();
role.validate_historical_limit(last_height, block_height)?;
let value = item.encoded_value().to_vec();
let pointer = item.into();
let response = StreamResponse::new(subject, subject_id, &value, pointer.to_owned(), None)?;
yield response;
current_height = pointer.block_height;
}
opts.increment_offset();
// When we reach the last known height, we need to check if any new blocks
// were produced while we were processing the previous ones
if current_height == last_height {
let new_last_height = find_last_block_height(&db, opts.clone()).await?;
if new_last_height > last_height {
// Reset current_height back to process the blocks we haven't seen yet
current_height = last_height;
last_height = new_last_height;
continue;
} else {
tracing::debug!("No new blocks found, stopping historical streaming on block {}", current_height);
break
}
tracing::debug!("No new blocks found, stopping historical streaming on block {}", current_height);
break;
}

for item in items {
let block_height = item.block_height();
let record_pointer = item.to_owned().into();
let response = StreamResponse::new(
item.subject_str(),
item.subject_id(),
item.encoded_value(),
record_pointer,
None,
)?;
yield response;
current_height = block_height;
}
opts = opts.with_from_block(Some(current_height));
}
};
Box::pin(stream)
Expand Down

0 comments on commit 3d01b42

Please sign in to comment.