diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index f38736dc..6449c60c 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -187,6 +187,18 @@ export class WebSocketTransport< return !this.wsConnection || this.wsConnection.readyState === WebSocket.CLOSED } + /** + * Increments the failover counter and updates the wsConnectionFailoverCount metric. + * Both operations must always happen together, so they are encapsulated here. + */ + private incrementFailoverCounter(filteredUrl: string): void { + this.streamHandlerInvocationsWithNoConnection += 1 + metrics + .get('wsConnectionFailoverCount') + .labels({ transport_name: this.name, url: filteredUrl }) + .set(this.streamHandlerInvocationsWithNoConnection) + } + serializeMessage(payload: unknown): string { return typeof payload === 'string' ? payload : JSON.stringify(payload) } @@ -289,20 +301,24 @@ export class WebSocketTransport< // Called when the WS connection closes for any reason close: (event: WebSocket.CloseEvent) => { - // If the connection closed with 1000, it's a usual closure - const level = event.code === 1000 ? 'debug' : 'info' - logger[level]( - `Closed websocket connection. Code: ${event.code} ; reason: ${event.reason?.toString()}`, - ) - - // Record active ws connections by decrementing count on close - // Using URL in label since connection_key is removed from v3 - metrics.get('wsConnectionActive').dec() - - // Also, register that the connection was closed and the reason why. // We need to filter out query params from the URL to avoid having // the cardinality of the metric go out of control. const filteredUrl = this.currentUrl.split('?')[0] + // If the connection closed with 1000, it's a usual closure + const isAbnormal = event.code !== 1000 + + if (isAbnormal) { + this.incrementFailoverCounter(filteredUrl) + logger.warn( + `WebSocket closed abnormally (code: ${event.code}, reason: ${event.reason?.toString() || 'none'}). ` + + `Failover counter incremented to ${this.streamHandlerInvocationsWithNoConnection}. ` + + `URL: ${filteredUrl}`, + ) + } else { + logger.debug(`WebSocket closed normally (code: ${event.code}). URL: ${filteredUrl}`) + } + + metrics.get('wsConnectionActive').dec() metrics.get('wsConnectionClosures').inc({ code: event.code, url: filteredUrl, @@ -413,17 +429,13 @@ export class WebSocketTransport< // WS_SUBSCRIPTION_UNRESPONSIVE_TTL. There is interplay with WS_SUBSCRIPTION_TTL // to determine minimum TTL of an open connection given no explicit connection errors. if (connectionUnresponsive) { - this.streamHandlerInvocationsWithNoConnection += 1 - logger.info( - `The connection is unresponsive (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`, - ) // Filter out query params from the URL to avoid leaking sensitive data // and prevent metric cardinality explosion const filteredUrl = this.currentUrl.split('?')[0] - metrics - .get('wsConnectionFailoverCount') - .labels({ transport_name: this.name, url: filteredUrl }) - .set(this.streamHandlerInvocationsWithNoConnection) + this.incrementFailoverCounter(filteredUrl) + logger.info( + `The connection is unresponsive (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`, + ) } // We want to check if the URL we calculate is different from the one currently connected. diff --git a/test/transports/websocket.test.ts b/test/transports/websocket.test.ts index 529d8c5d..e8b53cd9 100644 --- a/test/transports/websocket.test.ts +++ b/test/transports/websocket.test.ts @@ -1312,3 +1312,402 @@ test.serial('does not heartbeat when handler throws an error', async (t) => { mockWsServer.close() await t.context.clock.runToLastAsync() }) + +test.serial( + 'increments failover counter on abnormal closure and passes it to url function', + async (t) => { + const base = 'ETH' + const quote = 'DOGE' + process.env['METRICS_ENABLED'] = 'true' + eaMetrics.clear() + + const counterValues: number[] = [] + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + mockWsServer.on('connection', (socket) => { + socket.on('message', () => { + socket.close({ code: 4000, reason: 'Simulated abnormal closure', wasClean: false }) + }) + }) + + const transport = new WebSocketTransport({ + url: (_context, _desiredSubs, params) => { + counterValues.push(params.streamHandlerInvocationsWithNoConnection) + return ENDPOINT_URL + }, + handlers: { + message(message) { + if (!message.pair) { + return [] + } + const [curBase, curQuote] = message.pair.split('/') + return [ + { + params: { base: curBase, quote: curQuote }, + response: { + data: { + result: message.value, + }, + result: message.value, + }, + }, + ] + }, + }, + builders: { + subscribeMessage: (params) => ({ + request: 'subscribe', + pair: `${params.base}/${params.quote}`, + }), + unsubscribeMessage: (params) => ({ + request: 'unsubscribe', + pair: `${params.base}/${params.quote}`, + }), + }, + }) + + const webSocketEndpoint = new AdapterEndpoint({ + name: 'TEST', + transport: transport, + inputParameters, + }) + + const config = new AdapterConfig( + {}, + { + envDefaultOverrides: { + BACKGROUND_EXECUTE_MS_WS, + WS_SUBSCRIPTION_UNRESPONSIVE_TTL: 180_000, + }, + }, + ) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + config, + endpoints: [webSocketEndpoint], + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.request({ base, quote }) + + // Each cycle: connect -> subscribe -> server closes with 4000 -> counter increments + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 5 + 100) + + // The counter should be strictly increasing due to abnormal close incrementing it + t.true(counterValues.length >= 3, `Expected at least 3 url calls, got ${counterValues.length}`) + + for (let i = 1; i < counterValues.length; i++) { + t.true( + counterValues[i] > counterValues[i - 1], + `Counter should increase: index ${i} (${counterValues[i]}) should be > index ${i - 1} (${counterValues[i - 1]})`, + ) + } + + process.env['METRICS_ENABLED'] = 'false' + await testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() + }, +) + +test.serial('cycles between primary and secondary URLs on abnormal closure', async (t) => { + const base = 'ETH' + const quote = 'DOGE' + + const PRIMARY_URL = 'wss://primary.test.com/ws' + const SECONDARY_URL = 'wss://secondary.test.com/ws' + const urlsConnected: string[] = [] + + mockWebSocketProvider(WebSocketClassProvider) + + const mockPrimary = new Server(PRIMARY_URL, { mock: false }) + mockPrimary.on('connection', (socket) => { + socket.on('message', () => { + socket.close({ code: 4000, reason: 'Primary abnormal close', wasClean: false }) + }) + }) + + const mockSecondary = new Server(SECONDARY_URL, { mock: false }) + mockSecondary.on('connection', (socket) => { + socket.on('message', () => { + socket.close({ code: 4001, reason: 'Secondary abnormal close', wasClean: false }) + }) + }) + + // This inline URL function is a minimal stand-in for any adapter that uses + // the counter to alternate between a primary and secondary URL (e.g. Tiingo's + // wsSelectUrl with a 1:1 ratio). The framework test cannot import wsSelectUrl + // directly since ea-framework-js has no dependency on external-adapters-js. + // The production scenario using the actual wsSelectUrl is covered in: + // packages/sources/tiingo/test/integration/adapter-ws-reconnect.test.ts + const transport = new WebSocketTransport({ + url: (_context, _desiredSubs, params) => { + const counter = params.streamHandlerInvocationsWithNoConnection + const zeroIndexed = counter - 1 + const cyclePos = zeroIndexed % 2 + const url = cyclePos < 1 ? PRIMARY_URL : SECONDARY_URL + urlsConnected.push(url) + return url + }, + handlers: { + message(message) { + if (!message.pair) { + return [] + } + const [curBase, curQuote] = message.pair.split('/') + return [ + { + params: { base: curBase, quote: curQuote }, + response: { + data: { result: message.value }, + result: message.value, + }, + }, + ] + }, + }, + builders: { + subscribeMessage: (params) => ({ + request: 'subscribe', + pair: `${params.base}/${params.quote}`, + }), + unsubscribeMessage: (params) => ({ + request: 'unsubscribe', + pair: `${params.base}/${params.quote}`, + }), + }, + }) + + const webSocketEndpoint = new AdapterEndpoint({ + name: 'TEST', + transport: transport, + inputParameters, + }) + + const config = new AdapterConfig( + {}, + { + envDefaultOverrides: { + BACKGROUND_EXECUTE_MS_WS, + WS_SUBSCRIPTION_UNRESPONSIVE_TTL: 180_000, + }, + }, + ) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + config, + endpoints: [webSocketEndpoint], + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.request({ base, quote }) + + // Run through enough cycles to see URL cycling: + // counter=0 -> primary, counter=1 -> primary, counter=2 -> secondary, + // counter=3 -> primary, counter=4 -> secondary, ... + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 6 + 100) + + const primaryCount = urlsConnected.filter((u) => u === PRIMARY_URL).length + const secondaryCount = urlsConnected.filter((u) => u === SECONDARY_URL).length + + t.true(primaryCount >= 2, `Expected at least 2 primary connections, got ${primaryCount}`) + t.true(secondaryCount >= 1, `Expected at least 1 secondary connection, got ${secondaryCount}`) + + // After hitting secondary, it should cycle back to primary + const firstSecondaryIndex = urlsConnected.indexOf(SECONDARY_URL) + t.true(firstSecondaryIndex >= 0, 'Should have connected to secondary') + t.true( + firstSecondaryIndex < urlsConnected.length - 1, + 'Secondary should not be the last connection (should have returned to primary)', + ) + t.is( + urlsConnected[firstSecondaryIndex + 1], + PRIMARY_URL, + 'After secondary, should reconnect to primary', + ) + + await testAdapter.api.close() + mockPrimary.close() + mockSecondary.close() + await t.context.clock.runToLastAsync() +}) + +test.serial('does not increment failover counter on normal closure (code 1000)', async (t) => { + const base = 'ETH' + const quote = 'DOGE' + + const counterValues: number[] = [] + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + mockWsServer.on('connection', (socket) => { + socket.on('message', () => { + // Normal closure -- framework should NOT increment the failover counter + socket.close() + }) + }) + + const transport = new WebSocketTransport({ + url: (_context, _desiredSubs, params) => { + counterValues.push(params.streamHandlerInvocationsWithNoConnection) + return ENDPOINT_URL + }, + handlers: { + message(message) { + if (!message.pair) { + return [] + } + const [curBase, curQuote] = message.pair.split('/') + return [ + { + params: { base: curBase, quote: curQuote }, + response: { + data: { result: message.value }, + result: message.value, + }, + }, + ] + }, + }, + builders: { + subscribeMessage: (params) => ({ + request: 'subscribe', + pair: `${params.base}/${params.quote}`, + }), + unsubscribeMessage: (params) => ({ + request: 'unsubscribe', + pair: `${params.base}/${params.quote}`, + }), + }, + }) + + const webSocketEndpoint = new AdapterEndpoint({ + name: 'TEST', + transport: transport, + inputParameters, + }) + + const config = new AdapterConfig( + {}, + { + envDefaultOverrides: { + BACKGROUND_EXECUTE_MS_WS, + WS_SUBSCRIPTION_UNRESPONSIVE_TTL: 180_000, + }, + }, + ) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + config, + endpoints: [webSocketEndpoint], + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.request({ base, quote }) + + // Run through several background execute cycles where the server closes normally each time + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 5 + 100) + + t.true(counterValues.length >= 3, `Expected at least 3 url calls, got ${counterValues.length}`) + + // Counter should remain 0 throughout -- normal closes must not increment it + for (const value of counterValues) { + t.is(value, 0, `Counter should stay at 0 after normal close, got ${value}`) + } + + await testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() +}) + +test.serial('sets ws_connection_failover_count metric on abnormal closure', async (t) => { + const base = 'ETH' + const quote = 'DOGE' + process.env['METRICS_ENABLED'] = 'true' + eaMetrics.clear() + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + mockWsServer.on('connection', (socket) => { + socket.on('message', () => { + socket.close({ code: 4000, reason: 'Abnormal', wasClean: false }) + }) + }) + + const adapter = createAdapter({ + WS_SUBSCRIPTION_UNRESPONSIVE_TTL: 180_000, + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.request({ base, quote }) + + // One background execute cycle: connect -> subscribe -> close 4000 -> counter=1 + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100) + + const metrics = await testAdapter.getMetrics() + metrics.assert(t, { + name: 'ws_connection_failover_count', + labels: { transport_name: 'default_single_transport', url: 'wss://test-ws.com/asd' }, + expectedValue: 1, + }) + + process.env['METRICS_ENABLED'] = 'false' + await testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() +}) + +test.serial( + 'does not set ws_connection_failover_count metric on normal closure (code 1000)', + async (t) => { + const base = 'ETH' + const quote = 'DOGE' + process.env['METRICS_ENABLED'] = 'true' + eaMetrics.clear() + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + mockWsServer.on('connection', (socket) => { + socket.on('message', () => { + // Normal closure -- metric must not be emitted + socket.close() + }) + }) + + const adapter = createAdapter({ + WS_SUBSCRIPTION_UNRESPONSIVE_TTL: 180_000, + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.request({ base, quote }) + + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100) + + const metrics = await testAdapter.getMetrics() + // The failover count metric must not appear in the Prometheus output at all + const metricPresent = [...metrics.map.keys()].some((k) => + k.startsWith('ws_connection_failover_count'), + ) + t.false( + metricPresent, + 'ws_connection_failover_count should not be emitted after a normal close', + ) + + process.env['METRICS_ENABLED'] = 'false' + await testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() + }, +)