Skip to content

Commit a46d248

Browse files
Integrates datastream client
1 parent ea65d41 commit a46d248

File tree

6 files changed

+99
-121
lines changed

6 files changed

+99
-121
lines changed

packages/composites/glv-token/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"dependencies": {
3737
"@chainlink/external-adapter-framework": "2.7.0",
3838
"decimal.js": "^10.3.1",
39-
"ethers": "^5.4.6",
39+
"ethers": "^6.15.0",
4040
"tslib": "2.4.1"
4141
}
4242
}

packages/composites/glv-token/src/config/index.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,18 @@ export const config = new AdapterConfig(
2626
required: true,
2727
default: '0x6a9505D0B44cFA863d9281EA5B0b34cB36243b45',
2828
},
29-
TIINGO_ADAPTER_URL: {
30-
description: 'URL of Tiingo EA',
29+
DATA_ENGINE_BASE_URL: {
30+
description: 'URL of DataEngine',
3131
type: 'string',
3232
required: true,
3333
},
34-
NCFX_ADAPTER_URL: {
35-
description: 'URL of NCFX EA',
34+
DATA_ENGINE_USER_ID: {
35+
description: 'User ID of DataEngine',
3636
type: 'string',
3737
required: true,
3838
},
39-
COINMETRICS_ADAPTER_URL: {
40-
description: 'URL of Coinmetrics EA',
39+
DATA_ENGINE_USER_SECRET: {
40+
description: 'Secret key for DataEngine',
4141
type: 'string',
4242
required: true,
4343
},

packages/composites/glv-token/src/datastreams/client.ts

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,21 @@ export class DataStreamsHttpClient {
6262
quote_asset?: string
6363
feed_type?: string
6464
}): Promise<FeedsResponse> {
65-
const qs = new URLSearchParams()
66-
if (params?.base_asset) qs.set('base_asset', params.base_asset)
67-
if (params?.quote_asset) qs.set('quote_asset', params.quote_asset)
68-
if (params?.feed_type) qs.set('feed_type', params.feed_type)
69-
const path = `/api/v1/feeds${qs.toString() ? `?${qs.toString()}` : ''}`
65+
const query = new URLSearchParams()
66+
if (params?.base_asset) query.set('base_asset', params.base_asset)
67+
if (params?.quote_asset) query.set('quote_asset', params.quote_asset)
68+
if (params?.feed_type) query.set('feed_type', params.feed_type)
69+
const path = `/api/v1/feeds${query.toString() ? `?${query.toString()}` : ''}`
7070
return this.request<FeedsResponse>('GET', path)
7171
}
7272

7373
async resolveFeedId(base: string, quote: string, feedType?: string): Promise<string> {
74-
const r = await this.listFeeds({ base_asset: base, quote_asset: quote, feed_type: feedType })
75-
const ids = (r.feeds ?? [])
74+
const response = await this.listFeeds({
75+
base_asset: base,
76+
quote_asset: quote,
77+
feed_type: feedType,
78+
})
79+
const ids = (response.feeds ?? [])
7680
.map((f) => (f.feedID ?? f.feedid ?? '').toLowerCase())
7781
.filter(Boolean)
7882
if (ids.length === 0) throw new Error(`No feed found for ${base}/${quote}`)
@@ -84,14 +88,14 @@ export class DataStreamsHttpClient {
8488
feedId: string,
8589
): Promise<{ fullReportHex: string; observationsTimestamp: number; validFromTimestamp: number }> {
8690
const path = `/api/v1/reports/latest?feedID=${encodeURIComponent(feedId)}`
87-
const r = await this.request<LatestReportResponse>('GET', path)
88-
const rr = r.report
89-
const full = rr.fullReport ?? rr.fullreport
90-
if (!full) throw new Error(`Missing fullReport for ${feedId}`)
91+
const response = await this.request<LatestReportResponse>('GET', path)
92+
const rawReport = response.report
93+
const fullReport = rawReport.fullReport ?? rawReport.fullreport
94+
if (!fullReport) throw new Error(`Missing fullReport for ${feedId}`)
9195
return {
92-
fullReportHex: full,
93-
observationsTimestamp: rr.observationsTimestamp,
94-
validFromTimestamp: rr.validFromTimestamp,
96+
fullReportHex: fullReport,
97+
observationsTimestamp: rawReport.observationsTimestamp,
98+
validFromTimestamp: rawReport.validFromTimestamp,
9599
}
96100
}
97101
}

packages/composites/glv-token/src/transport/base.ts

Lines changed: 67 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
import { ethers, utils } from 'ethers'
2-
import { SubscriptionTransport } from '@chainlink/external-adapter-framework/transports/abstract/subscription'
3-
import { TransportDependencies } from '@chainlink/external-adapter-framework/transports'
1+
import { EndpointContext } from '@chainlink/external-adapter-framework/adapter'
42
import { ResponseCache } from '@chainlink/external-adapter-framework/cache/response'
5-
import { Requester } from '@chainlink/external-adapter-framework/util/requester'
6-
import {
7-
EndpointContext,
8-
LwbaResponseDataFields,
9-
} from '@chainlink/external-adapter-framework/adapter'
3+
import { TransportDependencies } from '@chainlink/external-adapter-framework/transports'
4+
import { SubscriptionTransport } from '@chainlink/external-adapter-framework/transports/abstract/subscription'
105
import { AdapterResponse, makeLogger } from '@chainlink/external-adapter-framework/util'
6+
import { Requester } from '@chainlink/external-adapter-framework/util/requester'
117
import { AdapterDataProviderError } from '@chainlink/external-adapter-framework/validation/error'
8+
import { TypeFromDefinition } from '@chainlink/external-adapter-framework/validation/input-params'
9+
import { ethers } from 'ethers'
1210
import glvAbi from '../config/glvReaderAbi.json'
13-
import { BaseEndpointTypes, inputParameters } from '../endpoint/price'
11+
import { DataStreamsHttpClient } from '../datastreams/client'
12+
import { decodeReport } from '../datastreams/decode'
1413
import { BaseEndpointTypesLwba } from '../endpoint/lwba'
14+
import { BaseEndpointTypes, inputParameters } from '../endpoint/price'
1515
import {
16-
mapParameter,
1716
mapSymbol,
1817
Market,
1918
median,
@@ -23,7 +22,6 @@ import {
2322
toFixed,
2423
Token,
2524
} from './utils'
26-
import { TypeFromDefinition } from '@chainlink/external-adapter-framework/validation/input-params'
2725

2826
const logger = makeLogger('GlvBaseTransport')
2927

@@ -54,10 +52,10 @@ export abstract class BaseGlvTransport<
5452
name!: string
5553
responseCache!: ResponseCache<T>
5654
requester!: Requester
57-
provider!: ethers.providers.JsonRpcProvider
55+
provider!: ethers.JsonRpcProvider
5856
glvReaderContract!: ethers.Contract
5957
settings!: T['Settings']
60-
58+
dataStreamsClient!: DataStreamsHttpClient
6159
tokensMap: Record<string, Token> = {}
6260
marketsMap: Record<string, Market> = {}
6361
decimals: Record<string, number> = {}
@@ -70,7 +68,7 @@ export abstract class BaseGlvTransport<
7068
): Promise<void> {
7169
await super.initialize(dependencies, adapterSettings, endpointName, transportName)
7270
this.settings = adapterSettings
73-
this.provider = new ethers.providers.JsonRpcProvider(
71+
this.provider = new ethers.JsonRpcProvider(
7472
adapterSettings.ARBITRUM_RPC_URL,
7573
adapterSettings.ARBITRUM_CHAIN_ID,
7674
)
@@ -85,6 +83,13 @@ export abstract class BaseGlvTransport<
8583
await this.tokenInfo()
8684
await this.marketInfo()
8785

86+
this.dataStreamsClient = new DataStreamsHttpClient({
87+
baseUrl: adapterSettings.DATA_ENGINE_BASE_URL,
88+
userId: adapterSettings.DATA_ENGINE_USER_ID,
89+
userSecret: adapterSettings.DATA_ENGINE_USER_SECRET,
90+
timeoutMs: adapterSettings.GLV_INFO_API_TIMEOUT_MS,
91+
requester: dependencies.requester,
92+
})
8893
if (this.settings.METADATA_REFRESH_INTERVAL_MS > 0) {
8994
setInterval(() => {
9095
this.tokenInfo()
@@ -195,8 +200,8 @@ export abstract class BaseGlvTransport<
195200
this.glvReaderContract.getGlvTokenPrice(...glvTokenPriceContractParams, false),
196201
])
197202

198-
const maximizedPrice = Number(utils.formatUnits(maximizedPriceRaw, SIGNED_PRICE_DECIMALS))
199-
const minimizedPrice = Number(utils.formatUnits(minimizedPriceRaw, SIGNED_PRICE_DECIMALS))
203+
const maximizedPrice = Number(ethers.formatUnits(maximizedPriceRaw, SIGNED_PRICE_DECIMALS))
204+
const minimizedPrice = Number(ethers.formatUnits(minimizedPriceRaw, SIGNED_PRICE_DECIMALS))
200205
const result = median([minimizedPrice, maximizedPrice])
201206

202207
const timestamps = {
@@ -217,74 +222,64 @@ export abstract class BaseGlvTransport<
217222
private async fetchPrices(assets: string[], dataRequestedTimestamp: number) {
218223
const priceData = {} as PriceData
219224

220-
const sources = [
221-
{ url: this.settings.TIINGO_ADAPTER_URL, name: 'tiingo' },
222-
{ url: this.settings.COINMETRICS_ADAPTER_URL, name: 'coinmetrics' },
223-
{ url: this.settings.NCFX_ADAPTER_URL, name: 'ncfx' },
224-
]
225-
226225
const priceProviders: Record<string, string[]> = {}
227-
const promises = []
228-
229-
for (let i = 0; i < sources.length; i++) {
230-
const source = sources[i]
231-
const assetPromises = assets.map(async (asset) => {
232-
const mappedAsset = mapParameter(source.name, asset)
233-
const base = this.unwrapAsset(mappedAsset)
234-
const requestConfig = {
235-
url: source.url,
236-
method: 'POST',
237-
data: {
238-
data: {
239-
endpoint: 'crypto-lwba',
240-
base,
241-
quote: 'USD',
226+
const sources: Source[] = [{ name: 'data-streams', url: this.settings.DATA_ENGINE_BASE_URL }]
227+
228+
await Promise.all(
229+
assets.map(async (asset) => {
230+
const base = this.unwrapAsset(asset)
231+
const feedId = await this.dataStreamsClient.resolveFeedId(base, 'USD', 'Crypto')
232+
const { fullReportHex } = await this.dataStreamsClient.getLatestReport(feedId)
233+
234+
// Decode (V3 expected: price, bid, ask; decoder handles V2–V10)
235+
const decoded = decodeReport(fullReportHex, feedId)
236+
const scale = 10 ** SIGNED_PRICE_DECIMALS
237+
const toNum = (x: any | undefined) => (x === undefined ? undefined : Number(x) / scale)
238+
239+
const v3Bid = (decoded as any).bid
240+
const v3Ask = (decoded as any).ask
241+
const v3Price = (decoded as any).price
242+
243+
const bidNum = toNum(v3Bid ?? v3Price)
244+
const askNum = toNum(v3Ask ?? v3Price)
245+
246+
if (bidNum === undefined || askNum === undefined) {
247+
throw new AdapterDataProviderError(
248+
{ statusCode: 502, message: `Could not decode bid/ask for ${asset}` },
249+
{
250+
providerDataRequestedUnixMs: dataRequestedTimestamp,
251+
providerDataReceivedUnixMs: Date.now(),
252+
providerIndicatedTimeUnixMs: undefined,
242253
},
243-
},
244-
}
245-
246-
try {
247-
const response = await this.requester.request<{ data: LwbaResponseDataFields['Data'] }>(
248-
JSON.stringify(requestConfig),
249-
requestConfig,
250-
)
251-
const { bid, ask } = response.response.data.data
252-
253-
priceData[asset] = {
254-
bids: [...(priceData[asset]?.bids || []), bid],
255-
asks: [...(priceData[asset]?.asks || []), ask],
256-
}
257-
258-
priceProviders[asset] = priceProviders[asset]
259-
? [...new Set([...priceProviders[asset], source.name])]
260-
: [source.name]
261-
} catch (error) {
262-
const e = error as Error
263-
logger.error(
264-
`Error fetching data for ${asset} from ${source.name}, url - ${source.url}: ${e.message}`,
265254
)
266255
}
267-
})
268256

269-
promises.push(...assetPromises)
270-
}
257+
// Store raw numbers for median calc
258+
priceData[asset] = {
259+
bids: [...(priceData[asset]?.bids || []), bidNum],
260+
asks: [...(priceData[asset]?.asks || []), askNum],
261+
}
271262

272-
await Promise.all(promises)
263+
// Track that Data Streams responded for this *base* key
264+
priceProviders[base] = priceProviders[base] ? priceProviders[base] : []
265+
if (!priceProviders[base].includes(sources[0].name)) {
266+
priceProviders[base].push(sources[0].name)
267+
}
268+
}),
269+
)
273270

274271
this.validateRequiredResponses(priceProviders, sources, assets, dataRequestedTimestamp)
275272

276273
const medianValues = this.calculateMedian(assets, priceData)
277-
278274
const prices: Record<string, Record<string, string | number>> = {}
279275

280-
medianValues.map(
281-
(v) =>
282-
(prices[v.asset] = {
283-
...v,
284-
ask: toFixed(v.ask, this.decimals[v.asset as keyof typeof this.decimals]),
285-
bid: toFixed(v.bid, this.decimals[v.asset as keyof typeof this.decimals]),
286-
}),
287-
)
276+
medianValues.forEach((v) => {
277+
prices[v.asset] = {
278+
...v,
279+
ask: toFixed(v.ask, this.decimals[v.asset as keyof typeof this.decimals]),
280+
bid: toFixed(v.bid, this.decimals[v.asset as keyof typeof this.decimals]),
281+
}
282+
})
288283

289284
return {
290285
prices,

packages/composites/glv-token/src/transport/utils.ts

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -52,24 +52,3 @@ export interface Market {
5252
export function mapSymbol(address: string, symbolMap: Record<string, any>) {
5353
return symbolMap[address]
5454
}
55-
56-
const adapterParamOverride: Record<string, Record<string, string>> = {
57-
coinmetrics: {
58-
TAO: 'tao_bittensor',
59-
SPX6900: 'spx',
60-
},
61-
tiingo: {
62-
FLOKI: 'floki2',
63-
SPX6900: 'spx',
64-
},
65-
ncfx: {
66-
SPX6900: 'spx',
67-
},
68-
}
69-
70-
export function mapParameter(source: string, param: string) {
71-
if (source in adapterParamOverride && param in adapterParamOverride[source]) {
72-
return adapterParamOverride[source][param]
73-
}
74-
return param
75-
}

packages/composites/glv-token/test/integration/adapter.test.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
1+
import {
2+
LwbaResponseDataFields,
3+
validateLwbaResponse,
4+
} from '@chainlink/external-adapter-framework/adapter'
15
import {
26
TestAdapter,
37
setEnvVariables,
48
} from '@chainlink/external-adapter-framework/util/testing-utils'
9+
import { ethers } from 'ethers'
510
import * as nock from 'nock'
611
import {
712
mockCoinmetricsEAResponseSuccess,
813
mockNCFXEAResponseSuccess,
914
mockTiingoEAResponseSuccess,
1015
} from './fixtures'
11-
import { ethers } from 'ethers'
12-
import {
13-
LwbaResponseDataFields,
14-
validateLwbaResponse,
15-
} from '@chainlink/external-adapter-framework/adapter'
1616

1717
jest.mock('ethers', () => ({
1818
...jest.requireActual('ethers'),
1919
ethers: {
2020
providers: {
21-
JsonRpcProvider: function (): ethers.providers.JsonRpcProvider {
22-
return {} as ethers.providers.JsonRpcProvider
21+
JsonRpcProvider: function (): ethers.JsonRpcProvider {
22+
return {} as ethers.JsonRpcProvider
2323
},
2424
},
2525
Contract: function () {

0 commit comments

Comments
 (0)