diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java index 077a9c530d..c1c9242829 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java @@ -19,6 +19,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.ChunkListener; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobInterruptedException; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; @@ -339,6 +340,7 @@ private class ChunkTransactionCallback extends TransactionSynchronizationAdapter private boolean stepExecutionUpdated = false; private StepExecution oldVersion; + private ExecutionContext oldExecutionContext; private boolean locked = false; @@ -360,6 +362,7 @@ public void afterCompletion(int status) { logger.info("Commit failed while step execution data was already updated. " + "Reverting to old version."); copy(oldVersion, stepExecution); + stepExecution.setExecutionContext(oldExecutionContext); if (status == TransactionSynchronization.STATUS_ROLLED_BACK) { rollback(stepExecution); } @@ -371,7 +374,6 @@ public void afterCompletion(int status) { logger.error("Rolling back with transaction in unknown state"); rollback(stepExecution); stepExecution.upgradeStatus(BatchStatus.UNKNOWN); - stepExecution.setTerminateOnly(); } } finally { @@ -397,8 +399,7 @@ public RepeatStatus doInTransaction(TransactionStatus status) { // In case we need to push it back to its old value // after a commit fails... 
- oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); - copy(stepExecution, oldVersion); + oldExecutionContext = new ExecutionContext(stepExecution.getExecutionContext()); try { @@ -433,6 +434,23 @@ public RepeatStatus doInTransaction(TransactionStatus status) { Thread.currentThread().interrupt(); } + // Refresh stepExecution to the latest correctly persisted + // state in order to apply the contribution on the latest version + String stepName = stepExecution.getStepName(); + JobExecution jobExecution = stepExecution.getJobExecution(); + StepExecution lastStepExecution = getJobRepository() + .getLastStepExecution(jobExecution.getJobInstance(), stepName); + if (lastStepExecution != null && + !lastStepExecution.getVersion().equals(stepExecution.getVersion())) { + copy(lastStepExecution, stepExecution); + } + + // Take a copy of the stepExecution in case we need to + // undo the current contribution to the in memory instance + // if the commit fails + oldVersion = new StepExecution(stepName, jobExecution); + copy(stepExecution, oldVersion); + // Apply the contribution to the step // even if unsuccessful if (logger.isDebugEnabled()) { @@ -499,11 +517,9 @@ private void rollback(StepExecution stepExecution) { } private void copy(final StepExecution source, final StepExecution target) { - target.setVersion(source.getVersion()); target.setWriteCount(source.getWriteCount()); target.setFilterCount(source.getFilterCount()); target.setCommitCount(source.getCommitCount()); - target.setExecutionContext(new ExecutionContext(source.getExecutionContext())); } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java new file mode 100644 index 0000000000..0518e8d957 --- /dev/null +++ 
b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java @@ -0,0 +1,264 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.step.item; + +import org.junit.Test; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.configuration.annotation.BatchConfigurer; +import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.listener.JobExecutionListenerSupport; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.item.ItemReader; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import 
org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; + +import javax.sql.DataSource; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Tests for the behavior of a multi-threaded TaskletStep. + * + * @author Mahmoud Ben Hassine + */ +public class MultiThreadedTaskletStepIntegrationTests { + + @Test + public void testMultiThreadedTaskletExecutionWhenNoErrors() throws Exception { + // given + Class[] configurationClasses = {JobConfiguration.class, TransactionManagerConfiguration.class}; + ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + Job job = context.getBean(Job.class); + JobParameters jobParameters = new JobParameters(); + + // when + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertNotNull(jobExecution); + assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()); + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); + assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus()); + assertEquals(0, stepExecution.getFailureExceptions().size()); + } + + @Test + public void testMultiThreadedTaskletExecutionWhenCommitFails() throws Exception { + // given + Class[] configurationClasses = {JobConfiguration.class, 
CommitFailingTransactionManagerConfiguration.class}; + ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + Job job = context.getBean(Job.class); + JobParameters jobParameters = new JobParameters(); + + // when + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertNotNull(jobExecution); + assertEquals(BatchStatus.FAILED, jobExecution.getStatus()); + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); + assertEquals(BatchStatus.FAILED, stepExecution.getStatus()); + Throwable e = stepExecution.getFailureExceptions().get(0); + assertEquals("Planned commit exception!", e.getMessage()); + // No assertions on execution context because it is undefined in this case + } + + @Test + public void testMultiThreadedTaskletExecutionWhenRollbackFails() throws Exception { + // given + Class[] configurationClasses = {JobConfiguration.class, RollbackFailingTransactionManagerConfiguration.class}; + ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + Job job = context.getBean(Job.class); + JobParameters jobParameters = new JobParameters(); + + // when + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertNotNull(jobExecution); + assertEquals(BatchStatus.UNKNOWN, jobExecution.getStatus()); + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); + assertEquals(BatchStatus.UNKNOWN, stepExecution.getStatus()); + Throwable e = stepExecution.getFailureExceptions().get(0); + assertEquals("Planned rollback exception!", e.getMessage()); + // No assertions on execution context because it is undefined in this case + } + + @Configuration + @EnableBatchProcessing + public static class JobConfiguration { + + @Autowired + private JobBuilderFactory jobBuilderFactory; + 
@Autowired + private StepBuilderFactory stepBuilderFactory; + + @Bean + public TaskletStep step() { + return stepBuilderFactory.get("step") + .chunk(3) + .reader(itemReader()) + .writer(items -> {}) + .taskExecutor(taskExecutor()) + .build(); + } + + @Bean + public Job job(ThreadPoolTaskExecutor taskExecutor) { + return jobBuilderFactory.get("job") + .start(step()) + .listener(new JobExecutionListenerSupport() { + @Override + public void afterJob(JobExecution jobExecution) { + taskExecutor.shutdown(); + } + }) + .build(); + } + + @Bean + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + taskExecutor.setCorePoolSize(3); + taskExecutor.setMaxPoolSize(3); + taskExecutor.setThreadNamePrefix("spring-batch-worker-thread-"); + return taskExecutor; + } + + @Bean + public ItemReader itemReader() { + return new ItemReader() { + private final AtomicInteger atomicInteger = new AtomicInteger(); + @Override + public synchronized Integer read() { + int value = atomicInteger.incrementAndGet(); + return value <= 9 ? 
value : null; + } + }; + } + + } + + @Configuration + public static class DataSourceConfiguration { + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.HSQL) + .addScript("org/springframework/batch/core/schema-drop-hsqldb.sql") + .addScript("org/springframework/batch/core/schema-hsqldb.sql") + .build(); + } + + } + + @Configuration + @Import(DataSourceConfiguration.class) + public static class TransactionManagerConfiguration { + + @Bean + public BatchConfigurer batchConfigurer(DataSource dataSource) { + return new DefaultBatchConfigurer(dataSource) { + @Override + public PlatformTransactionManager getTransactionManager() { + return new DataSourceTransactionManager(dataSource); + } + }; + } + + } + + @Configuration + @Import(DataSourceConfiguration.class) + public static class CommitFailingTransactionManagerConfiguration { + + @Bean + public BatchConfigurer batchConfigurer(DataSource dataSource) { + return new DefaultBatchConfigurer(dataSource) { + @Override + public PlatformTransactionManager getTransactionManager() { + return new DataSourceTransactionManager(dataSource) { + @Override + protected void doCommit(DefaultTransactionStatus status) { + super.doCommit(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw new RuntimeException("Planned commit exception!"); + } + } + }; + } + }; + } + + } + + @Configuration + @Import(DataSourceConfiguration.class) + public static class RollbackFailingTransactionManagerConfiguration { + + @Bean + public BatchConfigurer batchConfigurer(DataSource dataSource) { + return new DefaultBatchConfigurer(dataSource) { + @Override + public PlatformTransactionManager getTransactionManager() { + return new DataSourceTransactionManager(dataSource) { + @Override + protected void doCommit(DefaultTransactionStatus status) { + super.doCommit(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw 
new RuntimeException("Planned commit exception!"); + } + } + + @Override + protected void doRollback(DefaultTransactionStatus status) { + super.doRollback(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw new RuntimeException("Planned rollback exception!"); + } + } + }; + } + }; + } + + } + +} diff --git a/spring-batch-docs/src/main/asciidoc/scalability.adoc b/spring-batch-docs/src/main/asciidoc/scalability.adoc index 76997fcb9c..3df42524d0 100644 --- a/spring-batch-docs/src/main/asciidoc/scalability.adoc +++ b/spring-batch-docs/src/main/asciidoc/scalability.adoc @@ -123,7 +123,9 @@ your step, such as a `DataSource`. Be sure to make the pool in those resources as large as the desired number of concurrent threads in the step. There are some practical limitations of using multi-threaded `Step` implementations for -some common batch use cases. Many participants in a `Step` (such as readers and writers) +some common batch use cases: + +* Many participants in a `Step` (such as readers and writers) are stateful. If the state is not segregated by thread, then those components are not usable in a multi-threaded `Step`. In particular, most of the off-the-shelf readers and writers from Spring Batch are not designed for multi-threaded use. It is, however, @@ -132,9 +134,8 @@ possible to work with stateless or thread safe readers and writers, and there is https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples[Spring Batch Samples] that shows the use of a process indicator (see <<readersAndWriters.adoc#process-indicator,Preventing State Persistence>>) to keep track -of items that have been processed in a database input table. - -Spring Batch provides some implementations of `ItemWriter` and `ItemReader`. Usually, +of items that have been processed in a database input table. Spring Batch provides some + implementations of `ItemWriter` and `ItemReader`. 
Usually, they say in the Javadoc if they are thread safe or not or what you have to do to avoid problems in a concurrent environment. If there is no information in the Javadoc, you can check the implementation to see if there is any state. If a reader is not thread safe, @@ -143,6 +144,11 @@ synchronizing delegator. You can synchronize the call to `read()` and as long as processing and writing is the most expensive part of the chunk, your step may still complete much faster than it would in a single threaded configuration. +* In a multi-threaded `Step`, each thread runs in its own transaction and the `ChunkContext` +is shared between threads. This shared state might become inconsistent +if one of the transactions is rolled back. Hence, we recommend against manipulating the +`ExecutionContext` in a multi-threaded `Step`. + [[scalabilityParallelSteps]]