Skip to content

Commit

Permalink
RPC Consistency Proposal (#2473)
Browse files Browse the repository at this point in the history
## Linked Issues/PRs
<!-- List of related issues/PRs -->


Related issue:  #1897 
Closes #2605

See
https://www.notion.so/fuellabs/RPC-Consistency-Proposal-V2-13b2f2293f31809bbce0d93a4c28d633
for the proposal specs.

This PR implements a variation of the proposal in the link, due to
techincal limitations of the HTTP protocol when using graphql
subscriptions.

for context: Headers cannot be used, the reason being that subscriptions
will return multiple graphql responses in a single HTTP response (and
therefore have a single `HeaderMap`), while we require that each item in
the response has its associated `CURRENT_FUEL_BLOCK_HEIGHT` field.
Moreover the Response metadata for subscriptions is returned before the
actual response body (as it is standard with HTTP).

in the end graphql has a (very undocumented) way to add an `extensions`
field to responses, at the same level of `data/errors` field. This does
not require to tweak anything at the HTTP level.

We will implement an improvement where the server node will wait for a
configurable number of blocks before returning a `PRECONDITION FAILED`
response, in a follow-up PR.



Entrypoint for reviews: 
-
https://github.com/FuelLabs/fuel-core/pull/2473/files#diff-9caf873d3167ab9485e7882d621b5147e98b2478cc6462095b88374e2f914f3aR428
for the graphql handler
-
https://github.com/FuelLabs/fuel-core/pull/2473/files#diff-9caf873d3167ab9485e7882d621b5147e98b2478cc6462095b88374e2f914f3aR428
for the extension definition




## Manual testing 

## TODO: 

- [x] Tests 

## Description
<!-- List of detailed changes -->

## Checklist
- [ ] ~Breaking changes are clearly marked as such in the PR description
and changelog~
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [ ] ~I have created follow-up issues caused by this PR and linked them
here~

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?

---------

Co-authored-by: Mårten Blankfors <[email protected]>
Co-authored-by: Green Baneling <[email protected]>
Co-authored-by: Rafał Chabowski <[email protected]>
  • Loading branch information
4 people authored Feb 12, 2025
1 parent 0c87663 commit f73a12e
Show file tree
Hide file tree
Showing 14 changed files with 782 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Changed

- [2473](https://github.com/FuelLabs/fuel-core/pull/2473): Graphql requests and responses make use of a new `extensions` object to specify request/response metadata. A request `extensions` object can contain an integer-valued `required_fuel_block_height` field. When specified, the request will return an error unless the node's current fuel block height is at least the value specified in the `required_fuel_block_height` field. All graphql responses now contain an integer-valued `current_fuel_block_height` field in the `extensions` object, which contains the block height of the last block processed by the node.
- [2653](https://github.com/FuelLabs/fuel-core/pull/2653): Added cleaner error for wasm-executor upon failed deserialization.
- [2705](https://github.com/FuelLabs/fuel-core/pull/2705): Update the default value for `--max-block-size` and `--max-transmit-size` to 50 MB

Expand Down
218 changes: 180 additions & 38 deletions crates/client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,40 @@
#[cfg(feature = "subscriptions")]
use crate::client::types::StatusWithTransaction;
use crate::client::{
schema::{
block::BlockByHeightArgs,
coins::{
ExcludeInput,
SpendQueryElementInput,
use crate::{
client::{
schema::{
block::BlockByHeightArgs,
coins::{
ExcludeInput,
SpendQueryElementInput,
},
contract::ContractBalanceQueryArgs,
gas_price::EstimateGasPrice,
message::MessageStatusArgs,
relayed_tx::RelayedTransactionStatusArgs,
tx::DryRunArg,
Tai64Timestamp,
TransactionId,
},
contract::ContractBalanceQueryArgs,
gas_price::EstimateGasPrice,
message::MessageStatusArgs,
relayed_tx::RelayedTransactionStatusArgs,
tx::DryRunArg,
Tai64Timestamp,
TransactionId,
},
types::{
asset::AssetDetail,
gas_price::LatestGasPrice,
message::MessageStatus,
primitives::{
Address,
AssetId,
BlockId,
ContractId,
UtxoId,
types::{
asset::AssetDetail,
gas_price::LatestGasPrice,
message::MessageStatus,
primitives::{
Address,
AssetId,
BlockId,
ContractId,
UtxoId,
},
upgrades::StateTransitionBytecode,
RelayedTransactionStatus,
},
upgrades::StateTransitionBytecode,
RelayedTransactionStatus,
},
reqwest_ext::{
FuelGraphQlResponse,
FuelOperation,
ReqwestExt,
},
};
use anyhow::Context;
Expand All @@ -39,8 +46,6 @@ use base64::prelude::{
#[cfg(feature = "subscriptions")]
use cynic::StreamingOperation;
use cynic::{
http::ReqwestExt,
GraphQlResponse,
Id,
MutationBuilder,
Operation,
Expand Down Expand Up @@ -129,6 +134,10 @@ use std::{
self,
FromStr,
},
sync::{
Arc,
Mutex,
},
};
use tai64::Tai64;
use tracing as _;
Expand All @@ -151,12 +160,56 @@ pub mod types;

type RegisterId = u32;

#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
/// Error occurring during interaction with the FuelClient
// anyhow::Error is wrapped inside a custom Error type,
// so that we can specific error variants in the future.
pub enum Error {
/// Unknown or not expected(by architecture) error.
#[from]
Other(anyhow::Error),
}

/// Consistency policy for the [`FuelClient`] to define the strategy
/// for the required height feature.
#[derive(Debug)]
pub enum ConsistencyPolicy {
/// Automatically fetch the next block height from the response and
/// use it as an input to the next query to guarantee consistency
/// of the results for the queries.
Auto {
/// The required block height for the queries.
height: Arc<Mutex<Option<BlockHeight>>>,
},
/// Use manually sets the block height for all queries
/// via the [`FuelClient::with_required_fuel_block_height`].
Manual {
/// The required block height for the queries.
height: Option<BlockHeight>,
},
}

impl Clone for ConsistencyPolicy {
fn clone(&self) -> Self {
match self {
Self::Auto { height } => Self::Auto {
// We don't want to share the same mutex between the different
// instances of the `FuelClient`.
height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
},
Self::Manual { height } => Self::Manual { height: *height },
}
}
}

#[derive(Debug, Clone)]
pub struct FuelClient {
client: reqwest::Client,
#[cfg(feature = "subscriptions")]
cookie: std::sync::Arc<reqwest::cookie::Jar>,
url: reqwest::Url,
require_height: ConsistencyPolicy,
}

impl FromStr for FuelClient {
Expand Down Expand Up @@ -184,13 +237,22 @@ impl FromStr for FuelClient {
client,
cookie,
url,
require_height: ConsistencyPolicy::Auto {
height: Arc::new(Mutex::new(None)),
},
})
}

#[cfg(not(feature = "subscriptions"))]
{
let client = reqwest::Client::new();
Ok(Self { client, url })
Ok(Self {
client,
url,
require_height: ConsistencyPolicy::Auto {
height: Arc::new(Mutex::new(None)),
},
})
}
}
}
Expand Down Expand Up @@ -223,6 +285,36 @@ impl FuelClient {
Self::from_str(url.as_ref())
}

pub fn with_required_fuel_block_height(
&mut self,
new_height: Option<BlockHeight>,
) -> &mut Self {
match &mut self.require_height {
ConsistencyPolicy::Auto { height } => {
*height.lock().expect("Mutex poisoned") = new_height;
}
ConsistencyPolicy::Manual { height } => {
*height = new_height;
}
}
self
}

pub fn use_manual_consistency_policy(
&mut self,
height: Option<BlockHeight>,
) -> &mut Self {
self.require_height = ConsistencyPolicy::Manual { height };
self
}

pub fn required_block_height(&self) -> Option<BlockHeight> {
match &self.require_height {
ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
ConsistencyPolicy::Manual { height } => *height,
}
}

/// Send the GraphQL query to the client.
pub async fn query<ResponseData, Vars>(
&self,
Expand All @@ -232,20 +324,59 @@ impl FuelClient {
Vars: serde::Serialize,
ResponseData: serde::de::DeserializeOwned + 'static,
{
let required_fuel_block_height = self.required_block_height();
let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
let response = self
.client
.post(self.url.clone())
.run_graphql(q)
.run_fuel_graphql(fuel_operation)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Self::decode_response(response)
let inner_required_height = match &self.require_height {
ConsistencyPolicy::Auto { height } => Some(height.clone()),
_ => None,
};

Self::decode_response(response, inner_required_height)
}

fn decode_response<R>(response: GraphQlResponse<R>) -> io::Result<R>
fn decode_response<R, E>(
response: FuelGraphQlResponse<R, E>,
inner_required_height: Option<Arc<Mutex<Option<BlockHeight>>>>,
) -> io::Result<R>
where
R: serde::de::DeserializeOwned + 'static,
{
if let Some(inner_required_height) = inner_required_height {
if let Some(current_fuel_block_height) = response
.extensions
.as_ref()
.and_then(|e| e.current_fuel_block_height)
{
let mut lock = inner_required_height.lock().expect("Mutex poisoned");

if current_fuel_block_height >= lock.unwrap_or_default() {
*lock = Some(current_fuel_block_height);
}
}
}

if let Some(failed) = response
.extensions
.as_ref()
.and_then(|e| e.fuel_block_height_precondition_failed)
{
if failed {
return Err(io::Error::new(
io::ErrorKind::Other,
"The required block height was not met",
));
}
}

let response = response.response;

match (response.data, response.errors) {
(Some(d), _) => Ok(d),
(_, Some(e)) => Err(from_strings_errors_to_std_error(
Expand All @@ -271,7 +402,11 @@ impl FuelClient {
use reqwest::cookie::CookieStore;
let mut url = self.url.clone();
url.set_path("/v1/graphql-sub");
let json_query = serde_json::to_string(&q)?;

let required_fuel_block_height = self.required_block_height();
let fuel_operation = FuelOperation::new(q, required_fuel_block_height);

let json_query = serde_json::to_string(&fuel_operation)?;
let mut client_builder = es::ClientBuilder::for_url(url.as_str())
.map_err(|e| {
io::Error::new(
Expand Down Expand Up @@ -329,18 +464,25 @@ impl FuelClient {

let mut last = None;

let inner_required_height = match &self.require_height {
ConsistencyPolicy::Auto { height } => Some(height.clone()),
_ => None,
};

let stream = es::Client::stream(&client)
.take_while(|result| {
.zip(futures::stream::repeat(inner_required_height))
.take_while(|(result, _)| {
futures::future::ready(!matches!(result, Err(es::Error::Eof)))
})
.filter_map(move |result| {
.filter_map(move |(result, inner_required_height)| {
tracing::debug!("Got result: {result:?}");
let r = match result {
Ok(es::SSE::Event(es::Event { data, .. })) => {
match serde_json::from_str::<GraphQlResponse<ResponseData>>(&data)
{
match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
&data,
) {
Ok(resp) => {
match Self::decode_response(resp) {
match Self::decode_response(resp, inner_required_height) {
Ok(resp) => {
match last.replace(data) {
// Remove duplicates
Expand Down
1 change: 1 addition & 0 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![deny(unused_crate_dependencies)]
#![deny(warnings)]
pub mod client;
pub mod reqwest_ext;
pub mod schema;

/// The GraphQL schema used by the library.
Expand Down
Loading

0 comments on commit f73a12e

Please sign in to comment.