-
Notifications
You must be signed in to change notification settings - Fork 394
/
Copy pathexecutor.js
222 lines (194 loc) · 8.11 KB
/
executor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
'use strict';
const utils = require('./utils');
// Integer parsed (base 10) from the `minRAM` environment variable, read once at module load.
// Evaluates to NaN when the variable is unset or not numeric.
// computeStatistics passes it to utils.computePrice and utils.computeTotalCost.
// NOTE(review): presumably the memory size (MB) the base cost refers to — confirm in utils.
const minRAM = parseInt(process.env.minRAM, 10);
/**
 * Execute the given function N times in series or in parallel,
 * then compute execution statistics (average cost and duration).
 *
 * @param {object} event - executor input; `event.value` carries the power value
 *   under test and `event.input` the original state machine input.
 * @param {object} context - invocation context (not used).
 * @returns {Promise<object>} stats object: averagePrice, averageDuration, totalCost, value.
 * @throws {Error} if lambdaARN, value or num are missing or invalid.
 */
module.exports.handler = async(event, context) => {
    // read input from event
    const {
        lambdaARN,
        value,
        num: requestedNum,
        enableParallel,
        payload,
        dryRun,
        preProcessorARN,
        postProcessorARN,
        discardTopBottom,
        onlyColdStarts,
        sleepBetweenRunsMs,
        disablePayloadLogs,
        streamresponse,
    } = await extractDataFromInput(event);

    validateInput(lambdaARN, value, requestedNum); // may throw

    // a dry-run always performs exactly one execution
    if (dryRun) {
        console.log('[Dry-run] forcing num=1');
    }
    const num = dryRun ? 1 : requestedNum;

    const lambdaAlias = `RAM${value}`;

    // We need the architecture, regardless of onlyColdStarts or not;
    // index 0 is used because an index is required when onlyColdStarts is set
    const configAlias = utils.buildAliasString(lambdaAlias, onlyColdStarts, 0);
    const lambdaConfig = await utils.getLambdaConfig(lambdaARN, configAlias);
    const {architecture, isPending} = lambdaConfig;
    console.log(`Detected architecture type: ${architecture}, isPending: ${isPending}`);

    // pre-generate an array of N payloads
    const payloads = utils.generatePayloads(num, payload);

    const runInput = {
        num,
        lambdaARN,
        lambdaAlias,
        payloads,
        preARN: preProcessorARN,
        postARN: postProcessorARN,
        onlyColdStarts,
        sleepBetweenRunsMs,
        disablePayloadLogs,
        streamresponse,
    };

    // wait if the function/alias state is Pending;
    // with onlyColdStarts each alias is verified inside runInParallel/runInSeries instead
    const mustWaitForAlias = isPending && !onlyColdStarts;
    if (mustWaitForAlias) {
        await utils.waitForAliasActive(lambdaARN, lambdaAlias);
        console.log('Alias active');
    }

    const run = enableParallel ? runInParallel : runInSeries;
    const results = await run(runInput);

    // get base cost for Lambda
    const region = utils.regionFromARN(lambdaARN);
    const baseCost = utils.lambdaBaseCost(region, architecture);

    return computeStatistics(baseCost, results, value, discardTopBottom);
};
/**
 * Validate the mandatory executor inputs.
 *
 * @param {string} lambdaARN - ARN of the function to invoke; must be non-empty.
 * @param {number} value - power value under test; must be a number greater than zero.
 * @param {number} num - number of invocations; must be a number greater than zero.
 * @throws {Error} if lambdaARN is missing/empty, or value/num are not positive numbers.
 */
const validateInput = (lambdaARN, value, num) => {
    // The falsy and isNaN checks reject undefined/null/''/0/NaN exactly as before;
    // the final comparison additionally rejects negative numbers, which used to
    // pass validation and only failed (or produced empty results) later on.
    const isPositiveNumber = (x) => Boolean(x) && !isNaN(x) && Number(x) > 0;

    if (!lambdaARN) {
        throw new Error('Missing or empty lambdaARN');
    }
    if (!isPositiveNumber(value)) {
        throw new Error('Invalid value: ' + value);
    }
    if (!isPositiveNumber(num)) {
        throw new Error('Invalid num: ' + num);
    }
};
/**
 * Resolve the payload to use for the invocations.
 *
 * `input.payloadS3` takes precedence and is fetched through utils.fetchPayloadFromS3;
 * otherwise a truthy `input.payload` is returned as-is.
 *
 * @param {object} input - original state machine input.
 * @returns {Promise<*>} the payload, or null when neither field holds a truthy value.
 */
const extractPayloadValue = async(input) => {
    if (!input.payloadS3) {
        // inline payload; any falsy value collapses to null
        return input.payload || null;
    }
    // might throw if access denied or 404
    const fetched = await utils.fetchPayloadFromS3(input.payloadS3);
    return fetched;
};
/**
 * Extract discardTopBottom, used to trim values from the average duration.
 *
 * Defaults to 0.2 when the field is undefined, is forced to 0 when
 * `event.onlyColdStarts` is truthy, and is always clamped to [0, 0.4].
 *
 * @param {object} event - object carrying `discardTopBottom` and `onlyColdStarts`.
 * @returns {number} the value to use.
 */
const extractDiscardTopBottomValue = (event) => {
    const DEFAULT_DISCARD = 0.2;
    const MIN_DISCARD = 0.0;
    const MAX_DISCARD = 0.4;
    // discardTopBottom must be between 0 and 0.4
    const clamp = (x) => Math.min(Math.max(x, MIN_DISCARD), MAX_DISCARD);

    const requested = event.discardTopBottom;

    // In case of onlyColdStarts, we only have 1 invocation per alias, therefore we shouldn't discard any execution
    if (event.onlyColdStarts) {
        console.log('Setting discardTopBottom to 0, every invocation should be accounted when onlyColdStarts');
        return clamp(0);
    }

    return clamp(requested === undefined ? DEFAULT_DISCARD : requested);
};
/**
 * Extract the pause (in milliseconds) to apply between runs.
 *
 * @param {object} event - object carrying `sleepBetweenRunsMs`.
 * @returns {number} the integer part of `sleepBetweenRunsMs`, or 0 when the field
 *   is missing or cannot be interpreted as a number. Never NaN.
 */
const extractSleepTime = (event) => {
    const raw = event.sleepBetweenRunsMs;
    // The coercing global isNaN is kept on purpose: it rejects undefined and
    // partially numeric strings such as '12abc', exactly as before.
    if (isNaN(raw)) {
        return 0;
    }
    // Values like null, '' or true coerce to a number (so they pass the check above)
    // but parseInt cannot parse them; fall back to 0 instead of leaking NaN.
    const parsed = parseInt(raw, 10);
    return Number.isNaN(parsed) ? 0 : parsed;
};
/**
 * Normalise the executor event into the flat set of parameters used by the handler.
 *
 * `event.value` and `input.num` are parsed as base-10 integers, the flag fields are
 * coerced to booleans (`dryRun` and `streamresponse` only count when strictly `true`),
 * and payload, discardTopBottom and sleepBetweenRunsMs are resolved by their helpers.
 *
 * @param {object} event - executor event; `event.input` is the original state machine input.
 * @returns {Promise<object>} the extracted parameters.
 */
const extractDataFromInput = async(event) => {
    const {input} = event; // original state machine input
    const toInt = (raw) => parseInt(raw, 10);

    const payload = await extractPayloadValue(input);
    const discardTopBottom = extractDiscardTopBottomValue(input);
    const sleepBetweenRunsMs = extractSleepTime(input);

    return {
        value: toInt(event.value),
        lambdaARN: input.lambdaARN,
        num: toInt(input.num),
        enableParallel: Boolean(input.parallelInvocation),
        payload,
        dryRun: input.dryRun === true,
        preProcessorARN: input.preProcessorARN,
        postProcessorARN: input.postProcessorARN,
        discardTopBottom,
        onlyColdStarts: Boolean(input.onlyColdStarts),
        sleepBetweenRunsMs,
        disablePayloadLogs: Boolean(input.disablePayloadLogs),
        streamresponse: input.streamresponse === true,
    };
};
/**
 * Invoke the function `num` times concurrently and collect the invocation results.
 *
 * Results are returned in invocation-index order (results[i] belongs to payloads[i]),
 * independently of the order in which the invocations complete.
 *
 * @param {object} runInput - run parameters: num, lambdaARN, lambdaAlias, payloads,
 *   preARN, postARN, disablePayloadLogs, onlyColdStarts, streamresponse.
 * @returns {Promise<Array>} one invocation result per run.
 * @throws whatever utils.handleLambdaInvocationError (or any invocation) throws;
 *   the first rejection rejects the whole run.
 */
const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, disablePayloadLogs, onlyColdStarts, streamresponse}) => {
    // run all invocations in parallel ...
    const invocations = utils.range(num).map(async(_, i) => {
        const aliasToInvoke = utils.buildAliasString(lambdaAlias, onlyColdStarts, i);
        if (onlyColdStarts) {
            // with onlyColdStarts every run has its own alias, so each one is checked here
            await utils.waitForAliasActive(lambdaARN, aliasToInvoke);
            console.log(`${aliasToInvoke} is active`);
        }
        const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, aliasToInvoke, payloads[i], preARN, postARN, disablePayloadLogs, streamresponse);
        // invocation errors return 200 and contain FunctionError and Payload
        if (invocationResults.FunctionError) {
            const errorMessage = 'Invocation error (running in parallel)';
            utils.handleLambdaInvocationError(errorMessage, invocationResults, actualPayload, disablePayloadLogs);
        }
        // return the value instead of pushing into a shared array, so that the
        // output order does not depend on which invocation finishes first
        return invocationResults;
    });
    // ... and wait for results (Promise.all preserves the input order)
    const results = await Promise.all(invocations);
    return results;
};
/**
 * Invoke the function `num` times, one invocation after the other, and collect
 * the invocation results in invocation order.
 *
 * When `sleepBetweenRunsMs` is greater than zero, the executor pauses between
 * consecutive runs (not after the last one).
 *
 * @param {object} runInput - run parameters: num, lambdaARN, lambdaAlias, payloads,
 *   preARN, postARN, sleepBetweenRunsMs, disablePayloadLogs, onlyColdStarts, streamresponse.
 * @returns {Promise<Array>} one invocation result per run.
 * @throws whatever utils.handleLambdaInvocationError (or any invocation) throws.
 */
const runInSeries = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, sleepBetweenRunsMs, disablePayloadLogs, onlyColdStarts, streamresponse}) => {
    const results = [];
    for (let i = 0; i < num; i++) {
        const aliasToInvoke = utils.buildAliasString(lambdaAlias, onlyColdStarts, i);
        // run invocations in series
        if (onlyColdStarts) {
            // with onlyColdStarts every run has its own alias, so each one is checked here
            await utils.waitForAliasActive(lambdaARN, aliasToInvoke);
            console.log(`${aliasToInvoke} is active`);
        }
        const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, aliasToInvoke, payloads[i], preARN, postARN, disablePayloadLogs, streamresponse);
        // invocation errors return 200 and contain FunctionError and Payload
        if (invocationResults.FunctionError) {
            const errorMessage = 'Invocation error (running in series)';
            utils.handleLambdaInvocationError(errorMessage, invocationResults, actualPayload, disablePayloadLogs);
        }
        results.push(invocationResults);

        // the pause is meant to separate consecutive runs; sleeping after the
        // final invocation would only add idle time to the executor
        const isLastRun = i === num - 1;
        if (sleepBetweenRunsMs > 0 && !isLastRun) {
            await utils.sleep(sleepBetweenRunsMs);
        }
    }
    return results;
};
/**
 * Turn the raw invocation results into the statistics returned by the executor.
 *
 * @param {*} baseCost - base cost handed to utils.computePrice / utils.computeTotalCost.
 * @param {Array} results - invocation results (which include logs).
 * @param {number} value - power value under test; echoed back in the stats.
 * @param {number} discardTopBottom - trimming ratio handed to utils.computeAverageDuration.
 * @returns {object} stats: averagePrice, averageDuration, totalCost, value.
 */
const computeStatistics = (baseCost, results, value, discardTopBottom) => {
    const trimmedAverage = (durations) => utils.computeAverageDuration(durations, discardTopBottom);

    // use results (which include logs) to compute average duration ...
    const averageDuration = trimmedAverage(utils.parseLogAndExtractDurations(results));
    console.log('Average duration: ', averageDuration);

    // ... and overall cost statistics, which are based on billed durations
    const billedDurations = utils.parseLogAndExtractBilledDurations(results);
    const averageBilledDuration = trimmedAverage(billedDurations);
    console.log('Average Billed duration: ', averageBilledDuration);

    const stats = {
        averagePrice: utils.computePrice(baseCost, minRAM, value, averageBilledDuration),
        averageDuration,
        // .. and total cost (exact $)
        totalCost: utils.computeTotalCost(baseCost, minRAM, value, billedDurations),
        value,
    };

    console.log('Stats: ', stats);
    return stats;
};