diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoJobExecutionDao.java index da1d81ff78..26431df50d 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoJobExecutionDao.java @@ -23,6 +23,7 @@ import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter; import org.springframework.batch.core.repository.persistence.converter.JobInstanceConverter; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.query.Query; @@ -33,6 +34,7 @@ /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class MongoJobExecutionDao implements JobExecutionDao { @@ -72,10 +74,16 @@ public void saveJobExecution(JobExecution jobExecution) { @Override public void updateJobExecution(JobExecution jobExecution) { - Query query = query(where("jobExecutionId").is(jobExecution.getId())); + Query query = query( + where("jobExecutionId").is(jobExecution.getId()).and("version").is(jobExecution.getVersion())); org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter .fromJobExecution(jobExecution); - this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionToUpdate.incrementVersion(); + if (this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME) == null) { + throw new OptimisticLockingFailureException("Attempt to update step execution id=" + jobExecution.getId() + + " with wrong version (" + jobExecution.getVersion() + ")"); + } + jobExecution.incrementVersion(); } @Override diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoStepExecutionDao.java index ec9067fe61..6dfdf1aed0 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoStepExecutionDao.java @@ -26,6 +26,7 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter; import org.springframework.batch.core.repository.persistence.converter.StepExecutionConverter; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.query.Query; import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; @@ -35,6 +36,7 @@ /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class MongoStepExecutionDao implements StepExecutionDao { @@ -81,10 +83,17 @@ public void saveStepExecutions(Collection<StepExecution> stepExecutions) { @Override public void updateStepExecution(StepExecution stepExecution) { - Query query = query(where("stepExecutionId").is(stepExecution.getId())); + Query query = query( + where("stepExecutionId").is(stepExecution.getId()).and("version").is(stepExecution.getVersion())); org.springframework.batch.core.repository.persistence.StepExecution stepExecutionToUpdate = this.stepExecutionConverter .fromStepExecution(stepExecution); - this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionToUpdate.incrementVersion(); + if (this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, + STEP_EXECUTIONS_COLLECTION_NAME) == null) { + throw new OptimisticLockingFailureException("Attempt to update step execution id=" + stepExecution.getId() + + " with wrong version (" + stepExecution.getVersion() + ")"); + } + stepExecution.incrementVersion(); } @Override diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/JobExecution.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/JobExecution.java index 2a0577417d..a9259254f4 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/JobExecution.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/JobExecution.java @@ -25,6 +25,7 @@ /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class JobExecution { @@ -53,6 +54,8 @@ public class JobExecution { private ExecutionContext executionContext; + private Integer version; + public JobExecution() { } @@ -148,13 +151,30 @@ public void setExecutionContext(ExecutionContext executionContext) { this.executionContext = executionContext; } + public Integer getVersion() { + return version; + } + + public void setVersion(Integer version) { + this.version = version; + } + + public void incrementVersion() { + if (version == null) { + version = 0; + } + else { + version = version + 1; + } + } + @Override public String toString() { return "JobExecution{" + "id='" + id + '\'' + ", jobExecutionId=" + jobExecutionId + ", jobInstanceId=" + jobInstanceId + ", jobParameters=" + jobParameters + ", stepExecutions=" + stepExecutions + ", status=" + status + ", startTime=" + startTime + ", createTime=" + createTime + ", endTime=" + endTime + ", lastUpdated=" + lastUpdated + ", exitStatus=" + exitStatus + ", executionContext=" - + executionContext + '}'; + + executionContext + ", version=" + version + '}'; } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/StepExecution.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/StepExecution.java index 351fe34442..4bb0784ef4 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/StepExecution.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/StepExecution.java @@ -21,6 +21,7 @@ /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class StepExecution { @@ -65,6 +66,8 @@ public class StepExecution { private boolean terminateOnly; + private Integer version; + public StepExecution() { } @@ -224,6 +227,23 @@ public void setTerminateOnly(boolean terminateOnly) { this.terminateOnly = terminateOnly; } + public Integer getVersion() { + return version; + } + + public void setVersion(Integer version) { + this.version = version; + } + + public void incrementVersion() { + if (version == null) { + version = 0; + } + else { + version = version + 1; + } + } + @Override public String toString() { return "StepExecution{" + "id='" + id + '\'' + ", stepExecutionId=" + stepExecutionId + ", jobExecutionId='" @@ -232,7 +252,8 @@ public String toString() { + ", readSkipCount=" + readSkipCount + ", processSkipCount=" + processSkipCount + ", writeSkipCount=" + writeSkipCount + ", filterCount=" + filterCount + ", startTime=" + startTime + ", createTime=" + createTime + ", endTime=" + endTime + ", lastUpdated=" + lastUpdated + ", executionContext=" - + executionContext + ", exitStatus=" + exitStatus + ", terminateOnly=" + terminateOnly + '}'; + + executionContext + ", exitStatus=" + exitStatus + ", terminateOnly=" + terminateOnly + ", version=" + + version + '}'; } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/JobExecutionConverter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/JobExecutionConverter.java index 686c48464c..f1170a7bff 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/JobExecutionConverter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/JobExecutionConverter.java @@ -27,6 +27,7 @@ /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class JobExecutionConverter { @@ -54,6 +55,7 @@ public org.springframework.batch.core.JobExecution toJobExecution(JobExecution s source.getExitStatus().exitDescription())); jobExecution.setExecutionContext( new org.springframework.batch.item.ExecutionContext(source.getExecutionContext().map())); + jobExecution.setVersion(source.getVersion()); return jobExecution; } @@ -77,6 +79,7 @@ public JobExecution fromJobExecution(org.springframework.batch.core.JobExecution new ExitStatus(source.getExitStatus().getExitCode(), source.getExitStatus().getExitDescription())); org.springframework.batch.item.ExecutionContext executionContext = source.getExecutionContext(); jobExecution.setExecutionContext(new ExecutionContext(executionContext.toMap(), executionContext.isDirty())); + jobExecution.setVersion(source.getVersion()); return jobExecution; } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/StepExecutionConverter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/StepExecutionConverter.java index 221e9c50cf..6972872194 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/StepExecutionConverter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/StepExecutionConverter.java @@ -22,6 +22,7 @@ /** * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 5.2.0 */ public class StepExecutionConverter { @@ -50,6 +51,7 @@ public org.springframework.batch.core.StepExecution toStepExecution(StepExecutio if (source.isTerminateOnly()) { stepExecution.setTerminateOnly(); } + stepExecution.setVersion(source.getVersion()); return stepExecution; } @@ -77,6 +79,7 @@ public StepExecution fromStepExecution(org.springframework.batch.core.StepExecut org.springframework.batch.item.ExecutionContext executionContext = source.getExecutionContext(); stepExecution.setExecutionContext(new ExecutionContext(executionContext.toMap(), executionContext.isDirty())); stepExecution.setTerminateOnly(source.isTerminateOnly()); + stepExecution.setVersion(source.getVersion()); return stepExecution; }