diff --git a/.changeset/smooth-wolves-mix.md b/.changeset/smooth-wolves-mix.md new file mode 100644 index 0000000000..cadc42f663 --- /dev/null +++ b/.changeset/smooth-wolves-mix.md @@ -0,0 +1,5 @@ +--- +'posthog-js': minor +--- + +feat: introduce a task queue that will yield to the main thread periodically reducing the impact of long operations diff --git a/packages/browser/jest.config.js b/packages/browser/jest.config.js index 886ba34c15..74df024fb3 100644 --- a/packages/browser/jest.config.js +++ b/packages/browser/jest.config.js @@ -1,5 +1,5 @@ module.exports = { - testPathIgnorePatterns: ['/node_modules/', '/cypress/', '/react/', '/test_data/', '/testcafe/'], + testPathIgnorePatterns: ['/node_modules/', '/cypress/', '/react/', '/test_data/', '/testcafe/', '\\.d\\.ts$'], moduleFileExtensions: ['js', 'json', 'ts', 'tsx'], setupFilesAfterEnv: ['./src/__tests__/setup.js'], modulePathIgnorePatterns: ['src/__tests__/setup.js', 'src/__tests__/helpers/'], diff --git a/packages/browser/src/__tests__/retry-queue.test.ts b/packages/browser/src/__tests__/retry-queue.test.ts index 22efb1dabe..4830d86d2a 100644 --- a/packages/browser/src/__tests__/retry-queue.test.ts +++ b/packages/browser/src/__tests__/retry-queue.test.ts @@ -18,10 +18,10 @@ describe('RetryQueue', () => { jest.spyOn(assignableWindow.console, 'warn').mockImplementation() }) - const fastForwardTimeAndRunTimer = (time = 3500) => { + const fastForwardTimeAndRunTimer = async (time = 3500) => { now += time jest.setSystemTime(now) - jest.runOnlyPendingTimers() + await jest.runOnlyPendingTimersAsync() } const enqueueRequests = () => { @@ -55,7 +55,7 @@ describe('RetryQueue', () => { mockPosthog._send_request.mockClear() } - it('processes retry requests', () => { + it('processes retry requests', async () => { enqueueRequests() expect(retryQueue.length).toEqual(4) @@ -95,7 +95,7 @@ describe('RetryQueue', () => { ]) // Fast forward enough time to clear the jitter - fastForwardTimeAndRunTimer(3500) + await fastForwardTimeAndRunTimer(3500) // clears queue expect(retryQueue.length).toEqual(0) @@ -109,9 +109,9 @@ describe('RetryQueue', () => { ]) }) - it('adds the retry_count to the url', () => { + it('adds the retry_count to the url', async () => { enqueueRequests() - fastForwardTimeAndRunTimer(3500) + await fastForwardTimeAndRunTimer(3500) expect(mockPosthog._send_request.mock.calls.map(([arg1]) => arg1.url)).toEqual([ '/e?retry_count=1', @@ -136,12 +136,12 @@ describe('RetryQueue', () => { ]) }) - it('enqueues requests when offline and flushes immediately when online again', () => { + it('enqueues requests when offline and flushes immediately when online again', async () => { retryQueue['_areWeOnline'] = false expect(retryQueue['_areWeOnline']).toEqual(false) enqueueRequests() - fastForwardTimeAndRunTimer() + await fastForwardTimeAndRunTimer() // requests aren't attempted when we're offline expect(mockPosthog._send_request).toHaveBeenCalledTimes(0) @@ -166,7 +166,7 @@ describe('RetryQueue', () => { expect(retryQueue.length).toEqual(0) }) - it('only calls the callback when successful', () => { + it('only calls the callback when successful', async () => { const cb = jest.fn() mockPosthog._send_request.mockImplementation(({ callback }) => { callback?.({ statusCode: 500 }) @@ -182,7 +182,7 @@ describe('RetryQueue', () => { callback?.({ statusCode: 200, text: 'it worked!' }) }) - fastForwardTimeAndRunTimer() + await fastForwardTimeAndRunTimer() expect(retryQueue.length).toEqual(0) expect(cb).toHaveBeenCalledTimes(1) @@ -254,23 +254,23 @@ describe('RetryQueue', () => { }) describe('memory management', () => { - it('stops polling when queue becomes empty', () => { + it('stops polling when queue becomes empty', async () => { enqueueRequests() expect(retryQueue['_isPolling']).toBe(true) expect(retryQueue['_poller']).toBeDefined() expect(retryQueue.length).toEqual(4) - fastForwardTimeAndRunTimer(3500) + await fastForwardTimeAndRunTimer(3500) expect(retryQueue.length).toEqual(0) expect(retryQueue['_isPolling']).toBe(false) expect(retryQueue['_poller']).toBeUndefined() }) - it('restarts polling when items are added after stopping', () => { + it('restarts polling when items are added after stopping', async () => { enqueueRequests() - fastForwardTimeAndRunTimer(3500) + await fastForwardTimeAndRunTimer(3500) expect(retryQueue['_isPolling']).toBe(false) expect(retryQueue['_poller']).toBeUndefined() diff --git a/packages/browser/src/extensions/replay/external/lazy-loaded-session-recorder.ts b/packages/browser/src/extensions/replay/external/lazy-loaded-session-recorder.ts index 6dd1df59ee..b4594d48ff 100644 --- a/packages/browser/src/extensions/replay/external/lazy-loaded-session-recorder.ts +++ b/packages/browser/src/extensions/replay/external/lazy-loaded-session-recorder.ts @@ -35,6 +35,7 @@ import { assignableWindow, LazyLoadedSessionRecordingInterface, window, document import { addEventListener } from '../../../utils' import { MutationThrottler } from './mutation-throttler' import { createLogger } from '../../../utils/logger' +import { processWithYield } from '../../../utils/task-queue' import { clampToRange, includes, @@ -1068,17 +1069,21 @@ export class LazyLoadedSessionRecording implements LazyLoadedSessionRecordingInt if (this._buffer.data.length > 0) { const snapshotEvents = splitBuffer(this._buffer) - snapshotEvents.forEach((snapshotBuffer) => { - this._flushedSizeTracker?.trackSize(snapshotBuffer.size) - this._captureSnapshot({ - $snapshot_bytes: snapshotBuffer.size, - $snapshot_data: snapshotBuffer.data, - $session_id: snapshotBuffer.sessionId, - $window_id: snapshotBuffer.windowId, - $lib: 'web', - $lib_version: Config.LIB_VERSION, - }) - }) + void processWithYield( + snapshotEvents, + (snapshotBuffer) => { + this._flushedSizeTracker?.trackSize(snapshotBuffer.size) + this._captureSnapshot({ + $snapshot_bytes: snapshotBuffer.size, + $snapshot_data: snapshotBuffer.data, + $session_id: snapshotBuffer.sessionId, + $window_id: snapshotBuffer.windowId, + $lib: 'web', + $lib_version: Config.LIB_VERSION, + }) + }, + { timeBudgetMs: 30 } + ) } // buffer is empty, we clear it in case the session id has changed diff --git a/packages/browser/src/extensions/replay/external/network-plugin.ts b/packages/browser/src/extensions/replay/external/network-plugin.ts index 58cdbb0d3a..f4f17db790 100644 --- a/packages/browser/src/extensions/replay/external/network-plugin.ts +++ b/packages/browser/src/extensions/replay/external/network-plugin.ts @@ -18,6 +18,7 @@ import { formDataToQuery } from '../../../utils/request-utils' import { patch } from '../rrweb-plugins/patch' import { isHostOnDenyList } from '../../../extensions/replay/external/denylist' import { defaultNetworkOptions } from './config' +import { processWithYield } from '../../../utils/task-queue' const logger = createLogger('[Recorder]') @@ -61,11 +62,18 @@ function initPerformanceObserver(cb: networkCallback, win: IWindow, options: Req isNavigationTiming(entry) || (isResourceTiming(entry) && options.initiatorTypes.includes(entry.initiatorType as InitiatorType)) ) - cb({ - requests: initialPerformanceEntries.flatMap((entry) => - prepareRequest({ entry, method: undefined, status: undefined, networkRequest: {}, isInitial: true }) - ), - isInitial: true, + + // Process initial performance entries with yielding for large sets + processWithYield( + initialPerformanceEntries, + (entry) => + prepareRequest({ entry, method: undefined, status: undefined, networkRequest: {}, isInitial: true }), + { timeBudgetMs: 30 } + ).then((requests) => { + cb({ + requests: requests.flat(), + isInitial: true, + }) }) } const observer = new win.PerformanceObserver((entries) => { diff --git a/packages/browser/src/posthog-core.ts b/packages/browser/src/posthog-core.ts index 6321626c1e..938603a4aa 100644 --- a/packages/browser/src/posthog-core.ts +++ b/packages/browser/src/posthog-core.ts @@ -11,6 +11,7 @@ import { SURVEYS_REQUEST_TIMEOUT_MS, USER_STATE, } from './constants' +import { TaskQueue } from './utils/task-queue' import { DeadClicksAutocapture, isDeadClicksEnabledForAutocapture } from './extensions/dead-clicks-autocapture' import { ExceptionObserver } from './extensions/exception-autocapture' import { HistoryAutocapture } from './extensions/history-autocapture' @@ -663,10 +664,6 @@ export class PostHog { } private _initExtensions(startInCookielessMode: boolean): void { - // we don't support IE11 anymore, so performance.now is safe - // eslint-disable-next-line compat/compat - const initStartTime = performance.now() - this.historyAutocapture = new HistoryAutocapture(this) this.historyAutocapture.startIfEnabled() @@ -733,52 +730,39 @@ export class PostHog { }) // Process tasks with time-slicing to avoid blocking - this._processInitTaskQueue(initTasks, initStartTime) - } - - private _processInitTaskQueue(queue: Array<() => void>, initStartTime: number): void { - const TIME_BUDGET_MS = 30 // Respect frame budget (~60fps = 16ms, but we're already deferred) - - while (queue.length > 0) { - // Only time-slice if deferred init is enabled, otherwise run synchronously - if (this.config.__preview_deferred_init_extensions) { - // we don't support IE11 anymore, so performance.now is safe - // eslint-disable-next-line compat/compat - const elapsed = performance.now() - initStartTime - - // Check if we've exceeded our time budget - if (elapsed >= TIME_BUDGET_MS && queue.length > 0) { - // Yield to browser, then continue processing - setTimeout(() => { - this._processInitTaskQueue(queue, initStartTime) - }, 0) - return - } - } - - // Process next task - const task = queue.shift() - if (task) { + // Only use time-slicing if deferred init is enabled, otherwise process synchronously + if (this.config.__preview_deferred_init_extensions) { + const taskQueue = new TaskQueue({ + timeBudgetMs: 30, + onComplete: (totalTimeMs) => { + this.register_for_session({ + $sdk_debug_extensions_init_method: 'deferred', + $sdk_debug_extensions_init_time_ms: totalTimeMs, + }) + logger.info(`PostHog extensions initialized (${totalTimeMs}ms)`) + }, + onError: (error) => { + logger.error('Error initializing extension:', error) + }, + }) + taskQueue.enqueueAll(initTasks) + } else { + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + const startTime = performance.now() + initTasks.forEach((task) => { try { task() } catch (error) { logger.error('Error initializing extension:', error) } - } - } - - // All tasks complete - record timing for both sync and deferred modes - // we don't support IE11 anymore, so performance.now is safe - // eslint-disable-next-line compat/compat - const taskInitTiming = Math.round(performance.now() - initStartTime) - this.register_for_session({ - $sdk_debug_extensions_init_method: this.config.__preview_deferred_init_extensions - ? 'deferred' - : 'synchronous', - $sdk_debug_extensions_init_time_ms: taskInitTiming, - }) - if (this.config.__preview_deferred_init_extensions) { - logger.info(`PostHog extensions initialized (${taskInitTiming}ms)`) + }) + // eslint-disable-next-line compat/compat + const totalTimeMs = Math.round(performance.now() - startTime) + this.register_for_session({ + $sdk_debug_extensions_init_method: 'synchronous', + $sdk_debug_extensions_init_time_ms: totalTimeMs, + }) } } diff --git a/packages/browser/src/request-queue.ts b/packages/browser/src/request-queue.ts index b30acdd9a5..a30527590b 100644 --- a/packages/browser/src/request-queue.ts +++ b/packages/browser/src/request-queue.ts @@ -3,6 +3,7 @@ import { each } from './utils' import { isArray, isUndefined, clampToRange } from '@posthog/core' import { logger } from './utils/logger' +import { processWithYield } from './utils/task-queue' export const DEFAULT_FLUSH_INTERVAL_MS = 3000 @@ -57,22 +58,27 @@ export class RequestQueue { if (this._isPaused) { return } - this._flushTimeout = setTimeout(() => { + this._flushTimeout = setTimeout(async () => { this._clearFlushTimeout() if (this._queue.length > 0) { const requests = this._formatQueue() - for (const key in requests) { - const req = requests[key] - const now = new Date().getTime() + const requestEntries = Object.entries(requests) + const now = new Date().getTime() - if (req.data && isArray(req.data)) { - each(req.data, (data) => { - data['offset'] = Math.abs(data['timestamp'] - now) - delete data['timestamp'] - }) - } - this._sendRequest(req) - } + // Process timestamp updates with yielding for large batches + await processWithYield( + requestEntries, + ([, req]) => { + if (req.data && isArray(req.data)) { + each(req.data, (data) => { + data['offset'] = Math.abs(data['timestamp'] - now) + delete data['timestamp'] + }) + } + this._sendRequest(req) + }, + { timeBudgetMs: 30 } + ) } }, this._flushTimeoutMs) } diff --git a/packages/browser/src/retry-queue.ts b/packages/browser/src/retry-queue.ts index e86f008678..03cf7c3958 100644 --- a/packages/browser/src/retry-queue.ts +++ b/packages/browser/src/retry-queue.ts @@ -6,6 +6,7 @@ import { window } from './utils/globals' import { PostHog } from './posthog-core' import { extendURLParams } from './request' import { addEventListener } from './utils' +import { processWithYield } from './utils/task-queue' const thirtyMinutes = 30 * 60 * 1000 @@ -120,15 +121,15 @@ export class RetryQueue { return } - this._poller = setTimeout(() => { + this._poller = setTimeout(async () => { if (this._areWeOnline && this._queue.length > 0) { - this._flush() + await this._flush() } this._poll() }, this._pollIntervalMs) as any as number } - private _flush(): void { + private async _flush(): Promise { const now = Date.now() const notToFlush: RetryQueueElement[] = [] const toFlush = this._queue.filter((item) => { @@ -142,9 +143,9 @@ export class RetryQueue { this._queue = notToFlush if (toFlush.length > 0) { - for (const { requestOptions } of toFlush) { - this.retriableRequest(requestOptions) - } + await processWithYield(toFlush, ({ requestOptions }) => this.retriableRequest(requestOptions), { + timeBudgetMs: 30, + }) } } diff --git a/packages/browser/src/site-apps.ts b/packages/browser/src/site-apps.ts index 1a1dd9c413..949a259275 100644 --- a/packages/browser/src/site-apps.ts +++ b/packages/browser/src/site-apps.ts @@ -2,6 +2,7 @@ import { PostHog } from './posthog-core' import { CaptureResult, Properties, RemoteConfig, SiteApp, SiteAppGlobals, SiteAppLoader } from './types' import { assignableWindow } from './utils/globals' import { createLogger } from './utils/logger' +import { processWithYield } from './utils/task-queue' const logger = createLogger('[SiteApps]') @@ -18,13 +19,17 @@ export class SiteApps { } public get isEnabled(): boolean { - return !!this._instance.config.opt_in_site_apps + return this._instance.config.opt_in_site_apps } private _eventCollector(_eventName: string, eventPayload?: CaptureResult | undefined) { if (!eventPayload) { return } + if (eventPayload.event === '$snapshot') { + return + } + const globals = this.globalsForEvent(eventPayload) this._bufferedInvocations.push(globals) if (this._bufferedInvocations.length > 1000) { @@ -59,7 +64,8 @@ export class SiteApps { groups[type] = { id: groupIds[type], type, properties } } const { $set_once, $set, ..._event } = event - const globals = { + + return { event: { ..._event, properties: { @@ -80,7 +86,6 @@ export class SiteApps { }, groups, } - return globals } setupSiteApp(loader: SiteAppLoader) { @@ -88,8 +93,15 @@ export class SiteApps { const processBufferedEvents = () => { if (!app.errored && this._bufferedInvocations.length) { logger.info(`Processing ${this._bufferedInvocations.length} events for site app with id ${loader.id}`) - this._bufferedInvocations.forEach((globals) => app.processEvent?.(globals)) - app.processedBuffer = true + void processWithYield(this._bufferedInvocations, (globals) => app.processEvent?.(globals), { + timeBudgetMs: 30, + onComplete: () => { + app.processedBuffer = true + if (Object.values(this.apps).every((app) => app.processedBuffer || app.errored)) { + this._stopBuffering?.() + } + }, + }) } if (Object.values(this.apps).every((app) => app.processedBuffer || app.errored)) { @@ -157,6 +169,10 @@ export class SiteApps { return } + if (event.event === '$snapshot') { + return + } + const globals = this.globalsForEvent(event) for (const app of Object.values(this.apps)) { diff --git a/packages/browser/src/utils/__tests__/task-queue.test.ts b/packages/browser/src/utils/__tests__/task-queue.test.ts new file mode 100644 index 0000000000..a835cfa7bb --- /dev/null +++ b/packages/browser/src/utils/__tests__/task-queue.test.ts @@ -0,0 +1,366 @@ +import { TaskQueue, processWithYield, processAsyncWithYield } from '../task-queue' + +describe('TaskQueue', () => { + beforeEach(() => { + jest.useFakeTimers() + }) + + afterEach(() => { + jest.useRealTimers() + }) + + describe('basic functionality', () => { + it('processes tasks in FIFO order', () => { + const results: number[] = [] + const queue = new TaskQueue() + + queue.enqueue(() => results.push(1)) + queue.enqueue(() => results.push(2)) + queue.enqueue(() => results.push(3)) + + jest.runAllTimers() + + expect(results).toEqual([1, 2, 3]) + }) + + it('starts processing immediately when first task is enqueued', () => { + const results: number[] = [] + const queue = new TaskQueue() + + queue.enqueue(() => results.push(1)) + + // Should process synchronously without timers if within budget + expect(results).toEqual([1]) + }) + + it('reports pending task count', () => { + const queue = new TaskQueue() + + expect(queue.pending).toBe(0) + + queue.enqueue(() => {}) + queue.enqueue(() => {}) + + jest.runAllTimers() + + expect(queue.pending).toBe(0) + }) + + it('reports processing status', () => { + const queue = new TaskQueue() + + expect(queue.isProcessing).toBe(false) + + queue.enqueue(() => {}) + + jest.runAllTimers() + + expect(queue.isProcessing).toBe(false) + }) + + it('enqueues multiple tasks at once', () => { + const results: number[] = [] + const queue = new TaskQueue() + + queue.enqueueAll([() => results.push(1), () => results.push(2), () => results.push(3)]) + + jest.runAllTimers() + + expect(results).toEqual([1, 2, 3]) + }) + }) + + describe('time-slicing and yielding', () => { + it('processes all tasks eventually', () => { + const queue = new TaskQueue({ timeBudgetMs: 30 }) + const results: number[] = [] + + for (let i = 0; i < 10; i++) { + queue.enqueue(() => results.push(i)) + } + + jest.runAllTimers() + + expect(results).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + }) + + it('processes synchronously when within budget', () => { + const queue = new TaskQueue({ timeBudgetMs: 1000 }) + const results: number[] = [] + + // Just a few tasks that won't exceed budget + for (let i = 0; i < 3; i++) { + queue.enqueue(() => results.push(i)) + } + + // All should process synchronously without needing to run timers + expect(results).toEqual([0, 1, 2]) + }) + }) + + describe('error handling', () => { + it('continues processing after task error', () => { + const results: number[] = [] + const errors: Error[] = [] + const queue = new TaskQueue({ + onError: (error) => errors.push(error), + }) + + queue.enqueue(() => results.push(1)) + queue.enqueue(() => { + throw new Error('Task failed') + }) + queue.enqueue(() => results.push(3)) + + jest.runAllTimers() + + expect(results).toEqual([1, 3]) + expect(errors).toHaveLength(1) + }) + + it('calls custom error handler when provided', () => { + const errors: Error[] = [] + const queue = new TaskQueue({ + onError: (error) => errors.push(error), + }) + + const expectedError = new Error('Custom error') + queue.enqueue(() => { + throw expectedError + }) + + jest.runAllTimers() + + expect(errors).toHaveLength(1) + expect(errors[0]).toBe(expectedError) + }) + + it('logs using SDK logger when no error handler provided', () => { + // The SDK logger will handle the error internally + // We just verify the queue continues processing + const results: number[] = [] + const queue = new TaskQueue() + + queue.enqueue(() => results.push(1)) + queue.enqueue(() => { + throw new Error('Test error') + }) + queue.enqueue(() => results.push(3)) + + jest.runAllTimers() + + // Should process tasks before and after the error + expect(results).toEqual([1, 3]) + }) + }) + + describe('completion callback', () => { + it('calls onComplete when all tasks finish', () => { + let completed = false + let reportedTime = 0 + const queue = new TaskQueue({ + onComplete: (timeMs) => { + completed = true + reportedTime = timeMs + }, + }) + + queue.enqueueAll([() => {}, () => {}, () => {}]) + + jest.runAllTimers() + + expect(completed).toBe(true) + expect(reportedTime).toBeGreaterThanOrEqual(0) + }) + + it('reports processing time', () => { + let reportedTime = 0 + const queue = new TaskQueue({ + onComplete: (timeMs) => { + reportedTime = timeMs + }, + }) + + queue.enqueueAll([() => {}, () => {}]) + + jest.runAllTimers() + + expect(reportedTime).toBeGreaterThanOrEqual(0) + }) + }) +}) + +describe('processWithYield', () => { + beforeEach(() => { + jest.useFakeTimers() + }) + + afterEach(() => { + jest.useRealTimers() + }) + + it('processes array items and returns results', async () => { + const items = [1, 2, 3, 4, 5] + const promise = processWithYield(items, (x) => x * 2) + + jest.runAllTimers() + + const results = await promise + expect(results).toEqual([2, 4, 6, 8, 10]) + }) + + it('preserves array indices', async () => { + const items = ['a', 'b', 'c'] + const promise = processWithYield(items, (item, index) => `${index}:${item}`) + + jest.runAllTimers() + + const results = await promise + expect(results).toEqual(['0:a', '1:b', '2:c']) + }) + + it('yields for large arrays', async () => { + const items = new Array(100).fill(0).map((_, i) => i) + + let mockTime = 0 + jest.spyOn(performance, 'now').mockImplementation(() => mockTime) + + const promise = processWithYield( + items, + (x) => { + mockTime += 1 // Each item takes 1ms + return x * 2 + }, + { timeBudgetMs: 30 } + ) + + jest.runAllTimers() + + const results = await promise + expect(results).toHaveLength(100) + expect(results[0]).toBe(0) + expect(results[99]).toBe(198) + }) + + it('calls onComplete with processing time', async () => { + let completedTime = 0 + const items = [1, 2, 3] + + const promise = processWithYield(items, (x) => x * 2, { + onComplete: (timeMs) => { + completedTime = timeMs + }, + }) + + jest.runAllTimers() + + await promise + expect(completedTime).toBeGreaterThanOrEqual(0) + }) +}) + +describe('processAsyncWithYield', () => { + beforeEach(() => { + jest.useFakeTimers() + }) + + afterEach(() => { + jest.useRealTimers() + }) + + it('processes async tasks sequentially', async () => { + const results: number[] = [] + const tasks = [ + async () => { + results.push(1) + return 1 + }, + async () => { + results.push(2) + return 2 + }, + async () => { + results.push(3) + return 3 + }, + ] + + const promise = processAsyncWithYield(tasks) + + jest.runAllTimers() + await promise + + expect(results).toEqual([1, 2, 3]) + }) + + it('returns results from async tasks', async () => { + const tasks = [async () => 'a', async () => 'b', async () => 'c'] + + const promise = processAsyncWithYield(tasks) + + jest.runAllTimers() + const results = await promise + + expect(results).toEqual(['a', 'b', 'c']) + }) + + it('processes all async tasks', async () => { + const tasks = new Array(10).fill(0).map((_, i) => async () => i) + + const results = await processAsyncWithYield(tasks, { timeBudgetMs: 30 }) + + expect(results).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + }) + + it('handles async task errors with custom handler', async () => { + const errors: Error[] = [] + const expectedError = new Error('Async task failed') + + const tasks = [ + async () => 1, + async () => { + throw expectedError + }, + async () => 3, + ] + + const promise = processAsyncWithYield(tasks, { + onError: (error) => errors.push(error), + }) + + jest.runAllTimers() + await promise + + expect(errors).toHaveLength(1) + expect(errors[0]).toBe(expectedError) + }) + + it('calls onComplete with total time', async () => { + let completedTime = 0 + + let mockTime = 0 + jest.spyOn(performance, 'now').mockImplementation(() => mockTime) + + const tasks = [ + async () => { + mockTime += 10 + return 1 + }, + async () => { + mockTime += 20 + return 2 + }, + ] + + const promise = processAsyncWithYield(tasks, { + onComplete: (timeMs) => { + completedTime = timeMs + }, + }) + + jest.runAllTimers() + await promise + + expect(completedTime).toBe(30) + }) +}) diff --git a/packages/browser/src/utils/task-queue.ts b/packages/browser/src/utils/task-queue.ts new file mode 100644 index 0000000000..8c3a1ec434 --- /dev/null +++ b/packages/browser/src/utils/task-queue.ts @@ -0,0 +1,210 @@ +/** + * Scheduler for breaking up CPU-intensive work to avoid blocking the main thread. + * Uses time-budgeting to yield control back to the browser periodically, + * improving Interaction to Next Paint (INP) and overall page responsiveness. + */ + +import { logger } from './logger' + +export interface TaskQueueConfig { + /** + * Maximum time (in ms) to spend processing tasks before yielding. + * Default: 30ms (conservative, allows browser time for rendering and interactions) + */ + timeBudgetMs?: number + + /** + * Callback invoked when all tasks have been processed. + * Receives the total time spent processing tasks. + */ + onComplete?: (totalTimeMs: number) => void + + /** + * Callback invoked when a task throws an error. + * If not provided, errors are logged using the SDK logger. + */ + onError?: (error: Error, task: () => void) => void +} + +export class TaskQueue { + private _queue: Array<() => void> = [] + private _processing = false + private _startTime = 0 + private _config: Required + + constructor(config: TaskQueueConfig = {}) { + this._config = { + timeBudgetMs: config.timeBudgetMs ?? 30, + onComplete: config.onComplete ?? (() => {}), + onError: + config.onError ?? + ((error: Error) => { + logger.error('Error processing task:', error) + }), + } + } + + /** + * Add a task to the queue. Tasks are processed in FIFO order. + * If the queue is not already processing, processing starts immediately. + */ + enqueue(task: () => void): void { + this._queue.push(task) + + if (!this._processing) { + this._processing = true + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + this._startTime = performance.now() + this._process() + } + } + + /** + * Add multiple tasks to the queue at once. + */ + enqueueAll(tasks: Array<() => void>): void { + this._queue.push(...tasks) + + if (!this._processing) { + this._processing = true + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + this._startTime = performance.now() + this._process() + } + } + + /** + * Returns the number of pending tasks in the queue. + */ + get pending(): number { + return this._queue.length + } + + /** + * Returns whether the queue is currently processing tasks. + */ + get isProcessing(): boolean { + return this._processing + } + + private _process(): void { + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + const batchStartTime = performance.now() + + while (this._queue.length > 0) { + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + const elapsed = performance.now() - batchStartTime + + if (elapsed >= this._config.timeBudgetMs) { + // Exceeded time budget, yield to browser + setTimeout(() => { + this._process() + }, 0) + return + } + + const task = this._queue.shift() + if (task) { + try { + task() + } catch (error) { + this._config.onError(error as Error, task) + } + } + } + + // All tasks complete + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + const totalTime = Math.round(performance.now() - this._startTime) + this._processing = false + this._config.onComplete(totalTime) + } +} + +/** + * Process an array of items with a transformation function, yielding to the main thread + * periodically to avoid blocking. Returns a promise that resolves when all items are processed. + * + * @param items - Array of items to process + * @param fn - Function to apply to each item + * @param config - Optional task queue configuration + * @returns Promise that resolves with array of results when processing completes + */ +export function processWithYield( + items: T[], + fn: (item: T, index: number) => R, + config?: TaskQueueConfig +): // eslint-disable-next-line compat/compat +Promise { + // eslint-disable-next-line compat/compat + return new Promise((resolve) => { + const results: R[] = [] + const queue = new TaskQueue({ + ...config, + onComplete: (totalTimeMs) => { + config?.onComplete?.(totalTimeMs) + resolve(results) + }, + }) + + items.forEach((item, index) => { + queue.enqueue(() => { + results[index] = fn(item, index) + }) + }) + }) +} + +/** + * Runs async tasks sequentially with yielding between each task. + * Unlike processWithYield, this supports async functions and waits for each to complete. + * + * @param tasks - Array of async functions to execute + * @param config - Optional task queue configuration + * @returns Promise that resolves when all tasks complete + */ +export async function processAsyncWithYield(tasks: Array<() => Promise>, config?: TaskQueueConfig): Promise { + const results: T[] = [] + const timeBudgetMs = config?.timeBudgetMs ?? 30 + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + const startTime = performance.now() + // eslint-disable-next-line compat/compat + let batchStartTime = performance.now() + + for (let i = 0; i < tasks.length; i++) { + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + const elapsed = performance.now() - batchStartTime + + if (elapsed >= timeBudgetMs && i < tasks.length) { + // Yield to browser + // eslint-disable-next-line compat/compat + await new Promise((resolve) => setTimeout(resolve, 0)) + // eslint-disable-next-line compat/compat + batchStartTime = performance.now() + } + + try { + results[i] = await tasks[i]() + } catch (error) { + if (config?.onError) { + config.onError(error as Error, tasks[i]) + } else { + logger.error('Error processing async task:', error) + } + } + } + + // we don't support IE11 anymore, so performance.now is safe + // eslint-disable-next-line compat/compat + const totalTime = Math.round(performance.now() - startTime) + config?.onComplete?.(totalTime) + + return results +} diff --git a/packages/browser/terser-mangled-names.json b/packages/browser/terser-mangled-names.json index 59ac97dbbe..f05de59a31 100644 --- a/packages/browser/terser-mangled-names.json +++ b/packages/browser/terser-mangled-names.json @@ -240,8 +240,9 @@ "_prepareFeatureFlagsForCallbacks", "_previousPageViewProperties", "_primary_window_exists_storage_key", - "_processInitTaskQueue", + "_process", "_processQueuedEvents", + "_processing", "_queue", "_queuedRRWebEvents", "_random", @@ -316,6 +317,7 @@ "_startRecorder", "_startScrollObserver", "_startSelectionChangedObserver", + "_startTime", "_start_queue_if_opted_in", "_statusMatcher", "_stopBuffering", diff --git a/playground/remix/app/providers.tsx b/playground/remix/app/providers.tsx index adab7127a8..739aa3083f 100644 --- a/playground/remix/app/providers.tsx +++ b/playground/remix/app/providers.tsx @@ -8,8 +8,8 @@ export function PHProvider({ children }: { children: React.ReactNode }) { const [events, setEvents] = useState([]) useEffect(() => { - posthog.init('phc_test_key_for_playground', { - api_host: '/ph-relay-xyz123', + posthog.init('sTMFPsFhdP1Ssg', { + api_host: 'https://us.i.posthog.com', ui_host: 'https://us.posthog.com', defaults: '2025-05-24', before_send: (cr) => { diff --git a/playground/remix/package.json b/playground/remix/package.json index 25c0649364..95e4939a3c 100644 --- a/playground/remix/package.json +++ b/playground/remix/package.json @@ -10,17 +10,17 @@ "typecheck": "tsc" }, "dependencies": { - "@posthog/react": "*", - "@remix-run/node": "^2.15.2", - "@remix-run/react": "^2.15.2", - "@remix-run/serve": "^2.15.2", + "@posthog/react": "1.4.0", + "@remix-run/node": "2.16.8", + "@remix-run/react": "2.16.8", + "@remix-run/serve": "2.16.8", "isbot": "^4.1.0", - "posthog-js": "*", + "posthog-js": "1.290.0", "react": "^18.3.1", "react-dom": "^18.3.1" }, "devDependencies": { - "@remix-run/dev": "^2.15.2", + "@remix-run/dev": "2.16.8", "@types/react": "^18.3.16", "@types/react-dom": "^18.3.5", "typescript": "^5.7.2",