Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support optimistic concurrency control headers #3462

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/fuel-core/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.40.4
git:master
16 changes: 12 additions & 4 deletions packages/account/src/providers/fuel-graphql-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,29 @@ type FuelGraphQLSubscriberOptions = {
query: DocumentNode;
variables?: Record<string, unknown>;
fetchFn: typeof fetch;
operationName?: string;
};

type Events = Array<{
data: Record<string, unknown>;
extensions: Record<string, unknown>;
errors?: { message: string }[];
}>;

export class FuelGraphqlSubscriber implements AsyncIterator<unknown> {
public static incompatibleNodeVersionMessage: string | false = false;
private static textDecoder = new TextDecoder();

private constructor(private stream: ReadableStreamDefaultReader<Uint8Array>) {}

public static async create(options: FuelGraphQLSubscriberOptions) {
const { url, query, variables, fetchFn } = options;
const { url, query, variables, fetchFn, operationName } = options;
const response = await fetchFn(`${url}-sub`, {
method: 'POST',
body: JSON.stringify({
query: print(query),
variables,
operationName,
}),
headers: {
'Content-Type': 'application/json',
Expand Down Expand Up @@ -67,17 +75,17 @@ export class FuelGraphqlSubscriber implements AsyncIterator<unknown> {
}
}

private events: Array<{ data: unknown; errors?: { message: string }[] }> = [];
private events: Events = [];
private parsingLeftover = '';

async next(): Promise<IteratorResult<unknown, unknown>> {
// eslint-disable-next-line no-constant-condition
while (true) {
if (this.events.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const { data, errors } = this.events.shift()!;
const { data, extensions, errors } = this.events.shift()!;
assertGqlResponseHasNoErrors(errors, FuelGraphqlSubscriber.incompatibleNodeVersionMessage);
return { value: data, done: false };
return { value: { ...data, extensions }, done: false };
}
const { value, done } = await this.stream.read();
if (done) {
Expand Down
39 changes: 33 additions & 6 deletions packages/account/src/providers/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ import {
} from './utils';
import type { RetryOptions } from './utils/auto-retry-fetch';
import { autoRetryFetch } from './utils/auto-retry-fetch';
import {
extractBlockHeight,
extractOperationDefinition,
isBlockSensitiveOperation,
} from './utils/graphql-helpers';
import { assertGqlResponseHasNoErrors } from './utils/handle-gql-error-message';
import { validatePaginationArgs } from './utils/validate-pagination-args';

Expand Down Expand Up @@ -410,6 +415,8 @@ type NodeInfoCache = Record<string, NodeInfo>;

type Operations = ReturnType<typeof getOperationsSdk>;

export type OperationsNames = keyof ReturnType<typeof getOperationsSdk>;

type SdkOperations = Omit<Operations, 'statusChange' | 'submitAndAwaitStatus'> & {
statusChange: (
...args: Parameters<Operations['statusChange']>
Expand All @@ -436,6 +443,8 @@ export default class Provider {
/** @hidden */
public url: string;
/** @hidden */
private currentBlockHeight: number = 0;
/** @hidden */
private urlWithoutAuth: string;
/** @hidden */
private static chainInfoCache: ChainInfoCache = {};
Expand All @@ -458,7 +467,7 @@ export default class Provider {
/**
* @hidden
*/
private static getFetchFn(options: ProviderOptions): NonNullable<ProviderOptions['fetch']> {
private getFetchFn(options: ProviderOptions): NonNullable<ProviderOptions['fetch']> {
const { retryOptions, timeout, headers } = options;

return autoRetryFetch(async (...args) => {
Expand All @@ -472,6 +481,12 @@ export default class Provider {
headers: { ...request?.headers, ...headers },
};

const requestBody = JSON.parse(fullRequest.body as string);
if (isBlockSensitiveOperation(requestBody.operationName)) {
requestBody.extensions = { required_fuel_block_height: this.currentBlockHeight };
fullRequest.body = JSON.stringify(requestBody);
}

if (options.requestMiddleware) {
fullRequest = await options.requestMiddleware(fullRequest);
}
Expand Down Expand Up @@ -542,6 +557,17 @@ export default class Provider {
};
}

getCurrentBlockHeight(): number {
return this.currentBlockHeight;
}

setCurrentBlockHeight(param: { extensions: unknown }) {
const blockHeight = extractBlockHeight(param);
if (blockHeight && blockHeight > this.currentBlockHeight) {
this.currentBlockHeight = blockHeight;
}
}

/**
* Initialize Provider async stuff
*/
Expand Down Expand Up @@ -679,11 +705,12 @@ export default class Provider {
* @hidden
*/
private createOperations(): SdkOperations {
const fetchFn = Provider.getFetchFn(this.options);
const fetchFn = this.getFetchFn(this.options);
const gqlClient = new GraphQLClient(this.urlWithoutAuth, {
fetch: (input: RequestInfo | URL, requestInit?: RequestInit) =>
fetchFn(input.toString(), requestInit || {}, this.options),
responseMiddleware: (response: GraphQLClientResponse<unknown> | Error) => {
this.setCurrentBlockHeight(response as { extensions: unknown });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Torres-ssf Because the node waiting time threshold is dynamic and can be changed or disabled at any time, we must validate the response and implement the whole timing/check/retry independently.

if ('response' in response) {
const graphQlResponse = response.response as GraphQLResponse;
assertGqlResponseHasNoErrors(
Expand All @@ -695,17 +722,17 @@ export default class Provider {
});

const executeQuery = (query: DocumentNode, vars: Record<string, unknown>) => {
const opDefinition = query.definitions.find((x) => x.kind === 'OperationDefinition') as {
operation: string;
};
const isSubscription = opDefinition?.operation === 'subscription';
const operationDefinition = extractOperationDefinition(query);

const isSubscription = operationDefinition.operation === 'subscription';

if (isSubscription) {
return FuelGraphqlSubscriber.create({
url: this.urlWithoutAuth,
query,
fetchFn: (url, requestInit) => fetchFn(url as string, requestInit, this.options),
variables: vars,
operationName: operationDefinition.name?.value,
});
}
return gqlClient.request(query, vars);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ export class TransactionResponse {
);
}
if (statusChange.type !== 'SubmittedStatus') {
// TODO: Fix type here
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.provider.setCurrentBlockHeight(sub as any);
break;
}
}
Expand Down
21 changes: 21 additions & 0 deletions packages/account/src/providers/utils/graphql-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import type { DocumentNode, OperationDefinitionNode } from 'graphql';

import type { OperationsNames } from '../provider';

const BLOCK_SENSITIVE_OPERATIONS: OperationsNames[] = [
'submit',
'statusChange',
'getCoinsToSpend',
'submitAndAwaitStatus',
];

export const isBlockSensitiveOperation = (operationName: OperationsNames): boolean =>
BLOCK_SENSITIVE_OPERATIONS.includes(operationName);

export const extractOperationDefinition = (operation: DocumentNode) =>
operation.definitions.find((x) => x.kind === 'OperationDefinition') as OperationDefinitionNode;

export const extractBlockHeight = (response: { extensions?: unknown }): number | undefined => {
const { extensions } = response;
return (extensions as { current_fuel_block_height: number })?.current_fuel_block_height;
};
2 changes: 1 addition & 1 deletion packages/versions/src/lib/getBuiltinVersions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Versions } from './types';
export function getBuiltinVersions(): Versions {
return {
FORC: '0.66.7',
FUEL_CORE: '0.40.4',
FUEL_CORE: 'git:1897-rpc-consistency-proposal',
FUELS: '0.99.0',
};
}
Loading