Skip to content

Commit

Permalink
chore(gas_price_service_v1): strictly ensure last_recorded_height is …
Browse files Browse the repository at this point in the history
…set, to avoid initial poll of da source (#2523)

> [!WARNING]
> Needs to be merged into master after #2469 is merged

## Linked Issues/PRs
<!-- List of related issues/PRs -->
- #2469 

## Description
<!-- List of detailed changes -->
When starting a node, we request the da source service for the latest
costs. This results in large negative profit / loss influencing the da
gas price. We want to avoid this behaviour, and therefore, we ensure
that the last_recorded_height is set in the block committer.

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?

---------

Co-authored-by: Rafał Chabowski <[email protected]>
Co-authored-by: Mitchell Turner <[email protected]>
  • Loading branch information
3 people authored Jan 14, 2025
1 parent 40b28d7 commit 4ac8593
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 154 deletions.
27 changes: 17 additions & 10 deletions crates/services/gas_price_service/src/v1/da_source_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone());
let latest_l2_height = Arc::new(Mutex::new(BlockHeight::new(10u32)));
let recorded_height = BlockHeight::new(0);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand All @@ -84,10 +86,12 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone());
let latest_l2_height = latest_l2_height(0);
let recorded_height = BlockHeight::new(0);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand Down Expand Up @@ -117,10 +121,12 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let recorded_height = BlockHeight::new(0);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand Down Expand Up @@ -148,11 +154,12 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let recorded_height = BlockHeight::new(0);
let mut service = DaSourceService::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
recorded_height,
);
let mut watcher = StateWatcher::started();

Expand All @@ -161,31 +168,32 @@ mod tests {

// then
let recorded_height = service.recorded_height();
assert!(recorded_height.is_none())
assert_eq!(*recorded_height, 0);
}

#[tokio::test]
async fn run__recorded_height_updated_by_da_costs() {
// given
let l2_height = 10;
let recorded_height = 9;
let unexpected_costs = DaBlockCosts {
let l2_block_range_start = 2;
let expected_recorded_height = 9;
let costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 2..=recorded_height,
l2_blocks: l2_block_range_start..=expected_recorded_height,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
DummyDaBlockCosts::new(Ok(costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let (sender, mut receiver) =
tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
let mut service = DaSourceService::new_with_sender(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
(l2_block_range_start - 1).into(), /* we want to start polling from before the l2 block range */
sender,
);
let mut watcher = StateWatcher::started();
Expand All @@ -194,8 +202,7 @@ mod tests {
let next = service.run(&mut watcher).await;

// then
let actual = service.recorded_height().unwrap();
let expected = BlockHeight::from(recorded_height);
assert_eq!(expected, actual);
let actual = service.recorded_height();
assert_eq!(expected_recorded_height, *actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use std::ops::Deref;

#[async_trait::async_trait]
pub trait BlockCommitterApi: Send + Sync {
/// Used on first run to get the latest costs and seqno
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>>;
/// Used to get the costs for a specific seqno
async fn get_costs_by_l2_block_number(
&self,
Expand Down Expand Up @@ -90,17 +88,17 @@ where
{
async fn request_da_block_costs(
&mut self,
last_recorded_height: &Option<BlockHeight>,
last_recorded_height: &BlockHeight,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
let raw_da_block_costs: Vec<_> = match last_recorded_height.and_then(|x| x.succ())
{
Some(ref next_height) => {
self.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?
}
None => self.client.get_latest_costs().await?.into_iter().collect(),
};
let next_height = last_recorded_height.succ().ok_or(anyhow!(
"Failed to increment the last recorded height: {:?}",
last_recorded_height
))?;

let raw_da_block_costs: Vec<_> = self
.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?;

let da_block_costs: Vec<_> =
raw_da_block_costs.iter().map(DaBlockCosts::from).collect();
Expand Down Expand Up @@ -141,19 +139,6 @@ impl BlockCommitterApi for BlockCommitterHttpApi {
Ok(vec![])
}
}

async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
// Latest: http://localhost:8080/v1/costs?variant=latest&limit=5
if let Some(url) = &self.url {
let formatted_url = format!("{url}/v1/costs?variant=latest&limit=1");
let response = self.client.get(formatted_url).send().await?;
let raw_da_block_costs = response.json::<Vec<RawDaBlockCosts>>().await?;
// only take the first element, since we are only looking for the most recent
Ok(raw_da_block_costs.first().cloned())
} else {
Ok(None)
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -269,56 +254,6 @@ mod test_block_committer_http_api {
// then
assert_eq!(actual, expected);
}

#[test]
fn get_latest_costs__when_url_is_none__then_returns_none() {
let rt = tokio::runtime::Runtime::new().unwrap();

// given
let block_committer = BlockCommitterHttpApi::new(None);

// when
let actual =
rt.block_on(async { block_committer.get_latest_costs().await.unwrap() });

// then
assert_eq!(actual, None);
}

#[test]
fn get_latest_costs__when_url_is_some__then_returns_expected_costs() {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut mock = FakeServer::new();
let url = mock.url();

// given
let block_committer = BlockCommitterHttpApi::new(Some(url));
let not_expected = RawDaBlockCosts {
id: 1,
start_height: 1,
end_height: 10,
da_block_height: 1u64.into(),
cost: 1,
size: 1,
};
mock.add_response(not_expected);
let expected = RawDaBlockCosts {
id: 2,
start_height: 11,
end_height: 20,
da_block_height: 2u64.into(),
cost: 2,
size: 2,
};
mock.add_response(expected.clone());

// when
let actual =
rt.block_on(async { block_committer.get_latest_costs().await.unwrap() });

// then
assert_eq!(actual, Some(expected));
}
}
#[cfg(any(test, feature = "test-helpers"))]
pub mod fake_server {
Expand Down Expand Up @@ -450,9 +385,6 @@ mod tests {

#[async_trait::async_trait]
impl BlockCommitterApi for MockBlockCommitterApi {
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
Ok(self.value.clone())
}
async fn get_costs_by_l2_block_number(
&self,
l2_block_number: u32,
Expand Down Expand Up @@ -482,22 +414,6 @@ mod tests {
}
}

#[tokio::test]
async fn request_da_block_cost__when_last_value_is_none__then_get_latest_costs_is_called(
) {
// given
let da_block_costs = test_da_block_costs();
let expected = vec![(&da_block_costs).into()];
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs));
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api);

// when
let actual = block_committer.request_da_block_costs(&None).await.unwrap();

// then
assert_eq!(actual, expected);
}

#[tokio::test]
async fn request_da_block_cost__when_last_value_is_some__then_get_costs_by_l2_block_number_is_called(
) {
Expand All @@ -510,7 +426,7 @@ mod tests {

// when
let actual = block_committer
.request_da_block_costs(&Some(latest_height))
.request_da_block_costs(&latest_height)
.await
.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl DummyDaBlockCosts {
impl DaBlockCostsSource for DummyDaBlockCosts {
async fn request_da_block_costs(
&mut self,
_latest_recorded_height: &Option<BlockHeight>,
_latest_recorded_height: &BlockHeight,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
match &self.value {
Ok(da_block_costs) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,15 @@ pub struct DaSourceService<Source> {
poll_interval: Interval,
source: Source,
shared_state: SharedState,
// This is the latest L2 height that is shared between this service
// and the block importer
// This is done for filtering out da block costs which reference
// a height greater than the latest L2 height
// This is a situation that occurs during syncing of the node
latest_l2_height: Arc<Mutex<BlockHeight>>,
recorded_height: Option<BlockHeight>,
// This is the last recorded height of the da block costs
// This is used to fetch the da block costs from the source
recorded_height: BlockHeight,
}

pub(crate) const DA_BLOCK_COSTS_CHANNEL_SIZE: usize = 16 * 1024;
Expand All @@ -58,7 +65,7 @@ where
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
recorded_height: Option<BlockHeight>,
recorded_height: BlockHeight,
) -> Self {
let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
#[allow(clippy::arithmetic_side_effects)]
Expand All @@ -78,7 +85,7 @@ where
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
recorded_height: Option<BlockHeight>,
recorded_height: BlockHeight,
sender: Sender<DaBlockCosts>,
) -> Self {
Self {
Expand Down Expand Up @@ -109,12 +116,8 @@ where
tracing::debug!("Sending block costs: {:?}", da_block_costs);
let end = BlockHeight::from(*da_block_costs.l2_blocks.end());
self.shared_state.0.send(da_block_costs)?;
if let Some(recorded_height) = self.recorded_height {
if end > recorded_height {
self.recorded_height = Some(end)
}
} else {
self.recorded_height = Some(end)
if end > self.recorded_height {
self.recorded_height = end
}
}
Ok(())
Expand All @@ -136,7 +139,7 @@ where
}

#[cfg(test)]
pub fn recorded_height(&self) -> Option<BlockHeight> {
pub fn recorded_height(&self) -> BlockHeight {
self.recorded_height
}
}
Expand All @@ -147,7 +150,7 @@ where
pub trait DaBlockCostsSource: Send + Sync {
async fn request_da_block_costs(
&mut self,
recorded_height: &Option<BlockHeight>,
recorded_height: &BlockHeight,
) -> Result<Vec<DaBlockCosts>>;
}

Expand Down Expand Up @@ -211,11 +214,12 @@ pub fn new_da_service<S: DaBlockCostsSource>(
da_source: S,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<BlockHeight>>,
recorded_height: BlockHeight,
) -> ServiceRunner<DaSourceService<S>> {
ServiceRunner::new(DaSourceService::new(
da_source,
poll_interval,
latest_l2_height,
None,
recorded_height,
))
}
Loading

0 comments on commit 4ac8593

Please sign in to comment.