Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void handleEvent(
}

private void stopApplication(String applicationId) throws InterruptedException {

LOGGER.info("Terminating {} running application: ", applicationId);

List<JobRunSummary> jobRuns = emrServerlessClient.listJobRuns(request -> request.applicationId(applicationId)
Expand All @@ -91,10 +92,12 @@ private void stopApplication(String applicationId) throws InterruptedException {
poll.pollUntil("all EMR Serverless jobs finished", () -> allJobsFinished(applicationId));
}

emrServerlessClient.stopApplication(request -> request.applicationId(applicationId));
if (!isApplicationStopped(applicationId)) {
emrServerlessClient.stopApplication(request -> request.applicationId(applicationId));

LOGGER.info("Waiting for applications to stop");
poll.pollUntil("all EMR Serverless applications stopped", () -> isApplicationStopped(applicationId));
LOGGER.info("Waiting for applications to stop");
poll.pollUntil("all EMR Serverless applications stopped", () -> isApplicationStopped(applicationId));
}
}

private boolean allJobsFinished(String applicationId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.emrserverless.model.ApplicationState;
import software.amazon.awssdk.services.emrserverless.model.JobRunState;

import sleeper.core.util.PollWithRetries;
Expand All @@ -34,6 +35,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithJobRunWithState;
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithNoJobRuns;
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithStartedApplication;
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithStoppedApplication;
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithStoppingApplication;
import static sleeper.cdk.custom.WiremockEmrServerlessTestHelper.aResponseWithTerminatedApplication;
Expand Down Expand Up @@ -64,13 +66,22 @@ void shouldTimeOutWhenDeleting(WireMockRuntimeInfo runtimeInfo) throws Exception

// Given
stubFor(listRunningJobsForApplicationRequest(applicationId)
.willReturn(aResponseWithNoJobRuns()));
stubFor(stopApplicationRequest(applicationId).inScenario("StopApplication")
.inScenario("ShouldTimeOut")
.willReturn(aResponseWithStartedApplication(applicationId))
.willSetStateTo(STARTED));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldTimeOut")
.willReturn(aResponseWithStartedApplication(applicationId))
.whenScenarioStateIs(STARTED));
stubFor(stopApplicationRequest(applicationId)
.inScenario("ShouldTimeOut")
.willReturn(aResponse().withStatus(200))
.whenScenarioStateIs(STARTED).willSetStateTo("ApplicationStopping"));
stubFor(getApplicationRequest(applicationId).inScenario("StopApplication")
.whenScenarioStateIs(STARTED)
.willSetStateTo(ApplicationState.STOPPING.toString()));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldTimeOut")
.willReturn(aResponseWithStoppingApplication(applicationId))
.whenScenarioStateIs("ApplicationStopping"));
.whenScenarioStateIs(ApplicationState.STOPPING.toString()));

// Then
assertThatThrownBy(() -> lambda.handleEvent(applicationEvent(applicationId, "Delete"), null))
Expand All @@ -84,23 +95,34 @@ void shouldTrackStoppingApplication(WireMockRuntimeInfo runtimeInfo) throws Exce

// Given
stubFor(listRunningJobsForApplicationRequest(applicationId)
.willReturn(aResponseWithNoJobRuns()));
stubFor(stopApplicationRequest(applicationId).inScenario("StopApplication")
.inScenario("ShouldTrackStopping")
.willReturn(aResponseWithNoJobRuns())
.willSetStateTo(STARTED));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldTrackStopping")
.willReturn(aResponseWithStartedApplication(applicationId))
.whenScenarioStateIs(STARTED));
stubFor(stopApplicationRequest(applicationId)
.inScenario("ShouldTrackStopping")
.willReturn(aResponse().withStatus(200))
.whenScenarioStateIs(STARTED).willSetStateTo("ApplicationStopping"));
stubFor(getApplicationRequest(applicationId).inScenario("StopApplication")
.whenScenarioStateIs(STARTED)
.willSetStateTo(ApplicationState.STOPPING.toString()));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldTrackStopping")
.willReturn(aResponseWithStoppingApplication(applicationId))
.whenScenarioStateIs("ApplicationStopping").willSetStateTo("ApplicationStopped"));
stubFor(getApplicationRequest(applicationId).inScenario("StopApplication")
.whenScenarioStateIs(ApplicationState.STOPPING.toString())
.willSetStateTo(ApplicationState.STOPPED.toString()));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldTrackStopping")
.willReturn(aResponseWithStoppedApplication(applicationId))
.whenScenarioStateIs("ApplicationStopped"));
.whenScenarioStateIs(ApplicationState.STOPPED.toString()));

// Then
lambda.handleEvent(applicationEvent(applicationId, "Delete"), null);
// Then
verify(4, anyRequestedForEmrServerless());
verify(5, anyRequestedForEmrServerless());
verify(1, stopApplicationRequested(applicationId));
verify(2, getApplicationRequested(applicationId));
verify(3, getApplicationRequested(applicationId));

}

Expand All @@ -110,32 +132,44 @@ void shouldStopEMRServerlessWhenApplicationIsStartedWithRunningJob(WireMockRunti
lambda = lambda(runtimeInfo, PollWithRetries.noRetries());

// Given
stubFor(listRunningJobsForApplicationRequest(applicationId).inScenario("StopJob")
stubFor(listRunningJobsForApplicationRequest(applicationId)
.inScenario("ShouldStopWithJobs")
.willReturn(aResponseWithJobRunWithState(applicationId, jobRunId, JobRunState.RUNNING))
.whenScenarioStateIs(STARTED));
stubFor(cancelJobRunRequest(applicationId, jobRunId).inScenario("StopJob")
.willSetStateTo(STARTED));
stubFor(cancelJobRunRequest(applicationId, jobRunId)
.inScenario("ShouldStopWithJobs")
.willReturn(ResponseDefinitionBuilder.okForEmptyJson())
.whenScenarioStateIs(STARTED).willSetStateTo("JobStopped"));
stubFor(listRunningOrCancellingJobsForApplicationRequest(applicationId).inScenario("StopJob")
.whenScenarioStateIs(STARTED)
.willSetStateTo(JobRunState.CANCELLING.toString()));
stubFor(listRunningOrCancellingJobsForApplicationRequest(applicationId)
.inScenario("ShouldStopWithJobs")
.willReturn(aResponseWithNoJobRuns())
.whenScenarioStateIs("JobStopped"));
stubFor(stopApplicationRequest(applicationId).inScenario("StopJob")
.whenScenarioStateIs(JobRunState.CANCELLING.toString())
.willSetStateTo(JobRunState.CANCELLED.toString()));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldStopWithJobs")
.willReturn(aResponseWithStartedApplication(applicationId))
.whenScenarioStateIs(JobRunState.CANCELLED.toString()));
stubFor(stopApplicationRequest(applicationId)
.inScenario("ShouldStopWithJobs")
.willReturn(aResponse().withStatus(200))
.whenScenarioStateIs("JobStopped").willSetStateTo("AppStopped"));
stubFor(getApplicationRequest(applicationId).inScenario("StopJob")
.whenScenarioStateIs(JobRunState.CANCELLED.toString())
.willSetStateTo(ApplicationState.STOPPED.toString()));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldStopWithJobs")
.willReturn(aResponseWithTerminatedApplication(applicationId))
.whenScenarioStateIs("AppStopped"));
.whenScenarioStateIs(ApplicationState.STOPPED.toString()));

// When
lambda.handleEvent(applicationEvent(applicationId, "Delete"), null);

// Then
verify(5, anyRequestedForEmrServerless());
verify(6, anyRequestedForEmrServerless());
verify(1, listRunningJobsForApplicationRequested(applicationId));
verify(1, cancelJobRunRequested(applicationId, jobRunId));
verify(1, listRunningOrCancellingJobsForApplicationRequested(applicationId));
verify(1, stopApplicationRequested(applicationId));
verify(1, getApplicationRequested(applicationId));
verify(2, getApplicationRequested(applicationId));
}

@Test
Expand All @@ -145,22 +179,31 @@ void shouldStopEMRServerlessWhenApplicationIsStartedWithNoRunningJobs(WireMockRu

// Given
stubFor(listRunningJobsForApplicationRequest(applicationId)
.willReturn(aResponseWithNoJobRuns()));
stubFor(stopApplicationRequest(applicationId).inScenario("StopApplication")
.inScenario("ShouldStopNoJobs")
.willReturn(aResponseWithNoJobRuns())
.willSetStateTo(STARTED));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldStopNoJobs")
.willReturn(aResponseWithStartedApplication(applicationId))
.whenScenarioStateIs(STARTED));
stubFor(stopApplicationRequest(applicationId)
.inScenario("ShouldStopNoJobs")
.willReturn(aResponse().withStatus(200))
.whenScenarioStateIs(STARTED).willSetStateTo("ApplicationStopped"));
stubFor(getApplicationRequest(applicationId).inScenario("StopApplication")
.whenScenarioStateIs(STARTED)
.willSetStateTo(ApplicationState.STOPPED.toString()));
stubFor(getApplicationRequest(applicationId)
.inScenario("ShouldStopNoJobs")
.willReturn(aResponseWithTerminatedApplication(applicationId))
.whenScenarioStateIs("ApplicationStopped"));
.whenScenarioStateIs(ApplicationState.STOPPED.toString()));

// When
lambda.handleEvent(applicationEvent(applicationId, "Delete"), null);

// Then
verify(3, anyRequestedForEmrServerless());
verify(4, anyRequestedForEmrServerless());
verify(1, listRunningJobsForApplicationRequested(applicationId));
verify(1, stopApplicationRequested(applicationId));
verify(1, getApplicationRequested(applicationId));
verify(2, getApplicationRequested(applicationId));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,15 @@ public static ResponseDefinitionBuilder aResponseWithTerminatedApplication(Strin
}

/**
* Build an EMR application response for a running application.
* Build an EMR application response for a started application.
*
* @param applicationId the application id
* @return a HTTP response
*/
public static ResponseDefinitionBuilder aResponseWithRunningApplication(String applicationId) {
public static ResponseDefinitionBuilder aResponseWithStartedApplication(String applicationId) {
return aResponse().withStatus(200).withBody("{\"application\":{" +
"\"applicationId\":\"" + applicationId + "\"," +
"\"state\":\"RUNNING\"" +
"\"state\":\"STARTED\"" +
"}}");
}

Expand Down
6 changes: 6 additions & 0 deletions java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import sleeper.cdk.stack.compaction.CompactionTrackerResources;
import sleeper.cdk.stack.core.AutoDeleteS3ObjectsStack;
import sleeper.cdk.stack.core.AutoStopEcsClusterTasksStack;
import sleeper.cdk.stack.core.AutoStopEmrServerlessApplicationStack;
import sleeper.cdk.stack.core.ConfigBucketStack;
import sleeper.cdk.stack.core.CoreStacks;
import sleeper.cdk.stack.core.LoggingStack;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class SleeperCdkApp extends Stack {
private QueryQueueStack queryQueueStack;
private AutoDeleteS3ObjectsStack autoDeleteS3ObjectsStack;
private AutoStopEcsClusterTasksStack autoStopEcsClusterTasksStack;
private AutoStopEmrServerlessApplicationStack autoStopEmrServerlessApplicationStack;
private LoggingStack loggingStack;

// These flags are used to control when the stacks are deployed in the SystemTest CDK app.
Expand Down Expand Up @@ -149,6 +151,9 @@ public void create() {
// Auto stop ECS cluster tasks stack
autoStopEcsClusterTasksStack = new AutoStopEcsClusterTasksStack(this, "AutoStopEcsClusterTasks", instanceProperties, jars, loggingStack);

// Auto stop EMR Serverless application stack
autoStopEmrServerlessApplicationStack = new AutoStopEmrServerlessApplicationStack(this, "AutoStopEmrServerlessApplication", instanceProperties, jars, loggingStack);

// Stacks for tables
ManagedPoliciesStack policiesStack = new ManagedPoliciesStack(this, "Policies", instanceProperties);
TableDataStack dataStack = new TableDataStack(this, "TableData", instanceProperties, loggingStack, policiesStack, autoDeleteS3ObjectsStack, jars);
Expand Down Expand Up @@ -198,6 +203,7 @@ public void create() {
topicStack.getTopic(),
bulkImportBucketStack,
coreStacks,
autoStopEmrServerlessApplicationStack,
errorMetrics);

// Stack to created EMR studio to be used to access EMR Serverless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import sleeper.bulkimport.core.configuration.BulkImportPlatform;
import sleeper.cdk.jars.BuiltJars;
import sleeper.cdk.jars.LambdaCode;
import sleeper.cdk.stack.core.AutoStopEmrServerlessApplicationStack;
import sleeper.cdk.stack.core.CoreStacks;
import sleeper.cdk.stack.core.LoggingStack.LogGroupRef;
import sleeper.cdk.util.Utils;
Expand Down Expand Up @@ -96,11 +97,12 @@ public EmrServerlessBulkImportStack(
Topic errorsTopic,
BulkImportBucketStack importBucketStack,
CoreStacks coreStacks,
AutoStopEmrServerlessApplicationStack autoStopEmrServerlessApplicationStack,
List<IMetric> errorMetrics) {
super(scope, id);
IBucket jarsBucket = Bucket.fromBucketName(scope, "JarsBucket", instanceProperties.get(JARS_BUCKET));
LambdaCode lambdaCode = jars.lambdaCode(jarsBucket);
createEmrServerlessApplication(instanceProperties);
createEmrServerlessApplication(instanceProperties, autoStopEmrServerlessApplicationStack);
IRole emrRole = createEmrServerlessRole(
instanceProperties, importBucketStack, coreStacks, jarsBucket);
CommonEmrBulkImportHelper commonHelper = new CommonEmrBulkImportHelper(this,
Expand Down Expand Up @@ -135,7 +137,7 @@ private static void configureJobStarterFunction(InstanceProperties instancePrope
.build());
}

public void createEmrServerlessApplication(InstanceProperties instanceProperties) {
public void createEmrServerlessApplication(InstanceProperties instanceProperties, AutoStopEmrServerlessApplicationStack autoStopEmrServerlessApplicationStack) {
CfnApplication emrServerlessCluster = CfnApplication.Builder.create(this, "BulkImportEMRServerless")
.name(String.join("-", "sleeper", Utils.cleanInstanceId(instanceProperties)))
.releaseLabel(instanceProperties.get(BULK_IMPORT_EMR_SERVERLESS_RELEASE))
Expand All @@ -155,6 +157,8 @@ public void createEmrServerlessApplication(InstanceProperties instanceProperties
emrServerlessCluster.getName());
instanceProperties.set(BULK_IMPORT_EMR_SERVERLESS_APPLICATION_ID,
emrServerlessCluster.getAttrApplicationId());

autoStopEmrServerlessApplicationStack.addAutoStopEmrServerlessApplication(this, emrServerlessCluster);
}

private String createSecurityGroup(InstanceProperties instanceProperties) {
Expand Down
Loading
Loading