From 262165833e2857f7985f6991c11c8d1403dd3744 Mon Sep 17 00:00:00 2001 From: Lavanya0513 <145364950+Lavanya0513@users.noreply.github.com> Date: Tue, 28 Jan 2025 21:39:07 -0500 Subject: [PATCH 1/2] Added support for Streaming Lambda --- lambda/executor.js | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lambda/executor.js b/lambda/executor.js index 9b7034b..dbe3c5d 100644 --- a/lambda/executor.js +++ b/lambda/executor.js @@ -23,8 +23,10 @@ module.exports.handler = async(event, context) => { onlyColdStarts, sleepBetweenRunsMs, disablePayloadLogs, + streamresponse, } = await extractDataFromInput(event); + validateInput(lambdaARN, value, num); // may throw // force only 1 execution if dryRun @@ -56,6 +58,7 @@ module.exports.handler = async(event, context) => { onlyColdStarts: onlyColdStarts, sleepBetweenRunsMs: sleepBetweenRunsMs, disablePayloadLogs: disablePayloadLogs, + streamresponse: streamresponse, }; // wait if the function/alias state is Pending @@ -143,10 +146,11 @@ const extractDataFromInput = async(event) => { onlyColdStarts: !!input.onlyColdStarts, sleepBetweenRunsMs: sleepBetweenRunsMs, disablePayloadLogs: !!input.disablePayloadLogs, + streamresponse: input.streamresponse === true, }; }; -const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, disablePayloadLogs, onlyColdStarts}) => { +const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, disablePayloadLogs, onlyColdStarts, streamresponse}) => { const results = []; // run all invocations in parallel ... const invocations = utils.range(num).map(async(_, i) => { @@ -155,7 +159,7 @@ const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, post await utils.waitForAliasActive(lambdaARN, aliasToInvoke); console.log(`${aliasToInvoke} is active`); } - const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, aliasToInvoke, payloads[i], preARN, postARN, disablePayloadLogs); + const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, aliasToInvoke, payloads[i], preARN, postARN, disablePayloadLogs, streamresponse); // invocation errors return 200 and contain FunctionError and Payload if (invocationResults.FunctionError) { let errorMessage = 'Invocation error (running in parallel)'; @@ -168,7 +172,7 @@ const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, post return results; }; -const runInSeries = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, sleepBetweenRunsMs, disablePayloadLogs, onlyColdStarts}) => { +const runInSeries = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, sleepBetweenRunsMs, disablePayloadLogs, onlyColdStarts, streamresponse}) => { const results = []; for (let i = 0; i < num; i++) { let aliasToInvoke = utils.buildAliasString(lambdaAlias, onlyColdStarts, i); @@ -177,8 +181,8 @@ const runInSeries = async({num, lambdaARN, lambdaAlias, payloads, preARN, postAR await utils.waitForAliasActive(lambdaARN, aliasToInvoke); console.log(`${aliasToInvoke} is active`); } - const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, aliasToInvoke, payloads[i], preARN, postARN, disablePayloadLogs); - // invocation errors return 200 and contain FunctionError and Payload + const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, aliasToInvoke, payloads[i], preARN, postARN, disablePayloadLogs, streamresponse); + // invocation errors return 200 and contain FunctionError and Payload, if (invocationResults.FunctionError) { let errorMessage = 'Invocation error (running in series)'; utils.handleLambdaInvocationError(errorMessage, invocationResults, actualPayload, disablePayloadLogs); From 1fe00c3eafad60cf87ff02cd4577f09b68a89c69 Mon Sep 17 00:00:00 2001 From: Lavanya0513 <145364950+Lavanya0513@users.noreply.github.com> Date: Tue, 28 Jan 2025 21:40:05 -0500 Subject: [PATCH 2/2] Added support for Streaming Lambda --- lambda/utils.js | 60 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/lambda/utils.js b/lambda/utils.js index 745c58d..9786525 100644 --- a/lambda/utils.js +++ b/lambda/utils.js @@ -4,7 +4,7 @@ const { CreateAliasCommand, DeleteAliasCommand, DeleteFunctionCommand, GetAliasCommand, GetFunctionConfigurationCommand, InvokeCommand, LambdaClient, PublishVersionCommand, UpdateAliasCommand, UpdateFunctionConfigurationCommand, - waitUntilFunctionActive, waitUntilFunctionUpdated, ResourceNotFoundException, + waitUntilFunctionActive, waitUntilFunctionUpdated, ResourceNotFoundException,InvokeWithResponseStreamCommand, } = require('@aws-sdk/client-lambda'); const { GetObjectCommand, S3Client } = require('@aws-sdk/client-s3'); const url = require('url'); @@ -315,7 +315,7 @@ module.exports.invokeLambdaProcessor = async(processorARN, payload, preOrPost = /** * Wrapper around Lambda function invocation with pre/post-processor functions. */ -module.exports.invokeLambdaWithProcessors = async(lambdaARN, alias, payload, preARN, postARN, disablePayloadLogs) => { +module.exports.invokeLambdaWithProcessors = async(lambdaARN, alias, payload, preARN, postARN, disablePayloadLogs,streamresponse) => { var actualPayload = payload; // might change based on pre-processor @@ -330,7 +330,7 @@ module.exports.invokeLambdaWithProcessors = async(lambdaARN, alias, payload, pre } // invoke function to be power-tuned - const invocationResults = await utils.invokeLambda(lambdaARN, alias, actualPayload, disablePayloadLogs); + const invocationResults = await utils.invokeLambda(lambdaARN, alias, actualPayload, disablePayloadLogs,streamresponse ) // then invoke post-processor, if provided if (postARN) { @@ -348,7 +348,7 @@ module.exports.invokeLambdaWithProcessors = async(lambdaARN, alias, payload, pre /** * Invoke a given Lambda Function:Alias with payload and return its logs. */ -module.exports.invokeLambda = (lambdaARN, alias, payload, disablePayloadLogs) => { +/** module.exports.invokeLambda = (lambdaARN, alias, payload, disablePayloadLogs) => { let consoleLogMessage = `Invoking function ${lambdaARN}:${alias || '$LATEST'}`; if (!disablePayloadLogs) { consoleLogMessage += ` with payload ${JSON.stringify(payload)}`; @@ -362,7 +362,59 @@ module.exports.invokeLambda = (lambdaARN, alias, payload, disablePayloadLogs) => }; const lambda = utils.lambdaClientFromARN(lambdaARN); return lambda.send(new InvokeCommand(params)); +}; */ +function Decodeuint8arr(uint8array) { + return new TextDecoder("utf-8").decode(uint8array); +} +module.exports.invokeLambda = async (lambdaARN, alias, payload, disablePayloadLogs, streamresponse) => { + let consoleLogMessage = `Invoking function ${lambdaARN}:${alias || '$LATEST'}`; + if (!disablePayloadLogs) { + consoleLogMessage += ` with payload ${JSON.stringify(payload)}`; + } + console.log(consoleLogMessage); + + const params = { + FunctionName: lambdaARN, + Qualifier: alias, + Payload: payload, + LogType: 'Tail', // will return logs + }; + const lambda = utils.lambdaClientFromARN(lambdaARN); + if (streamresponse){ + const command = new InvokeWithResponseStreamCommand(params); + const response = await lambda.send(command); + + let streampayload=''; + let logres=''; + + for await (const item of response.EventStream) { + if (typeof item.PayloadChunk !== "undefined") { + streampayload+=Decodeuint8arr(item.PayloadChunk.Payload) + + } + if (typeof item.InvokeComplete !== "undefined") { + const buff = Buffer.from(item.InvokeComplete.LogResult, 'base64'); + console.log("Logs:", buff.toString("utf-8")); + logres=item.InvokeComplete.LogResult; + } + } + return { + Payload: streampayload, + LogResult: logres + }; + + } + else + { + return lambda.send(new InvokeCommand(params)); + + } + + + }; + + /** * Handle a Lambda invocation error and generate an error message containing original error type, message and trace.