Skip to content

Commit c60d4e6

Browse files
artembilanspring-builds
authored andcommitted
Fix transformer for async mode (#10625)
The `AbstractMessageProducingHandler` has `async` mode, which is handled by the reply payload type. However, the `AbstractTransformer` produces the whole `Message` with that async reply payload. * Fix `AbstractMessageProducingHandler` to check for a `Message` type on a reply object. Extract its payload for the async logic. And recreate the final message for reply after async container fulfilment * Disable `WebFluxDslTests.testHttpReactivePostWithError()` as failing against the latest SF (cherry picked from commit 4f35761)
1 parent a4b8dfe commit c60d4e6

File tree

2 files changed

+90
-12
lines changed

2 files changed

+90
-12
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -329,25 +329,42 @@ private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHe
329329
replyChannel = getOutputChannel();
330330
}
331331

332+
Object replyPayload = reply;
333+
Message<?> replyMessage = reply instanceof Message<?> message ? message : null;
334+
335+
if (replyMessage != null) {
336+
replyPayload = replyMessage.getPayload();
337+
}
338+
332339
if (this.async) {
333-
boolean isFutureReply = reply instanceof CompletableFuture<?>;
340+
boolean isFutureReply = replyPayload instanceof CompletableFuture<?>;
334341

335342
ReactiveAdapter reactiveAdapter = null;
336343
if (!isFutureReply) {
337-
reactiveAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply);
344+
reactiveAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, replyPayload);
338345
}
339346

340347
if (isFutureReply || reactiveAdapter != null) {
341348
if (replyChannel instanceof ReactiveStreamsSubscribableChannel reactiveStreamsSubscribableChannel) {
342-
Publisher<?> reactiveReply = toPublisherReply(reply, reactiveAdapter);
349+
Publisher<?> reactiveReply = toPublisherReply(replyPayload, reactiveAdapter);
343350
reactiveStreamsSubscribableChannel
344351
.subscribeTo(
345352
Flux.from(reactiveReply)
346353
.doOnError((ex) -> sendErrorMessage(requestMessage, ex))
347-
.map(result -> createOutputMessage(result, requestHeaders)));
354+
.map(result -> {
355+
if (replyMessage != null) {
356+
return getMessageBuilderFactory()
357+
.withPayload(result)
358+
.copyHeaders(replyMessage.getHeaders())
359+
.build();
360+
}
361+
else {
362+
return createOutputMessage(result, requestHeaders);
363+
}
364+
}));
348365
}
349366
else {
350-
CompletableFuture<?> futureReply = toFutureReply(reply, reactiveAdapter);
367+
CompletableFuture<?> futureReply = toFutureReply(replyPayload, replyMessage, reactiveAdapter);
351368
futureReply.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel));
352369
}
353370

@@ -367,8 +384,12 @@ private Publisher<?> toPublisherReply(Object reply, @Nullable ReactiveAdapter re
367384
}
368385
}
369386

370-
@SuppressWarnings("try")
371-
private CompletableFuture<?> toFutureReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) {
387+
@SuppressWarnings({"try", "unchecked"})
388+
private CompletableFuture<?> toFutureReply(Object reply, @Nullable Message<?> replyMessage,
389+
@Nullable ReactiveAdapter reactiveAdapter) {
390+
391+
CompletableFuture<Object> replyFuture;
392+
372393
if (reactiveAdapter != null) {
373394
Mono<?> reactiveReply;
374395
Publisher<?> publisher = reactiveAdapter.toPublisher(reply);
@@ -379,15 +400,15 @@ private CompletableFuture<?> toFutureReply(Object reply, @Nullable ReactiveAdapt
379400
reactiveReply = Mono.from(publisher);
380401
}
381402

382-
CompletableFuture<Object> replyFuture = new CompletableFuture<>();
403+
replyFuture = new CompletableFuture<>();
383404

384405
reactiveReply
385406
/*
386407
The MonoToCompletableFuture in Project Reactor does not support context propagation,
387408
and it does not suppose to, since there is no guarantee how this Future is going to
388409
be handled downstream.
389410
However, in our case we process it directly in this class in the doProduceOutput()
390-
via whenComplete() callback. So, when value is set into the Future, it is available
411+
via whenComplete() callback. So, when the value is set into the Future, it is available
391412
in the callback in the same thread immediately.
392413
*/
393414
.doOnEach((signal) -> {
@@ -408,12 +429,20 @@ via whenComplete() callback. So, when value is set into the Future, it is availa
408429
})
409430
.contextCapture()
410431
.subscribe();
411-
412-
return replyFuture;
413432
}
414433
else {
415-
return (CompletableFuture<?>) reply;
434+
replyFuture = (CompletableFuture<Object>) reply;
416435
}
436+
437+
if (replyMessage == null) {
438+
return replyFuture;
439+
}
440+
441+
return replyFuture.thenApply(result ->
442+
getMessageBuilderFactory()
443+
.withPayload(result)
444+
.copyHeaders(replyMessage.getHeaders())
445+
.build());
417446
}
418447

419448
private AbstractIntegrationMessageBuilder<?> addRoutingSlipHeader(Object reply, List<?> routingSlip,

spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.time.Duration;
2122
import java.util.Collections;
2223
import java.util.Date;
2324
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
2426

2527
import org.junit.jupiter.api.Test;
28+
import reactor.core.publisher.Flux;
29+
import reactor.test.StepVerifier;
2630

2731
import org.springframework.beans.factory.annotation.Autowired;
2832
import org.springframework.beans.factory.annotation.Qualifier;
@@ -32,6 +36,7 @@
3236
import org.springframework.integration.annotation.Transformer;
3337
import org.springframework.integration.channel.DirectChannel;
3438
import org.springframework.integration.channel.FixedSubscriberChannel;
39+
import org.springframework.integration.channel.FluxMessageChannel;
3540
import org.springframework.integration.channel.QueueChannel;
3641
import org.springframework.integration.codec.Codec;
3742
import org.springframework.integration.config.EnableIntegration;
@@ -273,6 +278,41 @@ public void testFailedTransformWithRequestHeadersCopy() {
273278
.isEqualTo("transform failed");
274279
}
275280

281+
@Autowired
282+
@Qualifier("asyncTransformerFlow.input")
283+
MessageChannel asyncTransformerFlowInput;
284+
285+
@Test
286+
void asyncTransformerReplyIsProcessed() {
287+
QueueChannel replyChannel = new QueueChannel();
288+
this.asyncTransformerFlowInput.send(
289+
MessageBuilder.withPayload("test")
290+
.setReplyChannel(replyChannel)
291+
.build());
292+
293+
Message<?> receive = replyChannel.receive(10_000);
294+
295+
assertThat(receive).extracting(Message::getPayload).isEqualTo("test async");
296+
297+
}
298+
299+
@Test
300+
void reactiveTransformerReplyIsProcessed() {
301+
FluxMessageChannel replyChannel = new FluxMessageChannel();
302+
this.asyncTransformerFlowInput.send(
303+
MessageBuilder.withPayload("test")
304+
.setReplyChannel(replyChannel)
305+
.build());
306+
307+
StepVerifier.create(
308+
Flux.from(replyChannel)
309+
.map(Message::getPayload)
310+
.cast(String.class))
311+
.expectNext("test async")
312+
.thenCancel()
313+
.verify(Duration.ofSeconds(10));
314+
}
315+
276316
@Configuration
277317
@EnableIntegration
278318
public static class ContextConfiguration {
@@ -465,6 +505,15 @@ public IntegrationFlow transformFlowWithError() {
465505
.log();
466506
}
467507

508+
@Bean
509+
public IntegrationFlow asyncTransformerFlow() {
510+
return f -> f
511+
.transformWith(endpoint -> endpoint
512+
.<String, CompletableFuture<String>>transformer(payload ->
513+
CompletableFuture.completedFuture(payload + " async"))
514+
.async(true));
515+
}
516+
468517
}
469518

470519
private static final class TestPojo {

0 commit comments

Comments
 (0)