Skip to content

Commit 42cbaab

Browse files
committed
✨ 添加批处理流程模板
1 parent 319ec11 commit 42cbaab

File tree

16 files changed

+550
-0
lines changed

16 files changed

+550
-0
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<modules>
1212
<module>summer-beans</module>
1313
<module>summer-validator</module>
14+
<module>summer-engines</module>
1415
</modules>
1516

1617
<properties>
@@ -61,6 +62,11 @@
6162
<version>${servlet-api.version}</version>
6263
<scope>provided</scope>
6364
</dependency>
65+
<dependency>
66+
<groupId>org.projectlombok</groupId>
67+
<artifactId>lombok</artifactId>
68+
<version>1.18.12</version>
69+
</dependency>
6470

6571

6672
<!--test-->

summer-engines/pom.xml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>com.dianpoint.summer</groupId>
8+
<artifactId>summer</artifactId>
9+
<version>0.1-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>summer-engines</artifactId>
13+
14+
<properties>
15+
<maven.compiler.source>23</maven.compiler.source>
16+
<maven.compiler.target>23</maven.compiler.target>
17+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>org.projectlombok</groupId>
23+
<artifactId>lombok</artifactId>
24+
</dependency>
25+
<dependency>
26+
<groupId>org.springframework.boot</groupId>
27+
<artifactId>spring-boot-starter-data-jpa</artifactId>
28+
<version>3.4.3</version>
29+
</dependency>
30+
</dependencies>
31+
</project>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.dianpoint.summer.engines.batch;
2+
3+
/**
4+
* @author: congccoder
5+
* @email: congccoder@gmail.com | <a href="https://github.com/ccoderJava">github-homepage</a>
6+
* @date: 2025/6/3 19:13
7+
*/
8+
9+
//TIP To <b>Run</b> code, press <shortcut actionId="Run"/> or
10+
// click the <icon src="AllIcons.Actions.Execute"/> icon in the gutter.
11+
public class Main {
12+
public static void main(String[] args) {
13+
//TIP Press <shortcut actionId="ShowIntentionActions"/> with your caret at the highlighted text
14+
// to see how IntelliJ IDEA suggests fixing it.
15+
System.out.printf("Hello and welcome!");
16+
17+
for (int i = 1; i <= 5; i++) {
18+
//TIP Press <shortcut actionId="Debug"/> to start debugging your code. We have set one <icon src="AllIcons.Debugger.Db_set_breakpoint"/> breakpoint
19+
// for you, but you can always add more by pressing <shortcut actionId="ToggleLineBreakpoint"/>.
20+
System.out.println("i = " + i);
21+
}
22+
}
23+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.dianpoint.summer.engines.batch.common;
2+
3+
/**
4+
* @author: congccoder
5+
* @email: congccoder@gmail.com | <a href="https://github.com/ccoderJava">github-homepage</a>
6+
* @date: 2025/6/3 19:35
7+
*/
8+
9+
public class AcceptResponse {
10+
11+
private Long taskId;
12+
private String taskStatus;
13+
14+
public AcceptResponse(Long taskId, String taskStatus) {
15+
this.taskId = taskId;
16+
this.taskStatus = taskStatus;
17+
}
18+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.dianpoint.summer.engines.batch.dao;
2+
3+
import com.dianpoint.summer.engines.batch.entity.MainTask;
4+
import com.dianpoint.summer.engines.batch.enums.TaskStatus;
5+
import com.dianpoint.summer.engines.batch.service.TaskCoordinator;
6+
import org.springframework.beans.factory.annotation.Autowired;
7+
import org.springframework.jdbc.core.JdbcTemplate;
8+
import org.springframework.stereotype.Repository;
9+
import org.springframework.transaction.annotation.Transactional;
10+
11+
import java.time.LocalDateTime;
12+
13+
/**
14+
* @author: congccoder
15+
* @email: congccoder@gmail.com | <a href="https://github.com/ccoderJava">github-homepage</a>
16+
* @date: 2025/6/3 19:25
17+
*/
18+
19+
@Repository
20+
public class MainTaskDao {
21+
22+
@Autowired
23+
private JdbcTemplate jdbcTemplate;
24+
25+
@Autowired
26+
private TaskCoordinator taskCoordinator;
27+
28+
public void insert(MainTask task) {
29+
String sql = "INSERT INTO batch_main_task (biz_type, task_status, total_shards, finished_shards, params, completion_token, expire_time) VALUES (?, ?, ?, ?, ?, ?, ?)";
30+
jdbcTemplate.update(sql,
31+
task.getBizType(),
32+
task.getTaskStatus().getValue(),
33+
task.getTotalShards(),
34+
task.getFinishedShards(),
35+
task.getParams(),
36+
task.getCompletionToken(),
37+
task.getExpireTime());
38+
// 设置ID
39+
Long id = jdbcTemplate.queryForObject("SELECT LAST_INSERT_ID()", Long.class);
40+
task.setId(id);
41+
}
42+
43+
public void setCompletionToken(Long taskId, String token, LocalDateTime expireTime) {
44+
String sql = "UPDATE batch_main_task SET completion_token = ?, expire_time = ? WHERE id = ?";
45+
jdbcTemplate.update(sql, token, expireTime, taskId);
46+
}
47+
48+
public void clearCompletionToken(Long taskId) {
49+
String sql = "UPDATE batch_main_task SET completion_token = NULL, expire_time = NULL WHERE id = ?";
50+
jdbcTemplate.update(sql, taskId);
51+
52+
}
53+
54+
// MainTaskDao.java
55+
@Transactional
56+
public void onMainTaskComplete(Long taskId) {
57+
// 更新主任务状态
58+
jdbcTemplate.update(
59+
"UPDATE batch_main_task SET task_status=2, update_time=NOW() " +
60+
"WHERE id=? AND finished_shards >= total_shards",
61+
taskId
62+
);
63+
64+
// 触发阻塞调用完成
65+
MainTask task = getById(taskId);
66+
if (task.getCompletionToken() != null) {
67+
taskCoordinator.notifyCompletion(task.getCompletionToken());
68+
clearCompletionToken(taskId);
69+
}
70+
}
71+
72+
public MainTask getById(Long taskId) {
73+
String sql = "SELECT * FROM batch_main_task WHERE id = ?";
74+
return jdbcTemplate.queryForObject(sql, MainTask.class, taskId);
75+
}
76+
77+
public TaskStatus getStatus(Long taskId) {
78+
String sql = "SELECT * FROM batch_main_task WHERE id = ?";
79+
MainTask mainTask = jdbcTemplate.queryForObject(sql, MainTask.class, taskId);
80+
return mainTask.getTaskStatus();
81+
}
82+
83+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.dianpoint.summer.engines.batch.dao;
2+
3+
import com.dianpoint.summer.engines.batch.entity.SubTask;
4+
import com.dianpoint.summer.engines.batch.enums.TaskStatus;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.jdbc.core.JdbcTemplate;
7+
import org.springframework.stereotype.Repository;
8+
import org.springframework.transaction.annotation.Transactional;
9+
10+
import java.util.List;
11+
12+
/**
13+
* @author: congccoder
14+
* @email: congccoder@gmail.com | <a href="https://github.com/ccoderJava">github-homepage</a>
15+
* @date: 2025/6/3 19:23
16+
*/
17+
18+
// SubTaskDao.java
19+
@Repository
20+
public class SubTaskDao {
21+
private static final String LOCK_SQL =
22+
"SELECT * FROM batch_sub_task WHERE task_status = 0 ORDER BY id LIMIT ? FOR UPDATE SKIP LOCKED";
23+
24+
@Autowired
25+
private JdbcTemplate jdbcTemplate;
26+
27+
public List<SubTask> lockTasks(int limit, String instanceId) {
28+
return jdbcTemplate.query(LOCK_SQL, (rs, rowNum) -> {
29+
SubTask task = new SubTask();
30+
// ...字段映射
31+
task.setTaskStatus(TaskStatus.PROCESSING);
32+
task.setInstanceId(instanceId);
33+
return task;
34+
}, limit);
35+
}
36+
37+
@Transactional
38+
public boolean updateWithVersion(SubTask task) {
39+
String sql = "UPDATE batch_sub_task SET task_status=?, result=?, "
40+
+ "execute_count=?, instance_id=?, version=version+1 "
41+
+ "WHERE id=? AND version=?";
42+
int updated = jdbcTemplate.update(sql,
43+
task.getTaskStatus().getValue(),
44+
task.getResult(),
45+
task.getExecuteCount(),
46+
task.getInstanceId(),
47+
task.getId(),
48+
task.getVersion());
49+
return updated > 0;
50+
}
51+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.dianpoint.summer.engines.batch.entity;
2+
3+
import com.dianpoint.summer.engines.batch.enums.TaskStatus;
4+
import lombok.Data;
5+
6+
import java.time.LocalDateTime;
7+
8+
/**
9+
* @author: congccoder
10+
* @email: congccoder@gmail.com | <a href="https://github.com/ccoderJava">github-homepage</a>
11+
* @date: 2025/6/3 19:16
12+
*/
13+
@Data
14+
public class MainTask {
15+
private Long id;
16+
private String bizType;
17+
private TaskStatus taskStatus;
18+
private Integer totalShards;
19+
private Integer finishedShards;
20+
private String params;
21+
private String completionToken;
22+
private LocalDateTime expireTime;
23+
private LocalDateTime createTime;
24+
private LocalDateTime updateTime;
25+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.dianpoint.summer.engines.batch.entity;
2+
3+
import com.dianpoint.summer.engines.batch.enums.TaskStatus;
4+
import lombok.Data;
5+
6+
import java.time.LocalDateTime;
7+
8+
/**
9+
* @author: congccoder
10+
* @email: congccoder@gmail.com | <a href="https://github.com/ccoderJava">github-homepage</a>
11+
* @date: 2025/6/3 19:22
12+
*/
13+
@Data
14+
public class SubTask {
15+
16+
private Long id;
17+
private Long mainTaskId;
18+
private Integer shardIndex;
19+
private String shardKey;
20+
private TaskStatus taskStatus;
21+
private Integer executeCount;
22+
private String result;
23+
private String instanceId;
24+
private Integer version;
25+
private LocalDateTime createTime;
26+
private LocalDateTime updateTime;
27+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.dianpoint.summer.engines.batch.enums;
2+
3+
/**
4+
* @author: congccoder
5+
* @email: congccoder@gmail.com | <a href="https://github.com/ccoderJava">github-homepage</a>
6+
* @date: 2025/6/3 19:16
7+
*/
8+
9+
public enum TaskStatus {
10+
PENDING(0), // 待处理
11+
12+
PROCESSING(1), // 处理中
13+
14+
SUCCESS(2), // 成功
15+
16+
FAILED(3); // 失败
17+
18+
private final int value;
19+
20+
TaskStatus(int value) {
21+
this.value = value;
22+
}
23+
24+
public int getValue() {
25+
return value;
26+
}
27+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.dianpoint.summer.engines.batch.scheduler;
2+
3+
import com.dianpoint.summer.engines.batch.dao.SubTaskDao;
4+
import com.dianpoint.summer.engines.batch.entity.SubTask;
5+
import com.dianpoint.summer.engines.batch.service.TaskProcessor;
6+
import org.springframework.beans.factory.annotation.Autowired;
7+
import org.springframework.core.task.TaskExecutor;
8+
import org.springframework.scheduling.annotation.Scheduled;
9+
import org.springframework.stereotype.Component;
10+
import org.springframework.transaction.annotation.Transactional;
11+
12+
import java.util.List;
13+
import java.util.UUID;
14+
import java.util.concurrent.CompletableFuture;
15+
16+
/**
17+
* @author: congccoder
18+
* @email: congccoder@gmail.com | <a href="https://github.com/ccoderJava">github-homepage</a>
19+
* @date: 2025/6/3 19:28
20+
*/
21+
22+
// DistributedTaskScheduler.java
23+
@Component
24+
public class DistributedTaskScheduler {
25+
private final String instanceId = UUID.randomUUID().toString();
26+
27+
28+
@Autowired
29+
private TaskProcessor taskProcessor;
30+
31+
32+
@Autowired
33+
private SubTaskDao subTaskDao;
34+
35+
@Scheduled(fixedDelay = 3000)
36+
@Transactional(rollbackFor = Exception.class)
37+
public void scheduleTasks() {
38+
List<SubTask> tasks = subTaskDao.lockTasks(20, instanceId);
39+
tasks.forEach(task -> {
40+
// 异步处理确保事务及时提交
41+
CompletableFuture.runAsync(() -> taskProcessor.process(task))
42+
.exceptionally(ex -> {
43+
taskProcessor.handleTaskFailure(task, ex);
44+
return null;
45+
});
46+
});
47+
}
48+
}

0 commit comments

Comments
 (0)