Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/smooth-wolves-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'posthog-js': minor
---

feat: introduce a task queue that will yield to the main thread periodically reducing the impact of long operations
2 changes: 1 addition & 1 deletion packages/browser/jest.config.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module.exports = {
testPathIgnorePatterns: ['/node_modules/', '/cypress/', '/react/', '/test_data/', '/testcafe/'],
testPathIgnorePatterns: ['/node_modules/', '/cypress/', '/react/', '/test_data/', '/testcafe/', '\\.d\\.ts$'],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look right, you're escaping the slash rather than the dot?

Shouldn't it be \.d'\.ts$?

moduleFileExtensions: ['js', 'json', 'ts', 'tsx'],
setupFilesAfterEnv: ['./src/__tests__/setup.js'],
modulePathIgnorePatterns: ['src/__tests__/setup.js', 'src/__tests__/helpers/'],
Expand Down
28 changes: 14 additions & 14 deletions packages/browser/src/__tests__/retry-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ describe('RetryQueue', () => {
jest.spyOn(assignableWindow.console, 'warn').mockImplementation()
})

const fastForwardTimeAndRunTimer = (time = 3500) => {
const fastForwardTimeAndRunTimer = async (time = 3500) => {
now += time
jest.setSystemTime(now)
jest.runOnlyPendingTimers()
await jest.runOnlyPendingTimersAsync()
}

const enqueueRequests = () => {
Expand Down Expand Up @@ -55,7 +55,7 @@ describe('RetryQueue', () => {
mockPosthog._send_request.mockClear()
}

it('processes retry requests', () => {
it('processes retry requests', async () => {
enqueueRequests()

expect(retryQueue.length).toEqual(4)
Expand Down Expand Up @@ -95,7 +95,7 @@ describe('RetryQueue', () => {
])

// Fast forward enough time to clear the jitter
fastForwardTimeAndRunTimer(3500)
await fastForwardTimeAndRunTimer(3500)

// clears queue
expect(retryQueue.length).toEqual(0)
Expand All @@ -109,9 +109,9 @@ describe('RetryQueue', () => {
])
})

it('adds the retry_count to the url', () => {
it('adds the retry_count to the url', async () => {
enqueueRequests()
fastForwardTimeAndRunTimer(3500)
await fastForwardTimeAndRunTimer(3500)

expect(mockPosthog._send_request.mock.calls.map(([arg1]) => arg1.url)).toEqual([
'/e?retry_count=1',
Expand All @@ -136,12 +136,12 @@ describe('RetryQueue', () => {
])
})

it('enqueues requests when offline and flushes immediately when online again', () => {
it('enqueues requests when offline and flushes immediately when online again', async () => {
retryQueue['_areWeOnline'] = false
expect(retryQueue['_areWeOnline']).toEqual(false)

enqueueRequests()
fastForwardTimeAndRunTimer()
await fastForwardTimeAndRunTimer()

// requests aren't attempted when we're offline
expect(mockPosthog._send_request).toHaveBeenCalledTimes(0)
Expand All @@ -166,7 +166,7 @@ describe('RetryQueue', () => {
expect(retryQueue.length).toEqual(0)
})

it('only calls the callback when successful', () => {
it('only calls the callback when successful', async () => {
const cb = jest.fn()
mockPosthog._send_request.mockImplementation(({ callback }) => {
callback?.({ statusCode: 500 })
Expand All @@ -182,7 +182,7 @@ describe('RetryQueue', () => {
callback?.({ statusCode: 200, text: 'it worked!' })
})

fastForwardTimeAndRunTimer()
await fastForwardTimeAndRunTimer()

expect(retryQueue.length).toEqual(0)
expect(cb).toHaveBeenCalledTimes(1)
Expand Down Expand Up @@ -254,23 +254,23 @@ describe('RetryQueue', () => {
})

describe('memory management', () => {
it('stops polling when queue becomes empty', () => {
it('stops polling when queue becomes empty', async () => {
enqueueRequests()

expect(retryQueue['_isPolling']).toBe(true)
expect(retryQueue['_poller']).toBeDefined()
expect(retryQueue.length).toEqual(4)

fastForwardTimeAndRunTimer(3500)
await fastForwardTimeAndRunTimer(3500)

expect(retryQueue.length).toEqual(0)
expect(retryQueue['_isPolling']).toBe(false)
expect(retryQueue['_poller']).toBeUndefined()
})

it('restarts polling when items are added after stopping', () => {
it('restarts polling when items are added after stopping', async () => {
enqueueRequests()
fastForwardTimeAndRunTimer(3500)
await fastForwardTimeAndRunTimer(3500)

expect(retryQueue['_isPolling']).toBe(false)
expect(retryQueue['_poller']).toBeUndefined()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { assignableWindow, LazyLoadedSessionRecordingInterface, window, document
import { addEventListener } from '../../../utils'
import { MutationThrottler } from './mutation-throttler'
import { createLogger } from '../../../utils/logger'
import { processWithYield } from '../../../utils/task-queue'
import {
clampToRange,
includes,
Expand Down Expand Up @@ -1068,17 +1069,21 @@ export class LazyLoadedSessionRecording implements LazyLoadedSessionRecordingInt

if (this._buffer.data.length > 0) {
const snapshotEvents = splitBuffer(this._buffer)
snapshotEvents.forEach((snapshotBuffer) => {
this._flushedSizeTracker?.trackSize(snapshotBuffer.size)
this._captureSnapshot({
$snapshot_bytes: snapshotBuffer.size,
$snapshot_data: snapshotBuffer.data,
$session_id: snapshotBuffer.sessionId,
$window_id: snapshotBuffer.windowId,
$lib: 'web',
$lib_version: Config.LIB_VERSION,
})
})
void processWithYield(
snapshotEvents,
(snapshotBuffer) => {
this._flushedSizeTracker?.trackSize(snapshotBuffer.size)
this._captureSnapshot({
$snapshot_bytes: snapshotBuffer.size,
$snapshot_data: snapshotBuffer.data,
$session_id: snapshotBuffer.sessionId,
$window_id: snapshotBuffer.windowId,
$lib: 'web',
$lib_version: Config.LIB_VERSION,
})
},
{ timeBudgetMs: 30 }
)
}

// buffer is empty, we clear it in case the session id has changed
Expand Down
18 changes: 13 additions & 5 deletions packages/browser/src/extensions/replay/external/network-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { formDataToQuery } from '../../../utils/request-utils'
import { patch } from '../rrweb-plugins/patch'
import { isHostOnDenyList } from '../../../extensions/replay/external/denylist'
import { defaultNetworkOptions } from './config'
import { processWithYield } from '../../../utils/task-queue'

const logger = createLogger('[Recorder]')

Expand Down Expand Up @@ -61,11 +62,18 @@ function initPerformanceObserver(cb: networkCallback, win: IWindow, options: Req
isNavigationTiming(entry) ||
(isResourceTiming(entry) && options.initiatorTypes.includes(entry.initiatorType as InitiatorType))
)
cb({
requests: initialPerformanceEntries.flatMap((entry) =>
prepareRequest({ entry, method: undefined, status: undefined, networkRequest: {}, isInitial: true })
),
isInitial: true,

// Process initial performance entries with yielding for large sets
processWithYield(
initialPerformanceEntries,
(entry) =>
prepareRequest({ entry, method: undefined, status: undefined, networkRequest: {}, isInitial: true }),
{ timeBudgetMs: 30 }
).then((requests) => {
cb({
requests: requests.flat(),
isInitial: true,
})
})
}
const observer = new win.PerformanceObserver((entries) => {
Expand Down
74 changes: 29 additions & 45 deletions packages/browser/src/posthog-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
SURVEYS_REQUEST_TIMEOUT_MS,
USER_STATE,
} from './constants'
import { TaskQueue } from './utils/task-queue'
import { DeadClicksAutocapture, isDeadClicksEnabledForAutocapture } from './extensions/dead-clicks-autocapture'
import { ExceptionObserver } from './extensions/exception-autocapture'
import { HistoryAutocapture } from './extensions/history-autocapture'
Expand Down Expand Up @@ -663,10 +664,6 @@ export class PostHog {
}

private _initExtensions(startInCookielessMode: boolean): void {
// we don't support IE11 anymore, so performance.now is safe
// eslint-disable-next-line compat/compat
const initStartTime = performance.now()

this.historyAutocapture = new HistoryAutocapture(this)
this.historyAutocapture.startIfEnabled()

Expand Down Expand Up @@ -733,52 +730,39 @@ export class PostHog {
})

// Process tasks with time-slicing to avoid blocking
this._processInitTaskQueue(initTasks, initStartTime)
}

private _processInitTaskQueue(queue: Array<() => void>, initStartTime: number): void {
const TIME_BUDGET_MS = 30 // Respect frame budget (~60fps = 16ms, but we're already deferred)

while (queue.length > 0) {
// Only time-slice if deferred init is enabled, otherwise run synchronously
if (this.config.__preview_deferred_init_extensions) {
// we don't support IE11 anymore, so performance.now is safe
// eslint-disable-next-line compat/compat
const elapsed = performance.now() - initStartTime

// Check if we've exceeded our time budget
if (elapsed >= TIME_BUDGET_MS && queue.length > 0) {
// Yield to browser, then continue processing
setTimeout(() => {
this._processInitTaskQueue(queue, initStartTime)
}, 0)
return
}
}

// Process next task
const task = queue.shift()
if (task) {
// Only use time-slicing if deferred init is enabled, otherwise process synchronously
if (this.config.__preview_deferred_init_extensions) {
const taskQueue = new TaskQueue({
timeBudgetMs: 30,
onComplete: (totalTimeMs) => {
this.register_for_session({
$sdk_debug_extensions_init_method: 'deferred',
$sdk_debug_extensions_init_time_ms: totalTimeMs,
})
logger.info(`PostHog extensions initialized (${totalTimeMs}ms)`)
},
onError: (error) => {
logger.error('Error initializing extension:', error)
},
})
taskQueue.enqueueAll(initTasks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need to call a method to actually start the processing?

} else {
// we don't support IE11 anymore, so performance.now is safe
// eslint-disable-next-line compat/compat
const startTime = performance.now()
initTasks.forEach((task) => {
try {
task()
} catch (error) {
logger.error('Error initializing extension:', error)
}
}
}

// All tasks complete - record timing for both sync and deferred modes
// we don't support IE11 anymore, so performance.now is safe
// eslint-disable-next-line compat/compat
const taskInitTiming = Math.round(performance.now() - initStartTime)
this.register_for_session({
$sdk_debug_extensions_init_method: this.config.__preview_deferred_init_extensions
? 'deferred'
: 'synchronous',
$sdk_debug_extensions_init_time_ms: taskInitTiming,
})
if (this.config.__preview_deferred_init_extensions) {
logger.info(`PostHog extensions initialized (${taskInitTiming}ms)`)
})
// eslint-disable-next-line compat/compat
const totalTimeMs = Math.round(performance.now() - startTime)
this.register_for_session({
$sdk_debug_extensions_init_method: 'synchronous',
$sdk_debug_extensions_init_time_ms: totalTimeMs,
})
}
}

Expand Down
30 changes: 18 additions & 12 deletions packages/browser/src/request-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { each } from './utils'

import { isArray, isUndefined, clampToRange } from '@posthog/core'
import { logger } from './utils/logger'
import { processWithYield } from './utils/task-queue'

export const DEFAULT_FLUSH_INTERVAL_MS = 3000

Expand Down Expand Up @@ -57,22 +58,27 @@ export class RequestQueue {
if (this._isPaused) {
return
}
this._flushTimeout = setTimeout(() => {
this._flushTimeout = setTimeout(async () => {
this._clearFlushTimeout()
if (this._queue.length > 0) {
const requests = this._formatQueue()
for (const key in requests) {
const req = requests[key]
const now = new Date().getTime()
const requestEntries = Object.entries(requests)
const now = new Date().getTime()

if (req.data && isArray(req.data)) {
each(req.data, (data) => {
data['offset'] = Math.abs(data['timestamp'] - now)
delete data['timestamp']
})
}
this._sendRequest(req)
}
// Process timestamp updates with yielding for large batches
await processWithYield(
requestEntries,
([, req]) => {
if (req.data && isArray(req.data)) {
each(req.data, (data) => {
data['offset'] = Math.abs(data['timestamp'] - now)
delete data['timestamp']
})
}
this._sendRequest(req)
},
{ timeBudgetMs: 30 }
)
}
}, this._flushTimeoutMs)
}
Expand Down
13 changes: 7 additions & 6 deletions packages/browser/src/retry-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { window } from './utils/globals'
import { PostHog } from './posthog-core'
import { extendURLParams } from './request'
import { addEventListener } from './utils'
import { processWithYield } from './utils/task-queue'

const thirtyMinutes = 30 * 60 * 1000

Expand Down Expand Up @@ -120,15 +121,15 @@ export class RetryQueue {
return
}

this._poller = setTimeout(() => {
this._poller = setTimeout(async () => {
if (this._areWeOnline && this._queue.length > 0) {
this._flush()
await this._flush()
}
this._poll()
}, this._pollIntervalMs) as any as number
}

private _flush(): void {
private async _flush(): Promise<void> {
const now = Date.now()
const notToFlush: RetryQueueElement[] = []
const toFlush = this._queue.filter((item) => {
Expand All @@ -142,9 +143,9 @@ export class RetryQueue {
this._queue = notToFlush

if (toFlush.length > 0) {
for (const { requestOptions } of toFlush) {
this.retriableRequest(requestOptions)
}
await processWithYield(toFlush, ({ requestOptions }) => this.retriableRequest(requestOptions), {
timeBudgetMs: 30,
})
}
}

Expand Down
Loading
Loading