Skip to content

Commit 82121a5

Browse files
committed
Fix workers failure handling in local chunking
Before this commit, even when a worker thread fails, the step was still marked as complete. This commit updates the ChunkTaskExecutorItemWriter to mark the step as failed when at least one of the workers fails. Resolves #5172
1 parent 5edb62f commit 82121a5

File tree

3 files changed

+37
-1
lines changed

3 files changed

+37
-1
lines changed

spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkTaskExecutorItemWriter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.batch.integration.chunk;
1717

18+
import org.springframework.batch.core.BatchStatus;
1819
import org.springframework.batch.core.ExitStatus;
1920
import org.springframework.batch.core.step.StepContribution;
2021
import org.springframework.batch.core.step.StepExecution;
@@ -84,14 +85,21 @@ public void beforeStep(StepExecution stepExecution) {
8485
@Override
8586
public ExitStatus afterStep(StepExecution stepExecution) {
8687
try {
88+
ExitStatus exitStatus = ExitStatus.COMPLETED
89+
.addExitDescription("Waited for " + this.responses.size() + " results.");
8790
for (StepContribution contribution : getStepContributions()) {
8891
stepExecution.apply(contribution);
92+
if (ExitStatus.FAILED.getExitCode().equals(contribution.getExitStatus().getExitCode())) {
93+
exitStatus = contribution.getExitStatus();
94+
stepExecution.setStatus(BatchStatus.FAILED);
95+
}
8996
}
97+
return exitStatus;
9098
}
9199
catch (ExecutionException | InterruptedException e) {
100+
stepExecution.setStatus(BatchStatus.FAILED);
92101
return ExitStatus.FAILED.addExitDescription(e);
93102
}
94-
return ExitStatus.COMPLETED.addExitDescription("Waited for " + this.responses.size() + " results.");
95103
}
96104

97105
private Collection<StepContribution> getStepContributions() throws ExecutionException, InterruptedException {
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
foo1,bar1
2+
foo2,bar2
3+
fooooooooooooooooooooooooooooo3,baaaaaaaaaaaaaaaaaaaaaaaaaaaar3
4+
foo4,bar4
5+
foo5,bar5
6+
foo6,bar6

spring-batch-samples/src/test/java/org/springframework/batch/samples/chunking/local/LocalChunkingJobFunctionalTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.test.jdbc.JdbcTestUtils;
3030

3131
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertTrue;
3233

3334
public class LocalChunkingJobFunctionalTests {
3435

@@ -52,4 +53,25 @@ public void testLaunchJobWithJavaConfig() throws Exception {
5253
assertEquals(6, vetsCount);
5354
}
5455

56+
@Test
57+
public void testLaunchJobWithJavaConfigFailure() throws Exception {
58+
// given
59+
ApplicationContext context = new AnnotationConfigApplicationContext(LocalChunkingJobConfiguration.class);
60+
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
61+
JobOperator jobOperator = context.getBean(JobOperator.class);
62+
Job job = context.getBean(Job.class);
63+
JobParameters jobParameters = new JobParametersBuilder()
64+
.addString("inputFile", "org/springframework/batch/samples/chunking/local/data/vets-bad-data.csv")
65+
.toJobParameters();
66+
67+
// when
68+
JobExecution jobExecution = jobOperator.start(job, jobParameters);
69+
70+
// then
71+
assertEquals(BatchStatus.FAILED, jobExecution.getStatus());
72+
assertTrue(jobExecution.getExitStatus().getExitDescription().contains("size limit: 30"));
73+
int vetsCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "vets");
74+
assertEquals(4, vetsCount);
75+
}
76+
5577
}

0 commit comments

Comments
 (0)