Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(gas_price_service_v1): strictly ensure last_recorded_height is set, to avoid initial poll of da source #2523

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
cbc6dee
chore(gas_price_service_v1): strictly ensure last_recorded_height is …
rymnc Dec 30, 2024
dba77da
fix
rymnc Dec 30, 2024
3b60422
chore: add test for gas price service
rymnc Dec 30, 2024
b151207
fix: remove unnecessary storage set, add test for uninit task instead
rymnc Dec 31, 2024
0a86e08
Merge branch 'chore/add-tests-for-v1-gas-service' into chore/gas-pric…
rymnc Jan 7, 2025
62d81ed
chore: force a recorded height to be passed into da source service up…
rymnc Jan 7, 2025
73cf35b
fix: recorded height test in da source service
rymnc Jan 8, 2025
4ca7a94
fix: idiomatic set
rymnc Jan 8, 2025
159eb84
fix: remove unnecessary test now that it is enforced
rymnc Jan 8, 2025
b11e03f
fix: integ test
rymnc Jan 8, 2025
8a544b9
fix: remove dbg log
rymnc Jan 8, 2025
53a96cf
fix: disambiguate the l2 block range start and recorded height start
rymnc Jan 8, 2025
64385ad
Merge branch 'chore/add-tests-for-v1-gas-service' into chore/gas-pric…
rymnc Jan 9, 2025
dce9367
Merge branch 'chore/add-tests-for-v1-gas-service' into chore/gas-pric…
rymnc Jan 9, 2025
687a23b
chore: remove unused trait bound
rymnc Jan 10, 2025
2df50bf
fix: unused first_run
rymnc Jan 10, 2025
2fe4efe
fix: comments
rymnc Jan 10, 2025
f9bfc8a
Merge branch 'chore/add-tests-for-v1-gas-service' into chore/gas-pric…
rymnc Jan 10, 2025
8a0764e
Merge branch 'chore/add-tests-for-v1-gas-service' into chore/gas-pric…
MitchTurner Jan 14, 2025
efc3676
Fix bad merge
MitchTurner Jan 14, 2025
b12c9e9
Merge branch 'chore/add-tests-for-v1-gas-service' into chore/gas-pric…
MitchTurner Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions crates/services/gas_price_service/src/v1/da_source_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone());
let latest_l2_height = Arc::new(Mutex::new(BlockHeight::new(10u32)));
let recorded_height = BlockHeight::new(0);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand All @@ -84,10 +86,12 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone());
let latest_l2_height = latest_l2_height(0);
let recorded_height = BlockHeight::new(0);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand Down Expand Up @@ -117,10 +121,12 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let recorded_height = BlockHeight::new(0);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand Down Expand Up @@ -148,11 +154,12 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let recorded_height = BlockHeight::new(0);
let mut service = DaSourceService::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
recorded_height,
);
let mut watcher = StateWatcher::started();

Expand All @@ -161,31 +168,32 @@ mod tests {

// then
let recorded_height = service.recorded_height();
assert!(recorded_height.is_none())
assert_eq!(*recorded_height, 0);
}

#[tokio::test]
async fn run__recorded_height_updated_by_da_costs() {
// given
let l2_height = 10;
let recorded_height = 9;
let unexpected_costs = DaBlockCosts {
let l2_block_range_start = 2;
let expected_recorded_height = 9;
let costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 2..=recorded_height,
l2_blocks: l2_block_range_start..=expected_recorded_height,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
DummyDaBlockCosts::new(Ok(costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let (sender, mut receiver) =
tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
let mut service = DaSourceService::new_with_sender(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
(l2_block_range_start - 1).into(), /* we want to start polling from before the l2 block range */
sender,
);
let mut watcher = StateWatcher::started();
Expand All @@ -194,8 +202,7 @@ mod tests {
let next = service.run(&mut watcher).await;

// then
let actual = service.recorded_height().unwrap();
let expected = BlockHeight::from(recorded_height);
assert_eq!(expected, actual);
let actual = service.recorded_height();
assert_eq!(expected_recorded_height, *actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use std::ops::Deref;

#[async_trait::async_trait]
pub trait BlockCommitterApi: Send + Sync {
/// Used on first run to get the latest costs and seqno
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>>;
/// Used to get the costs for a specific seqno
async fn get_costs_by_l2_block_number(
&self,
Expand Down Expand Up @@ -90,17 +88,17 @@ where
{
async fn request_da_block_costs(
&mut self,
last_recorded_height: &Option<BlockHeight>,
last_recorded_height: &BlockHeight,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
let raw_da_block_costs: Vec<_> = match last_recorded_height.and_then(|x| x.succ())
{
Some(ref next_height) => {
self.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?
}
None => self.client.get_latest_costs().await?.into_iter().collect(),
};
let next_height = last_recorded_height.succ().ok_or(anyhow!(
"Failed to increment the last recorded height: {:?}",
last_recorded_height
))?;

let raw_da_block_costs: Vec<_> = self
.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?;

let da_block_costs: Vec<_> =
raw_da_block_costs.iter().map(DaBlockCosts::from).collect();
Expand Down Expand Up @@ -141,19 +139,6 @@ impl BlockCommitterApi for BlockCommitterHttpApi {
Ok(vec![])
}
}

async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
// Latest: http://localhost:8080/v1/costs?variant=latest&limit=5
if let Some(url) = &self.url {
let formatted_url = format!("{url}/v1/costs?variant=latest&limit=1");
let response = self.client.get(formatted_url).send().await?;
let raw_da_block_costs = response.json::<Vec<RawDaBlockCosts>>().await?;
// only take the first element, since we are only looking for the most recent
Ok(raw_da_block_costs.first().cloned())
} else {
Ok(None)
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -269,56 +254,6 @@ mod test_block_committer_http_api {
// then
assert_eq!(actual, expected);
}

#[test]
fn get_latest_costs__when_url_is_none__then_returns_none() {
let rt = tokio::runtime::Runtime::new().unwrap();

// given
let block_committer = BlockCommitterHttpApi::new(None);

// when
let actual =
rt.block_on(async { block_committer.get_latest_costs().await.unwrap() });

// then
assert_eq!(actual, None);
}

#[test]
fn get_latest_costs__when_url_is_some__then_returns_expected_costs() {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut mock = FakeServer::new();
let url = mock.url();

// given
let block_committer = BlockCommitterHttpApi::new(Some(url));
let not_expected = RawDaBlockCosts {
id: 1,
start_height: 1,
end_height: 10,
da_block_height: 1u64.into(),
cost: 1,
size: 1,
};
mock.add_response(not_expected);
let expected = RawDaBlockCosts {
id: 2,
start_height: 11,
end_height: 20,
da_block_height: 2u64.into(),
cost: 2,
size: 2,
};
mock.add_response(expected.clone());

// when
let actual =
rt.block_on(async { block_committer.get_latest_costs().await.unwrap() });

// then
assert_eq!(actual, Some(expected));
}
}
#[cfg(any(test, feature = "test-helpers"))]
pub mod fake_server {
Expand Down Expand Up @@ -450,9 +385,6 @@ mod tests {

#[async_trait::async_trait]
impl BlockCommitterApi for MockBlockCommitterApi {
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
Ok(self.value.clone())
}
async fn get_costs_by_l2_block_number(
&self,
l2_block_number: u32,
Expand Down Expand Up @@ -482,22 +414,6 @@ mod tests {
}
}

#[tokio::test]
async fn request_da_block_cost__when_last_value_is_none__then_get_latest_costs_is_called(
) {
// given
let da_block_costs = test_da_block_costs();
let expected = vec![(&da_block_costs).into()];
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs));
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api);

// when
let actual = block_committer.request_da_block_costs(&None).await.unwrap();

// then
assert_eq!(actual, expected);
}

#[tokio::test]
async fn request_da_block_cost__when_last_value_is_some__then_get_costs_by_l2_block_number_is_called(
) {
Expand All @@ -510,7 +426,7 @@ mod tests {

// when
let actual = block_committer
.request_da_block_costs(&Some(latest_height))
.request_da_block_costs(&latest_height)
.await
.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl DummyDaBlockCosts {
impl DaBlockCostsSource for DummyDaBlockCosts {
async fn request_da_block_costs(
&mut self,
_latest_recorded_height: &Option<BlockHeight>,
_latest_recorded_height: &BlockHeight,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
match &self.value {
Ok(da_block_costs) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,15 @@ pub struct DaSourceService<Source> {
poll_interval: Interval,
source: Source,
shared_state: SharedState,
// This is the latest L2 height that is shared between this service
// and the block importer
// This is done for filtering out da block costs which reference
// a height greater than the latest L2 height
// This is a situation that occurs during syncing of the node
latest_l2_height: Arc<Mutex<BlockHeight>>,
recorded_height: Option<BlockHeight>,
// This is the last recorded height of the da block costs
// This is used to fetch the da block costs from the source
recorded_height: BlockHeight,
}

pub(crate) const DA_BLOCK_COSTS_CHANNEL_SIZE: usize = 16 * 1024;
Expand All @@ -58,7 +65,7 @@ where
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
recorded_height: Option<BlockHeight>,
recorded_height: BlockHeight,
) -> Self {
let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
#[allow(clippy::arithmetic_side_effects)]
Expand All @@ -78,7 +85,7 @@ where
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
recorded_height: Option<BlockHeight>,
recorded_height: BlockHeight,
sender: Sender<DaBlockCosts>,
) -> Self {
Self {
Expand Down Expand Up @@ -109,12 +116,8 @@ where
tracing::debug!("Sending block costs: {:?}", da_block_costs);
let end = BlockHeight::from(*da_block_costs.l2_blocks.end());
self.shared_state.0.send(da_block_costs)?;
if let Some(recorded_height) = self.recorded_height {
if end > recorded_height {
self.recorded_height = Some(end)
}
} else {
self.recorded_height = Some(end)
if end > self.recorded_height {
self.recorded_height = end
}
}
Ok(())
Expand All @@ -136,7 +139,7 @@ where
}

#[cfg(test)]
pub fn recorded_height(&self) -> Option<BlockHeight> {
pub fn recorded_height(&self) -> BlockHeight {
self.recorded_height
}
}
Expand All @@ -147,7 +150,7 @@ where
pub trait DaBlockCostsSource: Send + Sync {
async fn request_da_block_costs(
&mut self,
recorded_height: &Option<BlockHeight>,
recorded_height: &BlockHeight,
) -> Result<Vec<DaBlockCosts>>;
}

Expand Down Expand Up @@ -211,11 +214,12 @@ pub fn new_da_service<S: DaBlockCostsSource>(
da_source: S,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
recorded_height: BlockHeight,
) -> ServiceRunner<DaSourceService<S>> {
ServiceRunner::new(DaSourceService::new(
da_source,
poll_interval,
latest_l2_height,
None,
recorded_height,
))
}
Loading
Loading