Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
"module": "dist/entrypoints/index.node.mjs",
"types": "dist/entrypoints/index.node.d.ts",
"dependencies": {
"@posthog/core": "workspace:*"
"@posthog/core": "workspace:*",
"eventsource": "^2.0.2"
},
"devDependencies": {
"@types/node": "^20.0.0",
"@types/eventsource": "^1.1.15",
"@posthog-tooling/tsconfig-base": "workspace:*",
"jest": "catalog:",
"@rslib/core": "catalog:"
Expand Down
1 change: 1 addition & 0 deletions packages/node/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export abstract class PostHogBackendClient extends PostHogCoreStateless implemen
this._events.emit('localEvaluationFlagsLoaded', count)
},
customHeaders: this.getCustomHeaders(),
realtimeFlags: options.realtimeFlags ?? false,
})
}
}
Expand Down
96 changes: 96 additions & 0 deletions packages/node/src/extensions/feature-flags/feature-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { FeatureFlagCondition, FlagProperty, FlagPropertyValue, PostHogFeatureFl
import type { FeatureFlagValue, JsonType, PostHogFetchOptions, PostHogFetchResponse } from '@posthog/core'
import { safeSetTimeout } from '@posthog/core'
import { hashSHA1 } from './crypto'
import EventSource from 'eventsource'

const SIXTY_SECONDS = 60 * 1000

Expand Down Expand Up @@ -53,6 +54,7 @@ type FeatureFlagsPollerOptions = {
onError?: (error: Error) => void
onLoad?: (count: number) => void
customHeaders?: { [key: string]: string }
realtimeFlags?: boolean
}

class FeatureFlagsPoller {
Expand All @@ -74,6 +76,9 @@ class FeatureFlagsPoller {
shouldBeginExponentialBackoff: boolean = false
backOffCount: number = 0
onLoad?: (count: number) => void
eventSource?: EventSource
sseConnected: boolean = false
realtimeFlags: boolean = false

constructor({
pollingInterval,
Expand All @@ -98,6 +103,7 @@ class FeatureFlagsPoller {
this.fetch = options.fetch || fetch
this.onError = options.onError
this.customHeaders = customHeaders
this.realtimeFlags = options.realtimeFlags ?? false
this.onLoad = options.onLoad
void this.loadFeatureFlags()
}
Expand Down Expand Up @@ -648,6 +654,11 @@ class FeatureFlagsPoller {
this.shouldBeginExponentialBackoff = false
this.backOffCount = 0
this.onLoad?.(this.featureFlags.length)

// Set up SSE connection after initial flags are loaded successfully
if (this.realtimeFlags && !this.sseConnected) {
this._setupSSEConnection()
}
break
}

Expand Down Expand Up @@ -696,8 +707,93 @@ class FeatureFlagsPoller {
}
}

private _setupSSEConnection(): void {
if (this.eventSource || this.sseConnected) {
// Already connected or connecting
return
}

if (!this.realtimeFlags) {
return
}

const token = this.projectApiKey
const url = `${this.host}/flags/definitions/stream?token=${encodeURIComponent(token)}`

try {
const eventSourceOptions: Record<string, any> = {
headers: {
...this.customHeaders,
Authorization: `Bearer ${this.personalApiKey}`,
},
}

this.eventSource = new EventSource(url, eventSourceOptions)

this.eventSource.onopen = () => {
this.sseConnected = true
}

this.eventSource.onmessage = (event) => {
try {
const flagData = JSON.parse(event.data)
if (flagData && typeof flagData === 'object') {
// Update flags from SSE message
this._processFlagUpdate(flagData)
}
} catch (error) {
this.onError?.(new Error(`Error parsing SSE message: ${error}`))
}
}

this.eventSource.onerror = (error) => {
this.onError?.(new Error(`SSE connection error: ${error}`))
this._closeSSEConnection()

// Attempt to reconnect after a delay
setTimeout(() => {
if (this.realtimeFlags && !this.sseConnected) {
this._setupSSEConnection()
}
}, 5000)
}
} catch (error) {
this.onError?.(new Error(`Failed to establish SSE connection: ${error}`))
}
}

private _closeSSEConnection(): void {
if (this.eventSource) {
this.eventSource.close()
this.eventSource = undefined
}
this.sseConnected = false
}

private _processFlagUpdate(flagData: { [key: string]: any }): void {
const flag = flagData as PostHogFeatureFlag

if (flag.deleted) {
// Remove the flag
delete this.featureFlagsByKey[flag.key]
this.featureFlags = this.featureFlags.filter((f) => f.key !== flag.key)
} else {
// Update or add the flag
this.featureFlagsByKey[flag.key] = flag

// Update in the array
const existingIndex = this.featureFlags.findIndex((f) => f.key === flag.key)
if (existingIndex >= 0) {
this.featureFlags[existingIndex] = flag
} else {
this.featureFlags.push(flag)
}
}
}

stopPoller(): void {
clearTimeout(this.poller)
this._closeSSEConnection()
}
}

Expand Down
10 changes: 10 additions & 0 deletions packages/node/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ export type PostHogOptions = PostHogCoreOptions & {
* @default undefined
*/
evaluationEnvironments?: readonly string[]
/**
* Enable real-time feature flag updates via Server-Sent Events (SSE).
* When enabled, the SDK will establish a persistent connection to receive
* flag updates in real-time instead of relying solely on polling.
*
* Requires personalApiKey to be set for authentication.
*
* @default false
*/
realtimeFlags?: boolean
}

export type PostHogFeatureFlag = {
Expand Down
Loading
Loading