diff --git a/packages/sources/coinbase-prime/src/config/index.ts b/packages/sources/coinbase-prime/src/config/index.ts index cd529b8397..057e1b99b6 100644 --- a/packages/sources/coinbase-prime/src/config/index.ts +++ b/packages/sources/coinbase-prime/src/config/index.ts @@ -24,6 +24,11 @@ export const config = new AdapterConfig({ required: true, sensitive: true, }, + DELAYED_RESPONSE_MS: { + description: 'The amount of time to delay the new response in milliseconds', + type: 'number', + default: 180_000, + }, BACKGROUND_EXECUTE_MS: { description: 'The amount of time the background execute should sleep before performing the next request', diff --git a/packages/sources/coinbase-prime/src/endpoint/balance2.ts b/packages/sources/coinbase-prime/src/endpoint/balance2.ts new file mode 100644 index 0000000000..6eb754e568 --- /dev/null +++ b/packages/sources/coinbase-prime/src/endpoint/balance2.ts @@ -0,0 +1,66 @@ +import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter' +import { InputParameters } from '@chainlink/external-adapter-framework/validation' +import { AdapterError } from '@chainlink/external-adapter-framework/validation/error' +import { SingleNumberResultResponse } from '@chainlink/external-adapter-framework/util' +import { config } from '../config' +import { balanceTransport } from '../transport/balance2' +import { getApiKeys } from '../transport/utils' + +export const inputParameters = new InputParameters( + { + portfolio: { + required: true, + type: 'string', + description: 'The portfolio ID to query the balance of', + }, + symbol: { + required: true, + type: 'string', + description: 'The symbol to return the balance for', + }, + type: { + type: 'string', + description: 'The balance type to return', + default: 'total', + options: ['total', 'vault', 'trading'], + }, + apiKey: { + type: 'string', + description: + 'Alternative api keys to use for this request, {$apiKey}_ACCESS_KEY {$apiKey}_PASSPHRASE {$apiKey}_SIGNING_KEY required in environment variables', + default: '', + }, + acceptDelay: { + type: 'boolean', + description: 'Delay ', + default: false, + }, + }, + [ + { + portfolio: 'abcd1234-123a-1234-ab12-12a34bcd56e7', + symbol: 'BTC', + type: 'total', + apiKey: '', + acceptDelay: false, + }, + ], +) + +export type BaseEndpointTypes = { + Parameters: typeof inputParameters.definition + Response: SingleNumberResultResponse + Settings: typeof config.settings +} + +export const endpoint = new AdapterEndpoint({ + name: 'balance2', + transport: balanceTransport, + inputParameters, + customInputValidation: (request, settings): AdapterError | undefined => { + if (request.requestContext.data.apiKey) { + getApiKeys(request.requestContext.data.apiKey, settings) + } + return + }, +}) diff --git a/packages/sources/coinbase-prime/src/endpoint/index.ts b/packages/sources/coinbase-prime/src/endpoint/index.ts index 681acabb92..18b6846719 100644 --- a/packages/sources/coinbase-prime/src/endpoint/index.ts +++ b/packages/sources/coinbase-prime/src/endpoint/index.ts @@ -1,2 +1,3 @@ export { endpoint as balance } from './balance' +export { endpoint as balance2 } from './balance2' export { endpoint as wallet } from './wallet' diff --git a/packages/sources/coinbase-prime/src/index.ts b/packages/sources/coinbase-prime/src/index.ts index 403f9854dc..abc79e506e 100644 --- a/packages/sources/coinbase-prime/src/index.ts +++ b/packages/sources/coinbase-prime/src/index.ts @@ -1,13 +1,13 @@ import { expose, ServerInstance } from '@chainlink/external-adapter-framework' import { config } from './config' -import { balance, wallet } from './endpoint' +import { balance, wallet, balance2 } from './endpoint' import { PoRAdapter } from '@chainlink/external-adapter-framework/adapter/por' export const adapter = new PoRAdapter({ defaultEndpoint: balance.name, name: 'COINBASE_PRIME', config, - endpoints: [balance, wallet], + endpoints: [balance, wallet, balance2], rateLimiting: { tiers: { default: { diff --git a/packages/sources/coinbase-prime/src/transport/balance2.ts b/packages/sources/coinbase-prime/src/transport/balance2.ts new file mode 100644 index 0000000000..5595b2feaa --- /dev/null +++ b/packages/sources/coinbase-prime/src/transport/balance2.ts @@ -0,0 +1,256 @@ +import { TransportDependencies } from '@chainlink/external-adapter-framework/transports' +import { BaseEndpointTypes, inputParameters } from '../endpoint/balance2' +import { sign, getApiKeys, errorResponse } from './utils' +import { + calculateCacheKey, + calculateHttpRequestKey, +} from '@chainlink/external-adapter-framework/cache' +import { EndpointContext } from '@chainlink/external-adapter-framework/adapter' +import { sleep, AdapterResponse, makeLogger } from '@chainlink/external-adapter-framework/util' +import { Requester } from '@chainlink/external-adapter-framework/util/requester' +import { SubscriptionTransport } from '@chainlink/external-adapter-framework/transports/abstract/subscription' + +const logger = makeLogger('Balance2Transport') + +export interface ResponseSchema { + balances: { + symbol: string + amount: string + holds: string + bonded_amount: string + reserved_amount: string + unbonding_amount: string + unvested_amount: string + pending_rewards_amount: string + past_rewards_amount: string + bondable_amount: string + withdrawable_amount: string + fiat_amount: string + }[] + type: string + trading_balances: { + total: string // Returns total in fiat amount + holds: string + } + vault_balances: { + total: string // Returns total in fiat amount + holds: string + } +} + +export type BalanceTransportTypes = BaseEndpointTypes & { + Provider: { + RequestBody: never + ResponseBody: ResponseSchema + } +} + +type RequestParams = typeof inputParameters.validated + +// revisit if we have >100 separate portfolios using this EA +type BlipCacheValue = { + result: number + timestamp: number +} +const blipCache = new Map() + +export class BalanceTransport extends SubscriptionTransport { + settings!: BalanceTransportTypes['Settings'] + requester!: Requester + endpointName!: string + + async initialize( + dependencies: TransportDependencies, + adapterSettings: BalanceTransportTypes['Settings'], + endpointName: string, + transportName: string, + ): Promise { + await super.initialize(dependencies, adapterSettings, endpointName, transportName) + this.settings = adapterSettings + this.requester = dependencies.requester + this.endpointName = endpointName + } + + async backgroundHandler( + context: EndpointContext, + entries: RequestParams[], + ) { + await Promise.all(entries.map(async (param) => this.handleRequest(param))) + await sleep(context.adapterSettings.BACKGROUND_EXECUTE_MS) + } + + async handleRequest(param: RequestParams) { + let response: AdapterResponse + try { + response = await this._handleRequest(param) + } catch (e) { + const errorMessage = e instanceof Error ? e.message : 'Unknown error occurred' + logger.error(e, errorMessage) + response = { + statusCode: 502, + errorMessage, + timestamps: { + providerDataRequestedUnixMs: 0, + providerDataReceivedUnixMs: 0, + providerIndicatedTimeUnixMs: undefined, + }, + } + } + await this.responseCache.write(this.name, [{ params: param, response }]) + } + + async _handleRequest( + param: RequestParams, + ): Promise> { + const { portfolio, symbol, type, apiKey, acceptDelay } = param + const providerDataRequestedUnixMs = Date.now() + + const response = await this.sendBalanceRequest(portfolio, symbol, type, apiKey) + if (!response) { + return errorResponse( + `The data provider did not return data for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`, + providerDataRequestedUnixMs, + ) + } + + if (!response.balances) { + return errorResponse( + `The data provider response does not contain a balances list for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`, + providerDataRequestedUnixMs, + ) + } + + // The adapter only supports querying one asset at a time so the balances list should only contain 1 element + if (response.balances.length !== 1) { + return errorResponse( + `The data provider response does not contain exactly one element in the balances list for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`, + providerDataRequestedUnixMs, + ) + } + + const result = Number(response.balances[0].amount) + if (isNaN(result)) { + return errorResponse( + `The data provider returned non-numeric balance: ${response.balances[0].amount}`, + providerDataRequestedUnixMs, + ) + } + + const generateResponseBody = (r: number = result) => { + return { + result: r, + data: { + result: r, + }, + statusCode: 200, + timestamps: { + providerDataRequestedUnixMs, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + } + } + + // If acceptDelay is false, return the new result right away + if (!acceptDelay) { + return generateResponseBody() + } + + const cacheKey = calculateCacheKey({ + transportName: this.name, + data: param, + adapterName: this.responseCache.adapterName, + endpointName: this.responseCache.endpointName, + adapterSettings: this.responseCache.adapterSettings, + }) + + // If `result` doesn't match already cached response, don't update the response cache right away. + // We want to delay returning the new value by time = DELAYED_RESPONSE_MS by caching this value + // in a separate map for DELAYED_RESPONSE_MS. + const cachedResponse = await this.responseCache.cache.get(cacheKey) + if (!cachedResponse?.result || result === cachedResponse.result) { + // If no cached result or the new result is the same as the cached result, + // return the new result, which writes to or refreshes the response cache TTL + // Clear the blipCache to avoid edge case where the value goes from x to y, then back to x, then back to y + // which would maintain a value in the cache, registering the second y as having passed the cache threshold immediately + logger.trace(`Preventatively deleting blipCache for ${cacheKey}`) + blipCache.delete(cacheKey) + return generateResponseBody() + } + + const blipCacheValue = blipCache.get(cacheKey) + + // If the result is the same as the temporarily cached value in blipCache, we want to check if + // the value in blipCache has been cached long enough to be considered "good" + if (result === blipCacheValue?.result) { + const isBlipCacheStale = + blipCacheValue?.timestamp <= Date.now() - this.settings.DELAYED_RESPONSE_MS + if (isBlipCacheStale) { + // blipCache value has been cached long enough and seems like a good value, update the response cache + logger.debug(`Deleting blipCache for ${cacheKey}`) + blipCache.delete(cacheKey) + return generateResponseBody() + } + } else { + // blipCache value is missing or is not the same as the result, overwrite + logger.debug(`Setting blipCache for ${cacheKey} to ${result}`) + blipCache.set(cacheKey, { result, timestamp: providerDataRequestedUnixMs }) + } + + // At this point, we have a new result that is different from the cached result + // and the blipCache value is still under the DELAYED_RESPONSE_MS threshold. + // return the cached result + return generateResponseBody(cachedResponse.result) + } + + async sendBalanceRequest( + portfolio: string, + symbol: string, + type: string, + apiKey: string, + ): Promise { + const [signingKey, accessKey, passPhrase] = getApiKeys(apiKey, this.settings) + const timestamp = Math.floor(Date.now() / 1000) + const method = 'GET' + const path = `/v1/portfolios/${portfolio}/balances` + const message = `${timestamp}${method}${path}` + const signature = sign(message, signingKey) + + const requestConfig = { + baseURL: this.settings.API_ENDPOINT, + url: path, + headers: { + 'X-CB-ACCESS-KEY': accessKey, + 'X-CB-ACCESS-PASSPHRASE': passPhrase, + 'X-CB-ACCESS-SIGNATURE': signature, + 'X-CB-ACCESS-TIMESTAMP': timestamp, + 'Content-Type': 'application/json', + }, + params: { + symbols: symbol.toUpperCase(), + balance_type: `${type.toUpperCase()}_BALANCES`, + }, + } + + const res = await this.requester.request( + calculateHttpRequestKey({ + context: { + adapterSettings: this.settings, + inputParameters, + endpointName: this.endpointName, + }, + data: requestConfig.params, + transportName: this.name, + }), + requestConfig, + ) + + return res.response.data + } + + getSubscriptionTtlFromConfig(adapterSettings: BaseEndpointTypes['Settings']): number { + return adapterSettings.WARMUP_SUBSCRIPTION_TTL + } +} + +export const balanceTransport = new BalanceTransport() diff --git a/packages/sources/coinbase-prime/src/transport/utils.ts b/packages/sources/coinbase-prime/src/transport/utils.ts index 334a546323..79361585e5 100644 --- a/packages/sources/coinbase-prime/src/transport/utils.ts +++ b/packages/sources/coinbase-prime/src/transport/utils.ts @@ -29,3 +29,19 @@ export const getApiKeys = (apiKey: string, config: BaseEndpointTypes['Settings'] return [config.SIGNING_KEY, config.ACCESS_KEY, config.PASSPHRASE] } } + +export const errorResponse = ( + message: string, + providerDataRequestedUnixMs: number, + statusCode = 502, +) => { + return { + errorMessage: message, + statusCode, + timestamps: { + providerDataRequestedUnixMs, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + } +}