Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 2dd8a89

Browse files
committed
Use Flux for KCL multi stream load
* Upgrade to `com.gradle.enterprise:3.15` * Use `setFanOut(false)` in the `KclMessageDrivenChannelAdapterTests` to cover the polling mode for KCL
1 parent 3a4f355 commit 2dd8a89

File tree

3 files changed

+22
-24
lines changed

3 files changed

+22
-24
lines changed

settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
plugins {
2-
id 'com.gradle.enterprise' version '3.14.1'
2+
id 'com.gradle.enterprise' version '3.15'
33
id 'io.spring.ge.conventions' version '0.0.13'
44
}
55

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
import javax.annotation.Nullable;
2727

2828
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
29+
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.Mono;
2931
import software.amazon.awssdk.arns.Arn;
3032
import software.amazon.awssdk.regions.Region;
3133
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
3234
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
3335
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
34-
import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
36+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
3537
import software.amazon.awssdk.utils.BinaryUtils;
3638
import software.amazon.kinesis.common.ConfigsBuilder;
3739
import software.amazon.kinesis.common.InitialPositionInStream;
@@ -274,6 +276,7 @@ protected void onInit() {
274276
this.cloudWatchClient,
275277
this.workerId,
276278
this.recordProcessorFactory);
279+
277280
this.config.lifecycleConfig().taskBackoffTimeMillis(this.consumerBackoff);
278281

279282
RetrievalSpecificConfig retrievalSpecificConfig;
@@ -317,7 +320,6 @@ protected void doStart() {
317320
+ "because it does not make sense in case of [ListenerMode.batch].");
318321
}
319322

320-
321323
this.scheduler =
322324
new Scheduler(
323325
this.config.checkpointConfig(),
@@ -339,7 +341,6 @@ protected void doStart() {
339341
protected void doStop() {
340342
super.doStop();
341343
this.scheduler.shutdown();
342-
343344
}
344345

345346
@Override
@@ -396,32 +397,28 @@ public Duration waitPeriodToDeleteFormerStreams() {
396397

397398
};
398399

399-
private List<StreamConfig> streamConfigs = null;
400+
private final Flux<StreamConfig> streamConfigs =
401+
Flux.fromArray(KclMessageDrivenChannelAdapter.this.streams)
402+
.flatMap((streamName) ->
403+
Mono.fromFuture(KclMessageDrivenChannelAdapter.this.kinesisClient
404+
.describeStreamSummary(request -> request.streamName(streamName))))
405+
.map(DescribeStreamSummaryResponse::streamDescriptionSummary)
406+
.map((summary) ->
407+
StreamIdentifier.multiStreamInstance(
408+
Arn.fromString(summary.streamARN()),
409+
summary.streamCreationTimestamp()
410+
.getEpochSecond()))
411+
.map((streamIdentifier) ->
412+
new StreamConfig(streamIdentifier,
413+
KclMessageDrivenChannelAdapter.this.streamInitialSequence))
414+
.cache();
400415

401416
StreamsTracker() {
402417
}
403418

404-
private StreamConfig buildStreamConfig(String streamName) {
405-
StreamDescriptionSummary descriptionSummary =
406-
KclMessageDrivenChannelAdapter.this.kinesisClient
407-
.describeStreamSummary(request -> request.streamName(streamName))
408-
.join().streamDescriptionSummary();
409-
return new StreamConfig(StreamIdentifier.multiStreamInstance(
410-
Arn.fromString(descriptionSummary.streamARN()),
411-
descriptionSummary.streamCreationTimestamp().getEpochSecond()),
412-
KclMessageDrivenChannelAdapter.this.streamInitialSequence);
413-
}
414-
415419
@Override
416420
public List<StreamConfig> streamConfigList() {
417-
if (this.streamConfigs == null) {
418-
// Lazy loading the Stream Configs, only during inquiry.
419-
this.streamConfigs = new ArrayList<>();
420-
Arrays.stream(KclMessageDrivenChannelAdapter.this.streams)
421-
.forEach(streamName -> this.streamConfigs.add(buildStreamConfig(streamName)));
422-
}
423-
424-
return this.streamConfigs;
421+
return this.streamConfigs.collectList().block();
425422
}
426423

427424
@Override

src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
128128
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
129129
adapter.setConverter(String::new);
130130
adapter.setConsumerGroup("single_stream_group");
131+
adapter.setFanOut(false);
131132
adapter.setBindSourceRecord(true);
132133
return adapter;
133134
}

0 commit comments

Comments
 (0)