diff --git a/agents/grpc/src/grpc_agent.cc b/agents/grpc/src/grpc_agent.cc index e6381c9988..2df0c56968 100644 --- a/agents/grpc/src/grpc_agent.cc +++ b/agents/grpc/src/grpc_agent.cc @@ -80,6 +80,12 @@ const int CONSOLE_ID_SIZE = 36; const seconds DEFAULT_GRPC_TIMEOUT = seconds{ 60 }; +// Retry policy configuration +constexpr size_t retry_max_attempts = 5; +constexpr auto retry_initial_backoff = std::chrono::milliseconds(500); +constexpr auto retry_max_backoff = std::chrono::seconds(5); +constexpr float retry_backoff_multiplier = 2.0f; + JSThreadMetrics::JSThreadMetrics(SharedEnvInst envinst): metrics_(ThreadMetrics::Create(envinst)) { } @@ -1068,6 +1074,11 @@ int GrpcAgent::config(const json& config) { } } + opts.retry_policy_max_attempts = retry_max_attempts; + opts.retry_policy_initial_backoff = retry_initial_backoff; + opts.retry_policy_max_backoff = retry_max_backoff; + opts.retry_policy_backoff_multiplier = retry_backoff_multiplier; + nsolid_service_stub_ = GrpcClient::MakeNSolidServiceStub(opts, tls_keylog_file_); diff --git a/agents/grpc/src/grpc_client.cc b/agents/grpc/src/grpc_client.cc index a340fd5785..236e1115ba 100644 --- a/agents/grpc/src/grpc_client.cc +++ b/agents/grpc/src/grpc_client.cc @@ -73,6 +73,40 @@ std::shared_ptr grpc_arguments.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); grpc_arguments.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); + static const auto kServiceConfigJson = std::string_view{R"( + { + "methodConfig": [ + { + "name": [{}], + "retryPolicy": { + "maxAttempts": %0000000000u, + "initialBackoff": "%0000000000.1fs", + "maxBackoff": "%0000000000.1fs", + "backoffMultiplier": %0000000000.1f, + "retryableStatusCodes": [ + "UNAVAILABLE" + ] + } + } + ] + })"}; + + // Allocate string with buffer large enough to hold the formatted json config + auto service_config = std::string(kServiceConfigJson.size(), '\0'); + float initial_backoff = options.retry_policy_initial_backoff.count(); + float max_backoff = options.retry_policy_max_backoff.count(); + float backoff_multiplier = options.retry_policy_backoff_multiplier; + std::snprintf( + service_config.data(), + service_config.size(), + kServiceConfigJson.data(), + options.retry_policy_max_attempts, + std::min(std::max(initial_backoff, 0.f), 999999999.f), + std::min(std::max(max_backoff, 0.f), 999999999.f), + std::min(std::max(backoff_multiplier, 0.f), 999999999.f)); + + grpc_arguments.SetServiceConfigJSON(service_config); + return CreateCustomChannel(options.endpoint, MakeCredentials(options, tls_keylog_file), grpc_arguments); diff --git a/deps/opentelemetry-cpp/otlp-http-exporter.gyp b/deps/opentelemetry-cpp/otlp-http-exporter.gyp index a9ce0ef769..8be012662d 100644 --- a/deps/opentelemetry-cpp/otlp-http-exporter.gyp +++ b/deps/opentelemetry-cpp/otlp-http-exporter.gyp @@ -64,12 +64,13 @@ 'ENABLE_ASYNC_EXPORT', 'ENABLE_OTLP_GRPC_CREDENTIAL_PREVIEW', 'OPENTELEMETRY_STL_VERSION=2020', + 'ENABLE_OTLP_RETRY_PREVIEW', ], 'dependencies': [ '../protobuf/protobuf.gyp:protobuf', '../curl/curl.gyp:curl', '../grpc/grpc.gyp:grpc++', - '../protobuf/abseil.gyp:abseil_proto', + '../protobuf/abseil.gyp:abseil_proto', '../zlib/zlib.gyp:zlib', ], 'direct_dependent_settings': { @@ -77,6 +78,7 @@ 'ENABLE_ASYNC_EXPORT', 'ENABLE_OTLP_GRPC_CREDENTIAL_PREVIEW', 'OPENTELEMETRY_STL_VERSION=2020', + 'ENABLE_OTLP_RETRY_PREVIEW', ], 'include_dirs': [ 'api/include', diff --git a/test/agents/test-grpc-retry-policies.mjs b/test/agents/test-grpc-retry-policies.mjs new file mode 100644 index 0000000000..ed05a95dad --- /dev/null +++ b/test/agents/test-grpc-retry-policies.mjs @@ -0,0 +1,366 @@ +// Flags: --expose-internals + +import { mustCall, mustSucceed } from '../common/index.mjs'; +import { GRPCServer, TestClient } from '../common/nsolid-grpc-agent/index.js'; + +import assert from 'node:assert'; + +const services = [ + { name: 'ExportInfo', trigger: (client, s, id) => s.info(id), event: 'info' }, + { name: 'ExportMetricsCmd', trigger: (client, s, id) => s.metrics(id), event: 'metrics_cmd' }, + { name: 'ExportSpans', trigger: (client, s, id) => client.trace('http'), event: 'spans' }, +]; + +const tests = []; + +tests.push({ + name: 'should retry on transient UNAVAILABLE and succeed', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + const env = getEnv(port); + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + // Inject failure for the first call + grpcServer.injectFailure(svc.name, 'UNAVAILABLE', 1); + + if (svc.name === 'ExportSpans') { + grpcServer.on(svc.event, mustCall(async (data) => { + const attemptsHeader = data.metadata['grpc-previous-rpc-attempts']; + assert(attemptsHeader && attemptsHeader[0] === '1', `Should have retried once for ${svc.name}, got ${attemptsHeader}`); + completed++; + if (completed === total) { + grpcServer.clearFaults(); + await client.shutdown(0); + grpcServer.close(); + resolve(); + } + })); + await client.trace('http'); + continue; + } + + svc.trigger(client, grpcServer, agentId).then(mustCall(async ({ data }) => { + const attemptsHeader = data.metadata['grpc-previous-rpc-attempts']; + assert(attemptsHeader && attemptsHeader[0] === '1', `Should have retried once for ${svc.name}, got ${attemptsHeader}`); + completed++; + if (completed === total) { + grpcServer.clearFaults(); + await client.shutdown(0); + grpcServer.close(); + resolve(); + } + })); + } + })); + }); + }, +}); + +tests.push({ + name: 'should fail after max retries (5) on persistent UNAVAILABLE', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + const env = getEnv(port); + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + // Inject failures for more than 5 attempts + grpcServer.injectFailure(svc.name, 'UNAVAILABLE', 20); + + if (svc.name === 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 10000)); + const success = new Promise((resolve) => grpcServer.on(svc.event, () => resolve('success'))); + Promise.race([success, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, failed as expected due to exhausted retries + completed++; + if (completed === total) { + grpcServer.clearFaults(); + await client.shutdown(0); + grpcServer.close(); + resolve(); + } + } else { + throw new Error(`Should have failed after retries for ${svc.name}`); + } + })); + } + + const triggerPromise = svc.trigger(client, grpcServer, agentId); + if (svc.name !== 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 20000)); + Promise.race([triggerPromise, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, failed as expected due to exhausted retries + completed++; + if (completed === total) { + grpcServer.clearFaults(); + await client.shutdown(0); + grpcServer.close(); + resolve(); + } + } else { + throw new Error(`Should have failed after retries for ${svc.name}`); + } + })); + } + } + })); + }); + }, +}); + +tests.push({ + name: 'should not retry on non-retryable DEADLINE_EXCEEDED', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + const env = getEnv(port); + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + grpcServer.injectFailure(svc.name, 'DEADLINE_EXCEEDED', 1); + + if (svc.name === 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 1000)); + const success = new Promise((resolve) => grpcServer.on(svc.event, () => resolve('success'))); + Promise.race([success, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, failed immediately without retries + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + } else { + await client.shutdown(0); + throw new Error(`Should have failed immediately for ${svc.name}`); + } + })); + } + + const triggerPromise = svc.trigger(client, grpcServer, agentId); + if (svc.name !== 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 1000)); + Promise.race([triggerPromise, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, failed immediately without retries + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + } else { + await client.shutdown(0); + throw new Error(`Should have failed immediately for ${svc.name}`); + } + })); + } + } + })); + }); + }, +}); + +tests.push({ + name: 'should enforce custom deadline from NSOLID_GRPC_DEADLINE', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + // Inject 2s delay on services + for (const svc of services) { + grpcServer.injectDelay(svc.name, 2000); + } + + const env = { ...getEnv(port), NSOLID_GRPC_DEADLINE: '1' }; + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + if (svc.name === 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 2000)); + const success = new Promise((resolve) => grpcServer.on(svc.event, () => resolve('success'))); + Promise.race([success, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, timed out due to deadline + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + } else { + throw new Error(`Should have timed out for ${svc.name}`); + } + })); + continue; + } + + const triggerPromise = svc.trigger(client, grpcServer, agentId); + if (svc.name !== 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 2000)); + Promise.race([triggerPromise, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, timed out due to deadline + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + } else { + await client.shutdown(0); + throw new Error(`Should have timed out for ${svc.name}`); + } + })); + } + } + })); + }); + }, +}); + +tests.push({ + name: 'should use default 10s deadline when env not set', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + // Inject short delay, should succeed + for (const svc of services) { + grpcServer.injectDelay(svc.name, 500); + } + + const env = getEnv(port); + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + if (svc.name === 'ExportSpans') { + grpcServer.on(svc.event, mustCall(async () => { + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + })); + + await client.trace('http'); + continue; + } + + const promise = svc.trigger(client, grpcServer, agentId); + promise.then(mustCall(async () => { + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + })); + } + })); + }); + }, +}); + +tests.push({ + name: 'should handle invalid NSOLID_GRPC_DEADLINE gracefully', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + // Inject delay, should succeed with default 10s + for (const svc of services) { + grpcServer.injectDelay(svc.name, 1000); + } + + const env = { ...getEnv(port), NSOLID_GRPC_DEADLINE: 'invalid' }; + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + if (svc.name === 'ExportSpans') { + grpcServer.on(svc.event, mustCall(async () => { + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + })); + } + + const promise = svc.trigger(client, grpcServer, agentId); + if (svc.name !== 'ExportSpans') { + promise.then(mustCall(async () => { + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + })); + } + } + })); + }); + }, +}); + +const testConfigs = [ + { + getEnv: (port) => ({ + NODE_DEBUG_NATIVE: 'nsolid_grpc_agent', + NSOLID_GRPC: `localhost:${port}`, + NSOLID_GRPC_INSECURE: 1, + NSOLID_TRACING_ENABLED: 1, + }), + }, +]; + +for (const testConfig of testConfigs) { + for (const { name, test } of tests) { + console.log(`[retry-policies] ${name}`); + await test(testConfig.getEnv); + } +} diff --git a/test/agents/test-grpc-tracing.mjs b/test/agents/test-grpc-tracing.mjs index 6bee3d2cab..75ab90315a 100644 --- a/test/agents/test-grpc-tracing.mjs +++ b/test/agents/test-grpc-tracing.mjs @@ -16,7 +16,7 @@ const { function checkResource(resource) { validateArray(resource.attributes, 'resource.attributes'); - assert.strictEqual(resource.attributes.length, 5); + assert.strictEqual(resource.attributes.length, 6); } // traceId: { @@ -207,7 +207,7 @@ tests.push({ if (phase === 'done') return; - mergeResourceSpans(spans, resourceSpans); + mergeResourceSpans(spans.request, resourceSpans); if (phase === 'initial' && resourceSpans.length === 1 && @@ -275,7 +275,7 @@ tests.push({ if (phase === 'done') return; - mergeResourceSpans(spans, resourceSpans); + mergeResourceSpans(spans.request, resourceSpans); if (phase === 'initial' && resourceSpans.length === 1 && diff --git a/test/common/nsolid-grpc-agent/index.js b/test/common/nsolid-grpc-agent/index.js index b2a23ddd01..4185416e48 100644 --- a/test/common/nsolid-grpc-agent/index.js +++ b/test/common/nsolid-grpc-agent/index.js @@ -49,6 +49,7 @@ function checkResource(resource, agentId, config, metrics) { 'telemetry.sdk.name': 'opentelemetry', 'service.instance.id': agentId, 'service.name': config.app, + 'service.version': config.appVersion, }; if (metrics) { @@ -215,11 +216,13 @@ class GRPCServer extends EventEmitter { if (this.#server) { const requestId = randomUUID(); this.#server.send({ type: 'info', agentId, requestId }); - this.#server.once('message', (msg) => { - if (msg.type === 'info') { + const msgListener = (msg) => { + if (msg.type === 'info' && msg.data.msg.common.requestId === requestId) { + this.#server.off('message', msgListener); resolve({ requestId, data: msg.data }); } - }); + }; + this.#server.on('message', msgListener); } else { resolve(null); } @@ -231,11 +234,13 @@ class GRPCServer extends EventEmitter { if (this.#server) { const requestId = randomUUID(); this.#server.send({ type: 'metrics', agentId, requestId }); - this.#server.on('message', (msg) => { - if (msg.type === 'metrics_cmd') { + const msgListener = (msg) => { + if (msg.type === 'metrics_cmd' && msg.data.msg.common.requestId === requestId) { + this.#server.off('message', msgListener); resolve({ requestId, data: msg.data }); } - }); + }; + this.#server.on('message', msgListener); } else { resolve(null); } @@ -311,6 +316,24 @@ class GRPCServer extends EventEmitter { }); } + injectFailure(service, status = 'UNAVAILABLE', count = 1) { + if (this.#server) { + this.#server.send({ type: 'inject_failure', service, status, count }); + } + } + + injectDelay(service, delayMs = 0) { + if (this.#server) { + this.#server.send({ type: 'inject_delay', service, delay: delayMs }); + } + } + + clearFaults() { + if (this.#server) { + this.#server.send({ type: 'clear_faults' }); + } + } + close() { this.#server.send({ type: 'close' }); } diff --git a/test/common/nsolid-grpc-agent/server.mjs b/test/common/nsolid-grpc-agent/server.mjs index 1913e01b04..3bb26cb341 100644 --- a/test/common/nsolid-grpc-agent/server.mjs +++ b/test/common/nsolid-grpc-agent/server.mjs @@ -1,5 +1,6 @@ import assert from 'node:assert'; import path from 'node:path'; +import { setTimeout } from 'node:timers/promises'; import { parseArgs } from 'node:util'; import grpc from '@grpc/grpc-js'; import protoLoader from '@grpc/proto-loader'; @@ -26,6 +27,44 @@ const includeDirs = [path.resolve(import.meta.dirname, const commandCallMap = new Map(); +// Fault injection state +const faultInjections = new Map(); // service -> { status, remaining } +const delayInjections = new Map(); // service -> delayMs + +// Helper to check and inject fault for unary calls +function checkAndInjectFault(serviceName, callback) { + const fault = faultInjections.get(serviceName); + if (fault && fault.remaining > 0) { + fault.remaining--; + const status = grpc.status[fault.status] || grpc.status.UNAVAILABLE; + console.log(`Injecting fault for ${serviceName}`, { code: status, message: `Injected fault: ${fault.status}` }); + callback({ code: status, message: `Injected fault: ${fault.status}` }); + return true; + } + return false; +} + +// Helper to check and inject fault for streaming calls +function checkAndInjectFaultStreaming(serviceName, call) { + const fault = faultInjections.get(serviceName); + if (fault && fault.remaining > 0) { + fault.remaining--; + const status = grpc.status[fault.status] || grpc.status.UNAVAILABLE; + call.destroy({ code: status, message: `Injected fault: ${fault.status}` }); + return true; + } + return false; +} + +// Helper to inject delay for unary calls +async function injectDelay(serviceName, callback) { + const delay = delayInjections.get(serviceName); + if (delay) { + await setTimeout(delay); + } + callback(); +} + // Create a local server to receive data from async function startServer(cb) { const server = new grpc.Server(); @@ -44,8 +83,11 @@ async function startServer(cb) { Export: (data, callback) => { console.dir(data.request, { depth: null }); // console.log('Logs received'); - callback(null, { message: 'Logs received' }); - cb(null, 'logs', data.request); + if (checkAndInjectFault('ExportLogs', callback)) return; + injectDelay('ExportLogs', () => { + callback(null, { message: 'Logs received' }); + cb(null, 'logs', data.request); + }); }, }); @@ -55,8 +97,12 @@ async function startServer(cb) { Export: (data, callback) => { // console.dir(data, { depth: null }); console.log('Metrics received'); - callback(null, { message: 'Metrics received' }); - cb(null, 'metrics', data.request); + if (checkAndInjectFault('ExportMetrics', callback)) return; + injectDelay('ExportMetrics', () => { + callback(null, { message: 'Metrics received' }); + console.dir(data.metadata, { depth: null }); + cb(null, 'metrics', { request: data.request, metadata: data.metadata }); + }); }, }); @@ -64,8 +110,11 @@ async function startServer(cb) { const packageObjectTrace = grpc.loadPackageDefinition(packageDefinitionTrace); server.addService(packageObjectTrace.opentelemetry.proto.collector.trace.v1.TraceService.service, { Export: (data, callback) => { - callback(null, { message: 'Trace received' }); - cb(null, 'spans', data.request); + if (checkAndInjectFault('ExportSpans', callback)) return; + injectDelay('ExportSpans', () => { + callback(null, { message: 'Trace received' }); + cb(null, 'spans', { request: data.request, metadata: data.metadata }); + }); }, }); @@ -91,6 +140,7 @@ async function startServer(cb) { ExportAsset: async (call) => { console.log('ExportAsset'); console.dir(call.metadata, { depth: null }); + if (checkAndInjectFaultStreaming('ExportAsset', call)) return; const asset = { common: null, threadId: null, @@ -122,19 +172,26 @@ async function startServer(cb) { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'loop_blocked', - data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportBlockedLoop', callback)) return; + injectDelay('ExportBlockedLoop', () => { + callback(null, {}); + process.send({ type: 'loop_blocked', + data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportCommandError: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); + if (checkAndInjectFault('ExportCommandError', callback)) return; + injectDelay('ExportCommandError', () => { + callback(null, {}); + }); }, ExportContinuousProfile: async (call) => { console.log('ExportContinuousProfile'); console.dir(call.metadata, { depth: null }); + if (checkAndInjectFaultStreaming('ExportContinuousProfile', call)) return; const asset = { common: null, threadId: null, @@ -169,57 +226,81 @@ async function startServer(cb) { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'exit', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportExit', callback)) return; + injectDelay('ExportExit', () => { + callback(null, {}); + process.send({ type: 'exit', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportInfo: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'info', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportInfo', callback)) return; + injectDelay('ExportInfo', () => { + callback(null, {}); + process.send({ type: 'info', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportMetrics: (call, callback) => { // Extract data from the request object - console.dir(call.request, { depth: null }); - console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'metrics_cmd', data: { msg: call.request, metadata: call.metadata } }); + // console.dir(call.request, { depth: null }); + // console.dir(call.metadata, { depth: null }); + if (checkAndInjectFault('ExportMetricsCmd', callback)) return; + injectDelay('ExportMetricsCmd', () => { + callback(null, {}); + process.send({ type: 'metrics_cmd', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportPackages: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'packages', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportPackages', callback)) return; + injectDelay('ExportPackages', () => { + callback(null, {}); + process.send({ type: 'packages', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportReconfigure: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'reconfigure', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportReconfigure', callback)) return; + injectDelay('ExportReconfigure', () => { + callback(null, {}); + process.send({ type: 'reconfigure', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportSourceCode: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'source_code', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportSourceCode', callback)) return; + injectDelay('ExportSourceCode', () => { + callback(null, {}); + process.send({ type: 'source_code', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportStartupTimes: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'startup_times', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportStartupTimes', callback)) return; + injectDelay('ExportStartupTimes', () => { + callback(null, {}); + process.send({ type: 'startup_times', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportUnblockedLoop: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'loop_unblocked', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportUnblockedLoop', callback)) return; + injectDelay('ExportUnblockedLoop', () => { + callback(null, {}); + process.send({ type: 'loop_unblocked', data: { msg: call.request, metadata: call.metadata } }); + }); }, }); @@ -272,6 +353,15 @@ process.on('message', (message) => { sendSourceCode(message.agentId, message.requestId, message.options); } else if (message.type === 'startup_times') { sendStartupTimes(message.agentId, message.requestId); + } else if (message.type === 'inject_failure') { + // Inject failure for a service: { service, status, count } + faultInjections.set(message.service, { status: message.status || 'UNAVAILABLE', remaining: message.count || 1 }); + } else if (message.type === 'inject_delay') { + // Inject delay for a service: { service, delay } + delayInjections.set(message.service, message.delay || 0); + } else if (message.type === 'clear_faults') { + faultInjections.clear(); + delayInjections.clear(); } else if (message.type === 'close') { server.forceShutdown(); process.exit(0); diff --git a/test/parallel/test-nsolid-grpc-heap-profile.js b/test/parallel/test-nsolid-grpc-heap-profile.js index 8edd26fe44..09a4b6ab84 100644 --- a/test/parallel/test-nsolid-grpc-heap-profile.js +++ b/test/parallel/test-nsolid-grpc-heap-profile.js @@ -125,5 +125,5 @@ setTimeout(() => { })); })); })); - }, 100); + }, 2 * 5000); // To make sure all the retries are done }, 100); diff --git a/test/parallel/test-nsolid-grpc-heap-sampling.js b/test/parallel/test-nsolid-grpc-heap-sampling.js index 913ecc65cb..486ce88367 100644 --- a/test/parallel/test-nsolid-grpc-heap-sampling.js +++ b/test/parallel/test-nsolid-grpc-heap-sampling.js @@ -169,5 +169,5 @@ setTimeout(() => { })); }, 100); })); - }, 500); + }, 2 * 5000); // To make sure all the retries are done }, 100); diff --git a/test/parallel/test-nsolid-grpc-profile.js b/test/parallel/test-nsolid-grpc-profile.js index 27c3954140..aa3545911e 100644 --- a/test/parallel/test-nsolid-grpc-profile.js +++ b/test/parallel/test-nsolid-grpc-profile.js @@ -114,5 +114,5 @@ setTimeout(() => { })); }, 100); })); - }, 500); + }, 2 * 5000); // To make sure all the retries are done }, 100);