diff --git a/package.json b/package.json index 41977da1..18d55e45 100644 --- a/package.json +++ b/package.json @@ -43,10 +43,6 @@ "url": "https://github.com/apify/apify-client-js/issues" }, "homepage": "https://docs.apify.com/api/client/js/", - "files": [ - "dist", - "!dist/*.tsbuildinfo" - ], "scripts": { "build": "npm run clean && npm run build:node && npm run build:browser", "postbuild": "gen-esm-wrapper dist/index.js dist/index.mjs", diff --git a/src/resource_clients/actor.ts b/src/resource_clients/actor.ts index 4d73cbb0..14c52cec 100644 --- a/src/resource_clients/actor.ts +++ b/src/resource_clients/actor.ts @@ -155,12 +155,15 @@ export class ActorClient extends ResourceClient { const newRunClient = this.apifyClient.run(id); const streamedLog = await newRunClient.getStreamedLog({ toLog: options?.log }); + const statusMessageWatcher = await newRunClient.getStatusMessageWatcher({ toLog: options?.log }); streamedLog?.start(); + statusMessageWatcher?.start(); return this.apifyClient .run(id) .waitForFinish({ waitSecs }) .finally(async () => { await streamedLog?.stop(); + await statusMessageWatcher?.stop(); }); } diff --git a/src/resource_clients/log.ts b/src/resource_clients/log.ts index c77b43bb..ca8aab48 100644 --- a/src/resource_clients/log.ts +++ b/src/resource_clients/log.ts @@ -11,6 +11,7 @@ import type { ApiClientSubResourceOptions } from '../base/api_client'; import { ResourceClient } from '../base/resource_client'; import type { ApifyRequestConfig } from '../http_client'; import { cast, catchNotFoundOrThrow } from '../utils'; +import type { RunClient } from './run'; export class LogClient extends ResourceClient { /** @@ -237,3 +238,80 @@ export interface StreamedLogOptions { /** Whether to redirect all logs from Actor run start (even logs from the past). */ fromStart?: boolean; } + +/** + * Helper class for redirecting Actor run status and status message to log. + */ +export class StatusMessageWatcher { + private static finalSleepTimeMs = 6000; + private static defaultCheckPeriodMs = 5000; + + protected toLog: Log; + protected checkPeriod: number; + protected lastStatusMessage = ''; + private runClient: RunClient; + private loggingTask: Promise | null = null; + private stopLogging = false; + + constructor(options: StatusMessageWatcherOptions) { + const { toLog, runClient, checkPeriod = StatusMessageWatcher.defaultCheckPeriodMs } = options; + this.runClient = runClient; + this.toLog = toLog; + this.checkPeriod = checkPeriod; + } + + /** + * Start Actor run status and status message redirection. + */ + start() { + if (this.loggingTask) { + throw new Error('Logging task already active'); + } + this.stopLogging = false; + this.loggingTask = this._logChangedStatusMessage(); + } + + /** + * Stop Actor run status and status message redirection. + */ + async stop(): Promise { + if (!this.loggingTask) { + throw new Error('Logging task is not active'); + } + await new Promise((resolve) => { + // Wait for the final status and status message to be set + setTimeout(resolve, StatusMessageWatcher.finalSleepTimeMs); + }); + this.stopLogging = true; + await this.loggingTask; + this.loggingTask = null; + } + + async _logChangedStatusMessage(): Promise { + while (!this.stopLogging) { + const runData = await this.runClient.get(); + if (runData !== undefined) { + const status = runData.status ?? 'Unknown status'; + const statusMessage = runData.statusMessage ?? ''; + const newStatusMessage = `Status: ${status}, Message: ${statusMessage}`; + if (newStatusMessage !== this.lastStatusMessage) { + // Log only when status or status message changed + this.lastStatusMessage = newStatusMessage; + this.toLog.info(newStatusMessage); + } + await new Promise((resolve) => { + setTimeout(resolve, this.checkPeriod); + }); + } + } + } +} + +export interface StatusMessageWatcherOptions { + /** Run client used to communicate with the Apify API. */ + runClient: RunClient; + /** Log to which the Actor run logs will be redirected. */ + toLog: Log; + /** How often should change in status be checked. */ + checkPeriod?: number; +} diff --git a/src/resource_clients/run.ts b/src/resource_clients/run.ts index 8883e176..115b42de 100644 --- a/src/resource_clients/run.ts +++ b/src/resource_clients/run.ts @@ -11,7 +11,7 @@ import { cast, isNode, parseDateFields, pluckData } from '../utils'; import type { ActorRun } from './actor'; import { DatasetClient } from './dataset'; import { KeyValueStoreClient } from './key_value_store'; -import { LogClient, LoggerActorRedirect, StreamedLog } from './log'; +import { LogClient, LoggerActorRedirect, StatusMessageWatcher, StreamedLog } from './log'; import { RequestQueueClient } from './request_queue'; const RUN_CHARGE_IDEMPOTENCY_HEADER = 'idempotency-key'; @@ -279,24 +279,49 @@ export class RunClient extends ResourceClient { return undefined; } if (toLog === undefined || toLog === 'default') { - // Create default StreamedLog - // Get actor name and run id - const runData = await this.get(); - const runId = runData?.id ?? ''; - - const actorId = runData?.actId ?? ''; - const actorData = (await this.apifyClient.actor(actorId).get()) || { name: '' }; + toLog = await this.getActorRedirectLog(); + } - const actorName = actorData?.name ?? ''; - const name = [actorName, `runId:${runId}`].filter(Boolean).join(' '); + return new StreamedLog({ logClient: this.log(), toLog, fromStart }); + } - toLog = new Log({ level: LEVELS.DEBUG, prefix: `${name} -> `, logger: new LoggerActorRedirect() }); + /** + * Get StatusMessageWatcher for convenient streaming of the Actor run status message and its redirection. + */ + async getStatusMessageWatcher( + options: getStatusMessageWatcherOptions = {}, + ): Promise { + let { toLog } = options; + if (toLog === null || !isNode()) { + // Explicitly no logging or not in Node.js + return undefined; } + if (toLog === undefined || toLog === 'default') { + toLog = await this.getActorRedirectLog(); + } + return new StatusMessageWatcher({ toLog, runClient: this, checkPeriod: options.checkPeriod }); + } - return new StreamedLog({ logClient: this.log(), toLog, fromStart }); + private async getActorRedirectLog(): Promise { + // Get actor name and run id + const runData = await this.get(); + const runId = runData ? `${runData.id ?? ''}` : ''; + + const actorId = runData?.actId ?? ''; + const actorData = (await this.apifyClient.actor(actorId).get()) || { name: '' }; + + const actorName = runData ? (actorData.name ?? '') : ''; + const name = [actorName, `runId:${runId}`].filter(Boolean).join(' '); + + return new Log({ level: LEVELS.DEBUG, prefix: `${name} -> `, logger: new LoggerActorRedirect() }); } } +export interface getStatusMessageWatcherOptions { + toLog?: Log | null | 'default'; + checkPeriod?: number; +} + export interface GetStreamedLogOptions { toLog?: Log | null | 'default'; fromStart?: boolean; diff --git a/test/actors.test.js b/test/actors.test.js index ada2468b..d2fc1684 100644 --- a/test/actors.test.js +++ b/test/actors.test.js @@ -1,9 +1,9 @@ const { Browser, validateRequest, DEFAULT_OPTIONS } = require('./_helper'); -const { ActorListSortBy, ApifyClient, LoggerActorRedirect } = require('apify-client'); +const { ActorListSortBy, ApifyClient, LoggerActorRedirect, RunClient, StatusMessageWatcher } = require('apify-client'); const { stringifyWebhooksToBase64 } = require('../src/utils'); const { mockServer, createDefaultApp } = require('./mock_server/server'); const c = require('ansi-colors'); -const { MOCKED_ACTOR_LOGS_PROCESSED, StatusGenerator } = require('./mock_server/test_utils'); +const { MOCKED_ACTOR_LOGS_PROCESSED, StatusGenerator, MOCKED_ACTOR_STATUSES } = require('./mock_server/test_utils'); const { Log, LEVELS } = require('@apify/log'); const express = require('express'); const { setTimeout } = require('node:timers/promises'); @@ -727,12 +727,16 @@ describe('Run actor with redirected logs', () => { describe('actor.call - redirected logs', () => { test.each(testCases)('logOptions:$logOptions', async ({ expectedPrefix, logOptions }) => { + const mockedGetStatusMessageWatcher = jest + .spyOn(RunClient.prototype, 'getStatusMessageWatcher') + .mockImplementation(() => {}); const logSpy = jest.spyOn(console, 'log').mockImplementation(() => {}); await client.actor('redirect-actor-id').call(undefined, logOptions); expect(logSpy.mock.calls).toEqual(MOCKED_ACTOR_LOGS_PROCESSED.map((item) => [expectedPrefix + item])); logSpy.mockRestore(); + mockedGetStatusMessageWatcher.mockRestore(); }); test('logOptions:{ "log": null }', async () => { @@ -745,3 +749,86 @@ describe('Run actor with redirected logs', () => { }); }); }); + +describe('Run actor with status message watcher', () => { + let baseUrl; + let client; + const originalCheckPeriodMs = StatusMessageWatcher.defaultCheckPeriodMs; + const originalFinalSleepTimeMs = StatusMessageWatcher.finalSleepTimeMs; + const statusGenerator = new StatusGenerator(); + + beforeAll(async () => { + // Use custom router for the tests + const router = express.Router(); + // Set up a status generator to simulate run status changes. It will be reset for each test. + router.get('/actor-runs/redirect-run-id', async (req, res) => { + // Delay the response to give the actor time to run and produce expected logs + await setTimeout(10); + let [status, statusMessage] = ['', '']; + [status, statusMessage] = statusGenerator.next().value; + res.json({ data: { id: 'redirect-run-id', actId: 'redirect-actor-id', status, statusMessage } }); + }); + const app = createDefaultApp(router); + const server = await mockServer.start(undefined, app); + baseUrl = `http://localhost:${server.address().port}`; + + StatusMessageWatcher.defaultCheckPeriodMs = 1; // speed up tests + StatusMessageWatcher.finalSleepTimeMs = 1; // speed up tests + }); + + afterAll(async () => { + await Promise.all([mockServer.close()]); + StatusMessageWatcher.defaultCheckPeriodMs = originalCheckPeriodMs; + StatusMessageWatcher.finalSleepTimeMs = originalFinalSleepTimeMs; + }); + + beforeEach(async () => { + client = new ApifyClient({ + baseUrl, + maxRetries: 0, + ...DEFAULT_OPTIONS, + }); + }); + afterEach(async () => { + // Reset the generator to so that the next test starts fresh + statusGenerator.reset(); + client = null; + }); + + const testCases = [ + { expectedPrefix: c.cyan('redirect-actor-name runId:redirect-run-id -> '), logOptions: {} }, + { expectedPrefix: c.cyan('redirect-actor-name runId:redirect-run-id -> '), logOptions: { log: 'default' } }, + { + expectedPrefix: c.cyan('custom prefix...'), + logOptions: { + log: new Log({ level: LEVELS.DEBUG, prefix: 'custom prefix...', logger: new LoggerActorRedirect() }), + }, + }, + ]; + + describe('actor.call - status watcher', () => { + test.each(testCases)('logOptions:$logOptions', async ({ expectedPrefix, logOptions }) => { + const logSpy = jest.spyOn(console, 'log').mockImplementation(() => {}); + // Ignore StreamedLog to not clutter the status message watcher test + const mockedGetStreamedLog = jest.spyOn(RunClient.prototype, 'getStreamedLog').mockImplementation(() => {}); + + await client.actor('redirect-actor-id').call(undefined, logOptions); + + const uniqueStatuses = Array.from(new Set(MOCKED_ACTOR_STATUSES.map(JSON.stringify))).map(JSON.parse); + expect(logSpy.mock.calls).toEqual( + uniqueStatuses.map((item) => [`${expectedPrefix}Status: ${item[0]}, Message: ${item[1]}`]), + ); + logSpy.mockRestore(); + mockedGetStreamedLog.mockRestore(); + }); + + test('no log', async () => { + const logSpy = jest.spyOn(console, 'log').mockImplementation(() => {}); + + await client.actor('redirect-actor-id').call(undefined, { log: null }); + + expect(logSpy.mock.calls).toEqual([]); + logSpy.mockRestore(); + }); + }); +}); diff --git a/test/mock_server/test_utils.js b/test/mock_server/test_utils.js index 6296059e..32b88de0 100644 --- a/test/mock_server/test_utils.js +++ b/test/mock_server/test_utils.js @@ -34,15 +34,13 @@ const MOCKED_ACTOR_LOGS_PROCESSED = [ ]; const MOCKED_ACTOR_STATUSES = [ + ['RUNNING', 'Actor Started'], ['RUNNING', 'Actor Started'], ['RUNNING', 'Doing some stuff'], ['RUNNING', 'Doing some stuff'], ['RUNNING', 'Doing some stuff'], - ['RUNNING', 'Doing some stuff'], - ['RUNNING', 'Doing some stuff'], - ['RUNNING', 'Doing some stuff'], - ['RUNNING', 'Doing some stuff'], - ['RUNNING', 'Doing some stuff'], + ['SUCCEEDED', 'Actor Finished'], + ['SUCCEEDED', 'Actor Finished'], ['SUCCEEDED', 'Actor Finished'], ]; diff --git a/test/runs.test.js b/test/runs.test.js index c8a05032..c0809980 100644 --- a/test/runs.test.js +++ b/test/runs.test.js @@ -1,9 +1,10 @@ const { Browser, validateRequest, DEFAULT_OPTIONS } = require('./_helper'); -const { ApifyClient } = require('apify-client'); -const { mockServer } = require('./mock_server/server'); +const { ApifyClient, StatusMessageWatcher } = require('apify-client'); +const { mockServer, createDefaultApp } = require('./mock_server/server'); const c = require('ansi-colors'); -const { MOCKED_ACTOR_LOGS_PROCESSED } = require('./mock_server/test_utils'); +const { MOCKED_ACTOR_LOGS_PROCESSED, MOCKED_ACTOR_STATUSES, StatusGenerator } = require('./mock_server/test_utils'); const { setTimeout: setTimeoutNode } = require('node:timers/promises'); +const express = require('express'); describe('Run methods', () => { let baseUrl; @@ -432,3 +433,74 @@ describe('Redirect run logs', () => { }); }); }); + +describe('Redirect run status message', () => { + let baseUrl; + let client; + const statusGenerator = new StatusGenerator(); + const originalCheckPeriodMs = StatusMessageWatcher.defaultCheckPeriodMs; + const originalFinalSleepTimeMs = StatusMessageWatcher.finalSleepTimeMs; + + beforeAll(async () => { + // Use custom router for the tests + const router = express.Router(); + // Set up a status generator to simulate run status changes. It will be reset for each test. + router.get('/actor-runs/redirect-run-id', async (req, res) => { + // Delay the response to give the actor time to run and produce expected logs + await new Promise((resolve) => { + setTimeout(resolve, 10); + }); + const [status, statusMessage] = statusGenerator.next().value; + res.json({ data: { id: 'redirect-run-id', actId: 'redirect-actor-id', status, statusMessage } }); + }); + const app = createDefaultApp(router); + const server = await mockServer.start(undefined, app); + baseUrl = `http://localhost:${server.address().port}`; + + StatusMessageWatcher.defaultCheckPeriodMs = 1; // speed up tests + StatusMessageWatcher.finalSleepTimeMs = 1; // speed up tests + }); + + afterAll(async () => { + await Promise.all([mockServer.close()]); + StatusMessageWatcher.defaultCheckPeriodMs = originalCheckPeriodMs; + StatusMessageWatcher.finalSleepTimeMs = originalFinalSleepTimeMs; + }); + + beforeEach(async () => { + client = new ApifyClient({ + baseUrl, + maxRetries: 0, + ...DEFAULT_OPTIONS, + }); + }); + afterEach(async () => { + // Reset the generator to so that the next test starts fresh + statusGenerator.reset(); + client = null; + }); + + describe('run.getStatusMessageWatcher', () => { + test('Log same repeated statuses just once', async () => { + const logSpy = jest.spyOn(console, 'log').mockImplementation(() => {}); + + const statusMessageWatcher = await client + .run('redirect-run-id') + .getStatusMessageWatcher({ checkPeriod: 1 }); + + statusMessageWatcher.start(); + // Wait some time to accumulate statuses + await new Promise((resolve) => { + setTimeout(resolve, 100); + }); + await statusMessageWatcher.stop(); + + const loggerPrefix = c.cyan('redirect-actor-name runId:redirect-run-id -> '); + const uniqueStatuses = Array.from(new Set(MOCKED_ACTOR_STATUSES.map(JSON.stringify))).map(JSON.parse); + expect(logSpy.mock.calls).toEqual( + uniqueStatuses.map((item) => [`${loggerPrefix}Status: ${item[0]}, Message: ${item[1]}`]), + ); + logSpy.mockRestore(); + }); + }); +});