Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ export class WebSocketTransport<
return !this.wsConnection || this.wsConnection.readyState === WebSocket.CLOSED
}

/**
* Increments the failover counter and updates the wsConnectionFailoverCount metric.
* Both operations must always happen together, so they are encapsulated here.
*/
private incrementFailoverCounter(filteredUrl: string): void {
this.streamHandlerInvocationsWithNoConnection += 1
metrics
.get('wsConnectionFailoverCount')
.labels({ transport_name: this.name, url: filteredUrl })
.set(this.streamHandlerInvocationsWithNoConnection)
}

serializeMessage(payload: unknown): string {
return typeof payload === 'string' ? payload : JSON.stringify(payload)
}
Expand Down Expand Up @@ -289,20 +301,24 @@ export class WebSocketTransport<

// Called when the WS connection closes for any reason
close: (event: WebSocket.CloseEvent) => {
// If the connection closed with 1000, it's a usual closure
const level = event.code === 1000 ? 'debug' : 'info'
logger[level](
`Closed websocket connection. Code: ${event.code} ; reason: ${event.reason?.toString()}`,
)

// Record active ws connections by decrementing count on close
// Using URL in label since connection_key is removed from v3
metrics.get('wsConnectionActive').dec()

// Also, register that the connection was closed and the reason why.
// We need to filter out query params from the URL to avoid having
// the cardinality of the metric go out of control.
const filteredUrl = this.currentUrl.split('?')[0]
// If the connection closed with 1000, it's a usual closure
const isAbnormal = event.code !== 1000

if (isAbnormal) {
this.incrementFailoverCounter(filteredUrl)
logger.warn(
`WebSocket closed abnormally (code: ${event.code}, reason: ${event.reason?.toString() || 'none'}). ` +
`Failover counter incremented to ${this.streamHandlerInvocationsWithNoConnection}. ` +
`URL: ${filteredUrl}`,
)
} else {
logger.debug(`WebSocket closed normally (code: ${event.code}). URL: ${filteredUrl}`)
}

metrics.get('wsConnectionActive').dec()
metrics.get('wsConnectionClosures').inc({
code: event.code,
url: filteredUrl,
Expand Down Expand Up @@ -413,17 +429,13 @@ export class WebSocketTransport<
// WS_SUBSCRIPTION_UNRESPONSIVE_TTL. There is interplay with WS_SUBSCRIPTION_TTL
// to determine minimum TTL of an open connection given no explicit connection errors.
if (connectionUnresponsive) {
this.streamHandlerInvocationsWithNoConnection += 1
logger.info(
`The connection is unresponsive (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`,
)
// Filter out query params from the URL to avoid leaking sensitive data
// and prevent metric cardinality explosion
const filteredUrl = this.currentUrl.split('?')[0]
metrics
.get('wsConnectionFailoverCount')
.labels({ transport_name: this.name, url: filteredUrl })
.set(this.streamHandlerInvocationsWithNoConnection)
this.incrementFailoverCounter(filteredUrl)
logger.info(
`The connection is unresponsive (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`,
)
}

// We want to check if the URL we calculate is different from the one currently connected.
Expand Down
Loading
Loading