Skip to content

Commit 836b6b8

Browse files
authored
Merge pull request #5846 from gchq/5810-add-new-shutdown-emr-serverless-application-lambda-into-the-cdk-app
5810: Add new shutdown emr serverless application lambda into the cdk app
2 parents 47e8852 + 88d3094 commit 836b6b8

File tree

8 files changed

+213
-41
lines changed

8 files changed

+213
-41
lines changed

java/cdk-custom-resources/src/main/java/sleeper/cdk/custom/AutoStopEmrServerlessApplicationLambda.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public void handleEvent(
7777
}
7878

7979
private void stopApplication(String applicationId) throws InterruptedException {
80+
8081
LOGGER.info("Terminating {} running application: ", applicationId);
8182

8283
List<JobRunSummary> jobRuns = emrServerlessClient.listJobRuns(request -> request.applicationId(applicationId)
@@ -91,10 +92,12 @@ private void stopApplication(String applicationId) throws InterruptedException {
9192
poll.pollUntil("all EMR Serverless jobs finished", () -> allJobsFinished(applicationId));
9293
}
9394

94-
emrServerlessClient.stopApplication(request -> request.applicationId(applicationId));
95+
if (!isApplicationStopped(applicationId)) {
96+
emrServerlessClient.stopApplication(request -> request.applicationId(applicationId));
9597

96-
LOGGER.info("Waiting for applications to stop");
97-
poll.pollUntil("all EMR Serverless applications stopped", () -> isApplicationStopped(applicationId));
98+
LOGGER.info("Waiting for applications to stop");
99+
poll.pollUntil("all EMR Serverless applications stopped", () -> isApplicationStopped(applicationId));
100+
}
98101
}
99102

100103
private boolean allJobsFinished(String applicationId) {

java/cdk-custom-resources/src/test/java/sleeper/cdk/custom/AutoStopEmrServerlessApplicationLambdaIT.java

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
2222
import org.junit.jupiter.api.DisplayName;
2323
import org.junit.jupiter.api.Test;
24+
import software.amazon.awssdk.services.emrserverless.model.ApplicationState;
2425
import software.amazon.awssdk.services.emrserverless.model.JobRunState;
2526

2627
import sleeper.core.util.PollWithRetries;
@@ -34,6 +35,7 @@
3435
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3536
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithJobRunWithState;
3637
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithNoJobRuns;
38+
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithStartedApplication;
3739
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithStoppedApplication;
3840
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithStoppingApplication;
3941
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithTerminatedApplication;
@@ -64,13 +66,22 @@ void shouldTimeOutWhenDeleting(WireMockRuntimeInfo runtimeInfo) throws Exception
6466

6567
// Given
6668
stubFor(listRunningJobsForApplicationRequest(applicationId)
67-
.willReturn(aResponseWithNoJobRuns()));
68-
stubFor(stopApplicationRequest(applicationId).inScenario("StopApplication")
69+
.inScenario("ShouldTimeOut")
70+
.willReturn(aResponseWithStartedApplication(applicationId))
71+
.willSetStateTo(STARTED));
72+
stubFor(getApplicationRequest(applicationId)
73+
.inScenario("ShouldTimeOut")
74+
.willReturn(aResponseWithStartedApplication(applicationId))
75+
.whenScenarioStateIs(STARTED));
76+
stubFor(stopApplicationRequest(applicationId)
77+
.inScenario("ShouldTimeOut")
6978
.willReturn(aResponse().withStatus(200))
70-
.whenScenarioStateIs(STARTED).willSetStateTo("ApplicationStopping"));
71-
stubFor(getApplicationRequest(applicationId).inScenario("StopApplication")
79+
.whenScenarioStateIs(STARTED)
80+
.willSetStateTo(ApplicationState.STOPPING.toString()));
81+
stubFor(getApplicationRequest(applicationId)
82+
.inScenario("ShouldTimeOut")
7283
.willReturn(aResponseWithStoppingApplication(applicationId))
73-
.whenScenarioStateIs("ApplicationStopping"));
84+
.whenScenarioStateIs(ApplicationState.STOPPING.toString()));
7485

7586
// Then
7687
assertThatThrownBy(() -> lambda.handleEvent(applicationEvent(applicationId, "Delete"), null))
@@ -84,23 +95,34 @@ void shouldTrackStoppingApplication(WireMockRuntimeInfo runtimeInfo) throws Exce
8495

8596
// Given
8697
stubFor(listRunningJobsForApplicationRequest(applicationId)
87-
.willReturn(aResponseWithNoJobRuns()));
88-
stubFor(stopApplicationRequest(applicationId).inScenario("StopApplication")
98+
.inScenario("ShouldTrackStopping")
99+
.willReturn(aResponseWithNoJobRuns())
100+
.willSetStateTo(STARTED));
101+
stubFor(getApplicationRequest(applicationId)
102+
.inScenario("ShouldTrackStopping")
103+
.willReturn(aResponseWithStartedApplication(applicationId))
104+
.whenScenarioStateIs(STARTED));
105+
stubFor(stopApplicationRequest(applicationId)
106+
.inScenario("ShouldTrackStopping")
89107
.willReturn(aResponse().withStatus(200))
90-
.whenScenarioStateIs(STARTED).willSetStateTo("ApplicationStopping"));
91-
stubFor(getApplicationRequest(applicationId).inScenario("StopApplication")
108+
.whenScenarioStateIs(STARTED)
109+
.willSetStateTo(ApplicationState.STOPPING.toString()));
110+
stubFor(getApplicationRequest(applicationId)
111+
.inScenario("ShouldTrackStopping")
92112
.willReturn(aResponseWithStoppingApplication(applicationId))
93-
.whenScenarioStateIs("ApplicationStopping").willSetStateTo("ApplicationStopped"));
94-
stubFor(getApplicationRequest(applicationId).inScenario("StopApplication")
113+
.whenScenarioStateIs(ApplicationState.STOPPING.toString())
114+
.willSetStateTo(ApplicationState.STOPPED.toString()));
115+
stubFor(getApplicationRequest(applicationId)
116+
.inScenario("ShouldTrackStopping")
95117
.willReturn(aResponseWithStoppedApplication(applicationId))
96-
.whenScenarioStateIs("ApplicationStopped"));
118+
.whenScenarioStateIs(ApplicationState.STOPPED.toString()));
97119

98120
// Then
99121
lambda.handleEvent(applicationEvent(applicationId, "Delete"), null);
100122
// Then
101-
verify(4, anyRequestedForEmrServerless());
123+
verify(5, anyRequestedForEmrServerless());
102124
verify(1, stopApplicationRequested(applicationId));
103-
verify(2, getApplicationRequested(applicationId));
125+
verify(3, getApplicationRequested(applicationId));
104126

105127
}
106128

@@ -110,32 +132,44 @@ void shouldStopEMRServerlessWhenApplicationIsStartedWithRunningJob(WireMockRunti
110132
lambda = lambda(runtimeInfo, PollWithRetries.noRetries());
111133

112134
// Given
113-
stubFor(listRunningJobsForApplicationRequest(applicationId).inScenario("StopJob")
135+
stubFor(listRunningJobsForApplicationRequest(applicationId)
136+
.inScenario("ShouldStopWithJobs")
114137
.willReturn(aResponseWithJobRunWithState(applicationId, jobRunId, JobRunState.RUNNING))
115-
.whenScenarioStateIs(STARTED));
116-
stubFor(cancelJobRunRequest(applicationId, jobRunId).inScenario("StopJob")
138+
.willSetStateTo(STARTED));
139+
stubFor(cancelJobRunRequest(applicationId, jobRunId)
140+
.inScenario("ShouldStopWithJobs")
117141
.willReturn(ResponseDefinitionBuilder.okForEmptyJson())
118-
.whenScenarioStateIs(STARTED).willSetStateTo("JobStopped"));
119-
stubFor(listRunningOrCancellingJobsForApplicationRequest(applicationId).inScenario("StopJob")
142+
.whenScenarioStateIs(STARTED)
143+
.willSetStateTo(JobRunState.CANCELLING.toString()));
144+
stubFor(listRunningOrCancellingJobsForApplicationRequest(applicationId)
145+
.inScenario("ShouldStopWithJobs")
120146
.willReturn(aResponseWithNoJobRuns())
121-
.whenScenarioStateIs("JobStopped"));
122-
stubFor(stopApplicationRequest(applicationId).inScenario("StopJob")
147+
.whenScenarioStateIs(JobRunState.CANCELLING.toString())
148+
.willSetStateTo(JobRunState.CANCELLED.toString()));
149+
stubFor(getApplicationRequest(applicationId)
150+
.inScenario("ShouldStopWithJobs")
151+
.willReturn(aResponseWithStartedApplication(applicationId))
152+
.whenScenarioStateIs(JobRunState.CANCELLED.toString()));
153+
stubFor(stopApplicationRequest(applicationId)
154+
.inScenario("ShouldStopWithJobs")
123155
.willReturn(aResponse().withStatus(200))
124-
.whenScenarioStateIs("JobStopped").willSetStateTo("AppStopped"));
125-
stubFor(getApplicationRequest(applicationId).inScenario("StopJob")
156+
.whenScenarioStateIs(JobRunState.CANCELLED.toString())
157+
.willSetStateTo(ApplicationState.STOPPED.toString()));
158+
stubFor(getApplicationRequest(applicationId)
159+
.inScenario("ShouldStopWithJobs")
126160
.willReturn(aResponseWithTerminatedApplication(applicationId))
127-
.whenScenarioStateIs("AppStopped"));
161+
.whenScenarioStateIs(ApplicationState.STOPPED.toString()));
128162

129163
// When
130164
lambda.handleEvent(applicationEvent(applicationId, "Delete"), null);
131165

132166
// Then
133-
verify(5, anyRequestedForEmrServerless());
167+
verify(6, anyRequestedForEmrServerless());
134168
verify(1, listRunningJobsForApplicationRequested(applicationId));
135169
verify(1, cancelJobRunRequested(applicationId, jobRunId));
136170
verify(1, listRunningOrCancellingJobsForApplicationRequested(applicationId));
137171
verify(1, stopApplicationRequested(applicationId));
138-
verify(1, getApplicationRequested(applicationId));
172+
verify(2, getApplicationRequested(applicationId));
139173
}
140174

141175
@Test
@@ -145,22 +179,31 @@ void shouldStopEMRServerlessWhenApplicationIsStartedWithNoRunningJobs(WireMockRu
145179

146180
// Given
147181
stubFor(listRunningJobsForApplicationRequest(applicationId)
148-
.willReturn(aResponseWithNoJobRuns()));
149-
stubFor(stopApplicationRequest(applicationId).inScenario("StopApplication")
182+
.inScenario("ShouldStopNoJobs")
183+
.willReturn(aResponseWithNoJobRuns())
184+
.willSetStateTo(STARTED));
185+
stubFor(getApplicationRequest(applicationId)
186+
.inScenario("ShouldStopNoJobs")
187+
.willReturn(aResponseWithStartedApplication(applicationId))
188+
.whenScenarioStateIs(STARTED));
189+
stubFor(stopApplicationRequest(applicationId)
190+
.inScenario("ShouldStopNoJobs")
150191
.willReturn(aResponse().withStatus(200))
151-
.whenScenarioStateIs(STARTED).willSetStateTo("ApplicationStopped"));
152-
stubFor(getApplicationRequest(applicationId).inScenario("StopApplication")
192+
.whenScenarioStateIs(STARTED)
193+
.willSetStateTo(ApplicationState.STOPPED.toString()));
194+
stubFor(getApplicationRequest(applicationId)
195+
.inScenario("ShouldStopNoJobs")
153196
.willReturn(aResponseWithTerminatedApplication(applicationId))
154-
.whenScenarioStateIs("ApplicationStopped"));
197+
.whenScenarioStateIs(ApplicationState.STOPPED.toString()));
155198

156199
// When
157200
lambda.handleEvent(applicationEvent(applicationId, "Delete"), null);
158201

159202
// Then
160-
verify(3, anyRequestedForEmrServerless());
203+
verify(4, anyRequestedForEmrServerless());
161204
verify(1, listRunningJobsForApplicationRequested(applicationId));
162205
verify(1, stopApplicationRequested(applicationId));
163-
verify(1, getApplicationRequested(applicationId));
206+
verify(2, getApplicationRequested(applicationId));
164207
}
165208

166209
@Test

java/cdk-custom-resources/src/test/java/sleeper/cdk/custom/WiremockEmrServerlessTestHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,15 +194,15 @@ public static ResponseDefinitionBuilder aResponseWithTerminatedApplication(Strin
194194
}
195195

196196
/**
197-
* Build an EMR application response for a running application.
197+
* Build an EMR application response for a started application.
198198
*
199199
* @param applicationId the application id
200200
* @return a HTTP response
201201
*/
202-
public static ResponseDefinitionBuilder aResponseWithRunningApplication(String applicationId) {
202+
public static ResponseDefinitionBuilder aResponseWithStartedApplication(String applicationId) {
203203
return aResponse().withStatus(200).withBody("{\"application\":{" +
204204
"\"applicationId\":\"" + applicationId + "\"," +
205-
"\"state\":\"RUNNING\"" +
205+
"\"state\":\"STARTED\"" +
206206
"}}");
207207
}
208208

java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import sleeper.cdk.stack.compaction.CompactionTrackerResources;
4646
import sleeper.cdk.stack.core.AutoDeleteS3ObjectsStack;
4747
import sleeper.cdk.stack.core.AutoStopEcsClusterTasksStack;
48+
import sleeper.cdk.stack.core.AutoStopEmrServerlessApplicationStack;
4849
import sleeper.cdk.stack.core.ConfigBucketStack;
4950
import sleeper.cdk.stack.core.CoreStacks;
5051
import sleeper.cdk.stack.core.LoggingStack;
@@ -106,6 +107,7 @@ public class SleeperCdkApp extends Stack {
106107
private QueryQueueStack queryQueueStack;
107108
private AutoDeleteS3ObjectsStack autoDeleteS3ObjectsStack;
108109
private AutoStopEcsClusterTasksStack autoStopEcsClusterTasksStack;
110+
private AutoStopEmrServerlessApplicationStack autoStopEmrServerlessApplicationStack;
109111
private LoggingStack loggingStack;
110112

111113
// These flags are used to control when the stacks are deployed in the SystemTest CDK app.
@@ -149,6 +151,9 @@ public void create() {
149151
// Auto stop ECS cluster tasks stack
150152
autoStopEcsClusterTasksStack = new AutoStopEcsClusterTasksStack(this, "AutoStopEcsClusterTasks", instanceProperties, jars, loggingStack);
151153

154+
// Auto stop EMR Serverless application stack
155+
autoStopEmrServerlessApplicationStack = new AutoStopEmrServerlessApplicationStack(this, "AutoStopEmrServerlessApplication", instanceProperties, jars, loggingStack);
156+
152157
// Stacks for tables
153158
ManagedPoliciesStack policiesStack = new ManagedPoliciesStack(this, "Policies", instanceProperties);
154159
TableDataStack dataStack = new TableDataStack(this, "TableData", instanceProperties, loggingStack, policiesStack, autoDeleteS3ObjectsStack, jars);
@@ -198,6 +203,7 @@ public void create() {
198203
topicStack.getTopic(),
199204
bulkImportBucketStack,
200205
coreStacks,
206+
autoStopEmrServerlessApplicationStack,
201207
errorMetrics);
202208

203209
// Stack to created EMR studio to be used to access EMR Serverless

java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EmrServerlessBulkImportStack.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import sleeper.bulkimport.core.configuration.BulkImportPlatform;
4848
import sleeper.cdk.jars.BuiltJars;
4949
import sleeper.cdk.jars.LambdaCode;
50+
import sleeper.cdk.stack.core.AutoStopEmrServerlessApplicationStack;
5051
import sleeper.cdk.stack.core.CoreStacks;
5152
import sleeper.cdk.stack.core.LoggingStack.LogGroupRef;
5253
import sleeper.cdk.util.Utils;
@@ -96,11 +97,12 @@ public EmrServerlessBulkImportStack(
9697
Topic errorsTopic,
9798
BulkImportBucketStack importBucketStack,
9899
CoreStacks coreStacks,
100+
AutoStopEmrServerlessApplicationStack autoStopEmrServerlessApplicationStack,
99101
List<IMetric> errorMetrics) {
100102
super(scope, id);
101103
IBucket jarsBucket = Bucket.fromBucketName(scope, "JarsBucket", instanceProperties.get(JARS_BUCKET));
102104
LambdaCode lambdaCode = jars.lambdaCode(jarsBucket);
103-
createEmrServerlessApplication(instanceProperties);
105+
createEmrServerlessApplication(instanceProperties, autoStopEmrServerlessApplicationStack);
104106
IRole emrRole = createEmrServerlessRole(
105107
instanceProperties, importBucketStack, coreStacks, jarsBucket);
106108
CommonEmrBulkImportHelper commonHelper = new CommonEmrBulkImportHelper(this,
@@ -135,7 +137,7 @@ private static void configureJobStarterFunction(InstanceProperties instancePrope
135137
.build());
136138
}
137139

138-
public void createEmrServerlessApplication(InstanceProperties instanceProperties) {
140+
public void createEmrServerlessApplication(InstanceProperties instanceProperties, AutoStopEmrServerlessApplicationStack autoStopEmrServerlessApplicationStack) {
139141
CfnApplication emrServerlessCluster = CfnApplication.Builder.create(this, "BulkImportEMRServerless")
140142
.name(String.join("-", "sleeper", Utils.cleanInstanceId(instanceProperties)))
141143
.releaseLabel(instanceProperties.get(BULK_IMPORT_EMR_SERVERLESS_RELEASE))
@@ -155,6 +157,8 @@ public void createEmrServerlessApplication(InstanceProperties instanceProperties
155157
emrServerlessCluster.getName());
156158
instanceProperties.set(BULK_IMPORT_EMR_SERVERLESS_APPLICATION_ID,
157159
emrServerlessCluster.getAttrApplicationId());
160+
161+
autoStopEmrServerlessApplicationStack.addAutoStopEmrServerlessApplication(this, emrServerlessCluster);
158162
}
159163

160164
private String createSecurityGroup(InstanceProperties instanceProperties) {

0 commit comments

Comments
 (0)