From 186c5fb6b7c074d94bb9e5d4094f1909671c6f88 Mon Sep 17 00:00:00 2001 From: Brian Gardiner Date: Thu, 14 May 2026 10:07:20 -0400 Subject: [PATCH] feat: parallelize container monitor dependency requests Rewrite monitorDependencies in src/lib/ecosystems/monitor.ts to fan out per-ScanResult /monitor-dependencies PUTs in parallel via pMap, with the concurrency taken from getRequestConcurrency() -- the SNYK_REQUEST_CONCURRENCY limit (default 5). Per-ScanResult work is extracted into monitorOneScanResult for testability and clarity. Container images that produce many ScanResults (e.g. one per directory of JARs in fat-JAR-heavy images) previously incurred one full RTT per scan result, since the prior implementation used a nested for-loop with await. With bounded parallelism this collapses to roughly ceil(N / concurrency) round-trip times per scanned path (paths themselves are still processed one after another), materially reducing wall-clock time for large images. Error semantics are preserved: - 401 still throws AuthFailedError (terminates the run). - Any other 4xx still throws MonitorError (terminates via pMap fail-fast; requests already in flight are not cancelled, but their outcomes are discarded). - 5xx and other non-4xx errors are accumulated per-ScanResult into the errors array, matching the prior continue-on-error behavior. Result order is preserved by pMap based on input order, so output remains deterministic regardless of completion order. Tests cover concurrency cap (default 5), env override via the internal SNYK_INTERNAL_REQUEST_CONCURRENCY contract that the wrapping Go CLI forwards, ordering preservation, 4xx fail-fast, and 5xx accumulation. 
--- src/lib/ecosystems/monitor.ts | 109 +++++++----- .../unit/ecosystems-monitor-docker.spec.ts | 156 ++++++++++++++++++ 2 files changed, 226 insertions(+), 39 deletions(-) diff --git a/src/lib/ecosystems/monitor.ts b/src/lib/ecosystems/monitor.ts index 6a02f6e78b..19936044f3 100644 --- a/src/lib/ecosystems/monitor.ts +++ b/src/lib/ecosystems/monitor.ts @@ -1,8 +1,10 @@ import { InspectResult } from '@snyk/cli-interface/legacy/plugin'; import chalk from 'chalk'; +import * as pMap from 'p-map'; import config from '../config'; import { isCI } from '../is-ci'; import { makeRequest } from '../request/promise'; +import { getRequestConcurrency } from '../snyk-test/common'; import { Contributor, MonitorOptions, @@ -148,47 +150,21 @@ async function monitorDependencies( ): Promise<[EcosystemMonitorResult[], EcosystemMonitorError[]]> { const results: EcosystemMonitorResult[] = []; const errors: EcosystemMonitorError[] = []; + const concurrency = getRequestConcurrency(); + for (const [path, scanResults] of Object.entries(scans)) { await spinner(`Monitoring dependencies in ${path}`); - for (const scanResult of scanResults) { - const monitorDependenciesRequest = - await generateMonitorDependenciesRequest(scanResult, options); - - const configOrg = config.org ? 
decodeURIComponent(config.org) : undefined; - - const payload = { - method: 'PUT', - url: `${config.API}/monitor-dependencies`, - json: true, - headers: { - 'x-is-ci': isCI(), - authorization: getAuthHeader(), - }, - body: monitorDependenciesRequest, - qs: { - org: options.org || configOrg, - }, - }; - try { - const response = - await makeRequest(payload); - results.push({ - ...response, - path, - scanResult, - }); - } catch (error) { - if (error.code === 401) { - throw AuthFailedError(); - } - if (error.code >= 400 && error.code < 500) { - throw new MonitorError(error.code, error.message); - } - errors.push({ - error: 'Could not monitor dependencies in ' + path, - path, - scanResult, - }); + const perScanResults = await pMap( + scanResults, + (scanResult) => monitorOneScanResult(scanResult, options, path), + { concurrency }, + ); + for (const r of perScanResults) { + if (r.result) { + results.push(r.result); + } + if (r.error) { + errors.push(r.error); } } spinner.clearAll(); @@ -196,6 +172,61 @@ async function monitorDependencies( return [results, errors]; } +async function monitorOneScanResult( + scanResult: ScanResult, + options: Options & MonitorOptions, + path: string, +): Promise<{ + result?: EcosystemMonitorResult; + error?: EcosystemMonitorError; +}> { + const monitorDependenciesRequest = await generateMonitorDependenciesRequest( + scanResult, + options, + ); + + const configOrg = config.org ? 
decodeURIComponent(config.org) : undefined; + + const payload = { + method: 'PUT', + url: `${config.API}/monitor-dependencies`, + json: true, + headers: { + 'x-is-ci': isCI(), + authorization: getAuthHeader(), + }, + body: monitorDependenciesRequest, + qs: { + org: options.org || configOrg, + }, + }; + + try { + const response = await makeRequest(payload); + return { + result: { + ...response, + path, + scanResult, + }, + }; + } catch (error) { + if (error.code === 401) { + throw AuthFailedError(); + } + if (error.code >= 400 && error.code < 500) { + throw new MonitorError(error.code, error.message); + } + return { + error: { + error: 'Could not monitor dependencies in ' + path, + path, + scanResult, + }, + }; + } +} + export async function getFormattedMonitorOutput( results: Array, monitorResults: EcosystemMonitorResult[], diff --git a/test/jest/unit/ecosystems-monitor-docker.spec.ts b/test/jest/unit/ecosystems-monitor-docker.spec.ts index b2f113b9cc..d8717a66b1 100644 --- a/test/jest/unit/ecosystems-monitor-docker.spec.ts +++ b/test/jest/unit/ecosystems-monitor-docker.spec.ts @@ -285,4 +285,160 @@ describe('monitorEcosystem docker/container', () => { ); expect(parsedOutput.projectName).not.toBe('my-custom-project-name'); }); + + describe('parallelization of monitor-dependencies requests', () => { + const ORIGINAL_CONCURRENCY = process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + + afterEach(() => { + if (ORIGINAL_CONCURRENCY === undefined) { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + } else { + process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY = ORIGINAL_CONCURRENCY; + } + }); + + function makeMavenScanResult(targetFile: string): ScanResult { + const base = readJsonFixture( + 'maven-project-0-dependencies-scan-result.json', + ) as ScanResult; + return { + ...base, + identity: { ...base.identity, targetFile }, + }; + } + + function makeMonitorResponse(identity: string) { + const base = readJsonFixture( + 
'monitor-dependencies-response-with-project-name.json', + ) as ecosystemsTypes.MonitorDependenciesResponse; + return { + ...base, + id: `${identity}-id`, + projectName: identity, + }; + } + + async function runMonitor(scanResults: ScanResult[]) { + jest.spyOn(dockerPlugin, 'scan').mockResolvedValue({ scanResults }); + return ecosystems.monitorEcosystem('docker', ['/srv'], { + path: '/srv', + docker: true, + org: 'my-org', + }); + } + + it('caps in-flight requests at the default concurrency (5)', async () => { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + const scanResults = Array.from({ length: 25 }, (_, i) => + makeMavenScanResult(`app-${i}`), + ); + + let inFlight = 0; + let peakInFlight = 0; + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + inFlight++; + peakInFlight = Math.max(peakInFlight, inFlight); + const identity = payload.body.scanResult.identity.targetFile; + return new Promise((resolve) => { + setTimeout(() => { + inFlight--; + resolve(makeMonitorResponse(identity)); + }, 10); + }); + }); + + await runMonitor(scanResults); + + expect(peakInFlight).toBeLessThanOrEqual(5); + expect(peakInFlight).toBeGreaterThan(1); + }); + + it('respects SNYK_INTERNAL_REQUEST_CONCURRENCY override', async () => { + process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY = '3'; + const scanResults = Array.from({ length: 15 }, (_, i) => + makeMavenScanResult(`app-${i}`), + ); + + let inFlight = 0; + let peakInFlight = 0; + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + inFlight++; + peakInFlight = Math.max(peakInFlight, inFlight); + const identity = payload.body.scanResult.identity.targetFile; + return new Promise((resolve) => { + setTimeout(() => { + inFlight--; + resolve(makeMonitorResponse(identity)); + }, 10); + }); + }); + + await runMonitor(scanResults); + + expect(peakInFlight).toBeLessThanOrEqual(3); + }); + + it('preserves result order matching input order', async () => { + delete 
process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + const scanResults = ['os', 'app-1', 'app-2', 'app-3', 'app-4'].map( + makeMavenScanResult, + ); + + // Stagger response times in reverse so completion order != input order. + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + const identity = payload.body.scanResult.identity.targetFile; + const delay = + { os: 30, 'app-1': 20, 'app-2': 5, 'app-3': 25, 'app-4': 10 }[ + identity + ] ?? 0; + return new Promise((resolve) => + setTimeout(() => resolve(makeMonitorResponse(identity)), delay), + ); + }); + + const [results] = await runMonitor(scanResults); + + expect(results.map((r) => r.projectName)).toEqual([ + 'os', + 'app-1', + 'app-2', + 'app-3', + 'app-4', + ]); + }); + + it('throws MonitorError when any request returns 4xx (fail-fast)', async () => { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + const scanResults = ['app-1', 'app-2', 'app-3'].map(makeMavenScanResult); + + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + const identity = payload.body.scanResult.identity.targetFile; + if (identity === 'app-2') { + return Promise.reject({ code: 403, message: 'forbidden' }); + } + return Promise.resolve(makeMonitorResponse(identity)); + }); + + await expect(runMonitor(scanResults)).rejects.toThrow('forbidden'); + }); + + it('accumulates 5xx errors per scan-result without aborting', async () => { + delete process.env.SNYK_INTERNAL_REQUEST_CONCURRENCY; + const scanResults = ['app-1', 'app-2', 'app-3'].map(makeMavenScanResult); + + jest.spyOn(request, 'makeRequest').mockImplementation((payload: any) => { + const identity = payload.body.scanResult.identity.targetFile; + if (identity === 'app-2') { + return Promise.reject({ code: 503, message: 'unavailable' }); + } + return Promise.resolve(makeMonitorResponse(identity)); + }); + + const [results, errors] = await runMonitor(scanResults); + + expect(results.map((r) => r.projectName)).toEqual(['app-1', 'app-3']); + 
expect(errors).toHaveLength(1); + expect(errors[0].error).toContain('Could not monitor dependencies'); + }); + }); });