|
| 1 | + |
| 2 | +# 基于大模型的 OrchestratorWorkersWorkflow 智能编排 |
| 3 | + |
| 4 | +## 一、背景与目标 |
| 5 | + |
| 6 | +在智能体(AI Agent)和自动化编排领域,复杂任务往往需要拆解为多个子任务,并由不同的“worker”并行处理。本文将介绍如何基于 Spring AI 和大模型(LLM),实现一个通用的 OrchestratorWorkersWorkflow,自动完成任务拆解、分发与结果汇总,并通过单元测试进行验证和评估。 |
| 7 | + |
| 8 | +--- |
| 9 | + |
| 10 | +## 二、核心设计思路 |
| 11 | + |
| 12 | +### 1. 任务拆解(Orchestration) |
| 13 | + |
| 14 | +- 利用大模型(如 OpenAI、Qwen 等)对用户输入的复杂任务进行智能拆解,输出结构化的子任务列表。 |
| 15 | +- 拆解 prompt 采用系统提示词,要求模型以 JSON 数组返回所有可独立执行的子任务。 |
| 16 | + |
| 17 | +### 2. 子任务处理(Workers) |
| 18 | + |
| 19 | +- 对每个子任务,worker 再次调用大模型,让其“执行”或“回答”该子任务,实现通用型处理。 |
| 20 | +- 支持并行处理,提升效率。 |
| 21 | + |
| 22 | +### 3. 结果汇总 |
| 23 | + |
| 24 | +- 汇总所有 worker 的输出,形成最终的 content。 |
| 25 | +- 同时保留原始拆解、子任务列表、各 worker 输出,便于溯源和分析。 |
| 26 | + |
| 27 | +--- |
| 28 | + |
| 29 | +## 三、核心代码实现 |
| 30 | + |
| 31 | +### 1. WorkflowResponse 结构 |
| 32 | + |
| 33 | +```java |
| 34 | +@Data |
| 35 | +@Builder |
| 36 | +public class WorkflowResponse { |
| 37 | + private String analysis; // 大模型原始拆解输出 |
| 38 | + private List<String> subtasks; // 拆解出的所有子任务 |
| 39 | + private List<String> workerResponses; // 每个 worker 的输出 |
| 40 | + private String content; // 最终合成内容 |
| 41 | + private boolean success; // 是否成功 |
| 42 | + private String errorMessage; // 错误信息 |
| 43 | +} |
| 44 | +``` |
| 45 | + |
| 46 | +### 2. OrchestratorWorkersWorkflow 主要逻辑 |
| 47 | + |
| 48 | +```java |
| 49 | +@Component |
| 50 | +public class OrchestratorWorkersWorkflow { |
| 51 | + |
| 52 | + @Autowired |
| 53 | + private ChatClient chatClient; |
| 54 | + |
| 55 | + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); |
| 56 | + |
| 57 | + public WorkflowResponse process(String taskDescription) { |
| 58 | + try { |
| 59 | + // 1. 用大模型拆解任务 |
| 60 | + LlmSubtaskResult subtaskResult = callLlmForSubtasks(taskDescription); |
| 61 | + List<String> subtasks = subtaskResult.subtasks; |
| 62 | + String analysis = subtaskResult.analysis; |
| 63 | + if (subtasks == null || subtasks.isEmpty()) { |
| 64 | + return WorkflowResponse.builder() |
| 65 | + .success(false) |
| 66 | + .errorMessage("大模型未能拆解出子任务") |
| 67 | + .analysis(analysis) |
| 68 | + .subtasks(subtasks) |
| 69 | + .build(); |
| 70 | + } |
| 71 | + |
| 72 | + // 2. Workers process subtasks in parallel |
| 73 | + List<String> workerResponses = subtasks.stream() |
| 74 | + .map(subtask -> CompletableFuture.supplyAsync(() -> workerProcess(subtask))) |
| 75 | + .collect(Collectors.toList()) |
| 76 | + .stream() |
| 77 | + .map(CompletableFuture::join) |
| 78 | + .collect(Collectors.toList()); |
| 79 | + |
| 80 | + // 3. Results are combined into final response |
| 81 | + String combined = String.join("\n", workerResponses); |
| 82 | + return WorkflowResponse.builder() |
| 83 | + .content(combined) |
| 84 | + .success(true) |
| 85 | + .analysis(analysis) |
| 86 | + .subtasks(subtasks) |
| 87 | + .workerResponses(workerResponses) |
| 88 | + .build(); |
| 89 | + } catch (Exception e) { |
| 90 | + return WorkflowResponse.builder() |
| 91 | + .success(false) |
| 92 | + .errorMessage("Orchestrator/Worker 执行失败: " + e.getMessage()) |
| 93 | + .build(); |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + // 用大模型拆解任务 |
| 98 | + private LlmSubtaskResult callLlmForSubtasks(String taskDescription) throws Exception { |
| 99 | + List messages = List.of( |
| 100 | + new SystemMessage("你是一个任务拆解专家。请将用户输入的复杂任务描述拆解为若干可以独立执行的子任务,输出格式为 JSON 数组,每个元素为一个子任务字符串。"), |
| 101 | + new UserMessage(taskDescription) |
| 102 | + ); |
| 103 | + Prompt prompt = new Prompt(messages); |
| 104 | + String modelResult = chatClient.prompt(prompt).call().content(); |
| 105 | + List<String> subtasks; |
| 106 | + try { |
| 107 | + subtasks = OBJECT_MAPPER.readValue(modelResult, new TypeReference<List<String>>() {}); |
| 108 | + } catch (Exception e) { |
| 109 | + subtasks = Arrays.stream(modelResult.split("[。,.,\n]")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); |
| 110 | + } |
| 111 | + return new LlmSubtaskResult(modelResult, subtasks); |
| 112 | + } |
| 113 | + |
| 114 | + // 通用 worker 处理逻辑:再次用大模型处理子任务 |
| 115 | + private String workerProcess(String subtask) { |
| 116 | + try { |
| 117 | + String systemPrompt = "你是一个高效的AI助手,请认真完成以下子任务:"; |
| 118 | + List messages = List.of( |
| 119 | + new SystemMessage(systemPrompt), |
| 120 | + new UserMessage(subtask) |
| 121 | + ); |
| 122 | + Prompt prompt = new Prompt(messages); |
| 123 | + return chatClient.prompt(prompt).call().content(); |
| 124 | + } catch (Exception e) { |
| 125 | + return "[Worker Error] " + e.getMessage(); |
| 126 | + } |
| 127 | + } |
| 128 | + |
| 129 | + // 内部类:封装大模型分析结果 |
| 130 | + private static class LlmSubtaskResult { |
| 131 | + final String analysis; |
| 132 | + final List<String> subtasks; |
| 133 | + LlmSubtaskResult(String analysis, List<String> subtasks) { |
| 134 | + this.analysis = analysis; |
| 135 | + this.subtasks = subtasks; |
| 136 | + } |
| 137 | + } |
| 138 | +} |
| 139 | +``` |
| 140 | + |
| 141 | +--- |
| 142 | + |
| 143 | +## 四、测试用例与验证 |
| 144 | + |
| 145 | +### 1. 测试用例设计 |
| 146 | + |
| 147 | +- 输入任务描述:"Generate both technical and user-friendly documentation for a REST API endpoint" |
| 148 | +- 期望:大模型能拆解出合理的文档编写子任务,worker 能对每个子任务给出专业输出。 |
| 149 | + |
| 150 | +### 2. 测试结果(test_result.md 摘要) |
| 151 | + |
| 152 | +- **Analysis**(大模型拆解原始输出): |
| 153 | + ```json |
| 154 | + [ |
| 155 | + "Identify the REST API endpoint to be documented", |
| 156 | + "Gather technical specifications of the endpoint (e.g., HTTP method, URL, request/response formats)", |
| 157 | + "Document the endpoint's purpose and functionality", |
| 158 | + ... |
| 159 | + "Format the documentation for readability and consistency" |
| 160 | + ] |
| 161 | + ``` |
| 162 | +- **Worker Outputs**(每个子任务的处理结果): |
| 163 | + - 针对“Identify the REST API endpoint to be documented”,worker 会主动询问 API 细节,体现了智能 agent 的交互性。 |
| 164 | + - 针对“Gather technical specifications...”,worker 会列举常见的技术规格项,并给出格式建议。 |
| 165 | + - 针对“Write clear, step-by-step usage instructions for developers”,worker 会输出结构化的开发者指引。 |
| 166 | + - 其他子任务也都能得到合理、专业的响应。 |
| 167 | + |
| 168 | +- **最终 content**:为所有 worker 输出的合成文本,便于直接展示或下游处理。 |
| 169 | + |
| 170 | +--- |
| 171 | + |
| 172 | +## 五、结果评估与客观分析 |
| 173 | + |
| 174 | +### 1. 优点 |
| 175 | + |
| 176 | +- **高度自动化**:无需手写拆解规则,完全依赖大模型智能分析。 |
| 177 | +- **通用性强**:worker 逻辑可适配各种类型的子任务,适合多场景复用。 |
| 178 | +- **可扩展性好**:支持并行处理,易于横向扩展。 |
| 179 | +- **结构化输出**:便于前端展示、流程追踪和后续分析。 |
| 180 | + |
| 181 | +### 2. 局限与改进空间 |
| 182 | + |
| 183 | +- **大模型输出稳定性**:部分模型可能输出非严格 JSON,需做好容错处理。 |
| 184 | +- **worker 处理深度**:如需更专业的子任务处理,可为不同类型子任务定制专属 worker。 |
| 185 | +- **性能优化**:大规模并发时可引入线程池、限流等机制。 |
| 186 | + |
| 187 | +### 3. 适用场景 |
| 188 | + |
| 189 | +- 智能文档生成、API 文档自动化 |
| 190 | +- 复杂任务的自动拆解与执行 |
| 191 | +- AI Agent、RPA、智能问答等 |
| 192 | + |
| 193 | +--- |
| 194 | + |
| 195 | +## 六、结语 |
| 196 | + |
| 197 | +通过 OrchestratorWorkersWorkflow 的实现与验证,我们可以看到大模型驱动的智能编排在实际业务中的巨大潜力。只需简单 prompt,即可实现复杂任务的自动拆解与高效执行。未来,结合更细粒度的 worker 能力和更强的模型,智能体编排将更加智能和实用。 |
0 commit comments