Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 3a4f355

Browse files
siddharthjain210artembilan
authored andcommitted
GH-232: Fix KCL support for multi streams
Fixes #232 * Corrected issues with indentations * Fixing Code Review Comments. Lazy initializing the Stream Configs, using kinesis stream summary API, removing lease table from the tests * Fixing Code Review Comments #2, handling formatting. * Fixing Code Review Comments #3, handling formatting and reducing line lengths to improve readability. * Some code clean up
1 parent 5fc8659 commit 3a4f355

File tree

3 files changed

+220
-12
lines changed

3 files changed

+220
-12
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@
2626
import javax.annotation.Nullable;
2727

2828
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
29+
import software.amazon.awssdk.arns.Arn;
2930
import software.amazon.awssdk.regions.Region;
3031
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
3132
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
3233
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
34+
import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
3335
import software.amazon.awssdk.utils.BinaryUtils;
3436
import software.amazon.kinesis.common.ConfigsBuilder;
3537
import software.amazon.kinesis.common.InitialPositionInStream;
@@ -85,6 +87,7 @@
8587
* @author Hervé Fortin
8688
* @author Artem Bilan
8789
* @author Dirk Bonhomme
90+
* @author Siddharth Jain
8891
*
8992
* @since 2.2.0
9093
*/
@@ -271,7 +274,6 @@ protected void onInit() {
271274
this.cloudWatchClient,
272275
this.workerId,
273276
this.recordProcessorFactory);
274-
275277
this.config.lifecycleConfig().taskBackoffTimeMillis(this.consumerBackoff);
276278

277279
RetrievalSpecificConfig retrievalSpecificConfig;
@@ -394,18 +396,31 @@ public Duration waitPeriodToDeleteFormerStreams() {
394396

395397
};
396398

397-
private final List<StreamConfig> streamConfigs =
398-
Arrays.stream(KclMessageDrivenChannelAdapter.this.streams)
399-
.map(streamName ->
400-
new StreamConfig(StreamIdentifier.singleStreamInstance(streamName),
401-
KclMessageDrivenChannelAdapter.this.streamInitialSequence))
402-
.toList();
399+
private List<StreamConfig> streamConfigs = null;
403400

404401
StreamsTracker() {
405402
}
406403

404+
private StreamConfig buildStreamConfig(String streamName) {
405+
StreamDescriptionSummary descriptionSummary =
406+
KclMessageDrivenChannelAdapter.this.kinesisClient
407+
.describeStreamSummary(request -> request.streamName(streamName))
408+
.join().streamDescriptionSummary();
409+
return new StreamConfig(StreamIdentifier.multiStreamInstance(
410+
Arn.fromString(descriptionSummary.streamARN()),
411+
descriptionSummary.streamCreationTimestamp().getEpochSecond()),
412+
KclMessageDrivenChannelAdapter.this.streamInitialSequence);
413+
}
414+
407415
@Override
408416
public List<StreamConfig> streamConfigList() {
417+
if (this.streamConfigs == null) {
418+
// Lazy loading the Stream Configs, only during inquiry.
419+
this.streamConfigs = new ArrayList<>();
420+
Arrays.stream(KclMessageDrivenChannelAdapter.this.streams)
421+
.forEach(streamName -> this.streamConfigs.add(buildStreamConfig(streamName)));
422+
}
423+
409424
return this.streamConfigs;
410425
}
411426

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aws.kinesis;
18+
19+
import java.util.List;
20+
import java.util.concurrent.CompletableFuture;
21+
22+
import org.junit.jupiter.api.AfterAll;
23+
import org.junit.jupiter.api.BeforeAll;
24+
import org.junit.jupiter.api.Test;
25+
import software.amazon.awssdk.core.SdkBytes;
26+
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
27+
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
28+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
29+
import software.amazon.awssdk.services.kinesis.model.Consumer;
30+
import software.amazon.kinesis.common.InitialPositionInStream;
31+
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.integration.aws.LocalstackContainerTest;
37+
import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter;
38+
import org.springframework.integration.aws.support.AwsHeaders;
39+
import org.springframework.integration.channel.QueueChannel;
40+
import org.springframework.integration.config.EnableIntegration;
41+
import org.springframework.messaging.Message;
42+
import org.springframework.messaging.PollableChannel;
43+
import org.springframework.test.annotation.DirtiesContext;
44+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
45+
46+
import static org.assertj.core.api.Assertions.assertThat;
47+
48+
/**
49+
* @author Siddharth Jain
50+
* @author Artem Bilan
51+
*
52+
* @since 3.0
53+
*/
54+
@SpringJUnitConfig
55+
@DirtiesContext
56+
57+
public class KclMessageDrivenChannelAdapterMultiStreamTests implements LocalstackContainerTest {
58+
59+
private static final String TEST_STREAM1 = "MultiStreamKcl1";
60+
61+
private static final String TEST_STREAM2 = "MultiStreamKcl2";
62+
63+
private static KinesisAsyncClient AMAZON_KINESIS;
64+
65+
private static DynamoDbAsyncClient DYNAMO_DB;
66+
67+
private static CloudWatchAsyncClient CLOUD_WATCH;
68+
69+
@Autowired
70+
private PollableChannel kinesisReceiveChannel;
71+
72+
@BeforeAll
73+
static void setup() {
74+
AMAZON_KINESIS = LocalstackContainerTest.kinesisClient();
75+
DYNAMO_DB = LocalstackContainerTest.dynamoDbClient();
76+
CLOUD_WATCH = LocalstackContainerTest.cloudWatchClient();
77+
78+
CompletableFuture<?> completableFuture1 =
79+
AMAZON_KINESIS.createStream(request -> request.streamName(TEST_STREAM1).shardCount(1))
80+
.thenCompose(result -> AMAZON_KINESIS.waiter()
81+
.waitUntilStreamExists(request -> request.streamName(TEST_STREAM1)));
82+
83+
CompletableFuture<?> completableFuture2 =
84+
AMAZON_KINESIS.createStream(request -> request.streamName(TEST_STREAM2).shardCount(1))
85+
.thenCompose(result -> AMAZON_KINESIS.waiter()
86+
.waitUntilStreamExists(request -> request.streamName(TEST_STREAM2)));
87+
88+
CompletableFuture.allOf(completableFuture1, completableFuture2).join();
89+
}
90+
91+
@AfterAll
92+
static void tearDown() {
93+
CompletableFuture<?> completableFuture1 =
94+
AMAZON_KINESIS.deleteStream(request -> request.streamName(TEST_STREAM1).enforceConsumerDeletion(true))
95+
.thenCompose(result -> AMAZON_KINESIS.waiter()
96+
.waitUntilStreamNotExists(request -> request.streamName(TEST_STREAM1)));
97+
98+
CompletableFuture<?> completableFuture2 =
99+
AMAZON_KINESIS.deleteStream(request -> request.streamName(TEST_STREAM2).enforceConsumerDeletion(true))
100+
.thenCompose(result -> AMAZON_KINESIS.waiter()
101+
.waitUntilStreamNotExists(request -> request.streamName(TEST_STREAM2)));
102+
103+
CompletableFuture.allOf(completableFuture1, completableFuture2).join();
104+
}
105+
106+
@Test
107+
public void kclChannelAdapterMultiStream() {
108+
String testData = "test data";
109+
AMAZON_KINESIS.putRecord(request -> request
110+
.streamName(TEST_STREAM1)
111+
.data(SdkBytes.fromUtf8String(testData))
112+
.partitionKey("test"));
113+
114+
String testData2 = "test data 2";
115+
AMAZON_KINESIS.putRecord(request -> request
116+
.streamName(TEST_STREAM2)
117+
.data(SdkBytes.fromUtf8String(testData2))
118+
.partitionKey("test"));
119+
120+
// The below statement works but with a higher timeout. For 2 streams, this takes too long.
121+
Message<?> receive = this.kinesisReceiveChannel.receive(300_000);
122+
assertThat(receive).isNotNull();
123+
assertThat(receive.getPayload()).isEqualTo(testData);
124+
assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, String.class)).isNotEmpty();
125+
126+
receive = this.kinesisReceiveChannel.receive(10_000);
127+
assertThat(receive).isNotNull();
128+
assertThat(receive.getPayload()).isEqualTo(testData2);
129+
130+
List<Consumer> stream1Consumers =
131+
AMAZON_KINESIS.describeStream(request -> request.streamName(TEST_STREAM1))
132+
.thenCompose(describeStreamResponse ->
133+
AMAZON_KINESIS.listStreamConsumers(request ->
134+
request.streamARN(describeStreamResponse.streamDescription().streamARN())))
135+
.join()
136+
.consumers();
137+
138+
List<Consumer> stream2Consumers = AMAZON_KINESIS
139+
.describeStream(request -> request.streamName(TEST_STREAM2))
140+
.thenCompose(describeStreamResponse ->
141+
AMAZON_KINESIS.listStreamConsumers(request ->
142+
request.streamARN(describeStreamResponse.streamDescription().streamARN())))
143+
.join()
144+
.consumers();
145+
146+
assertThat(stream1Consumers).hasSize(1);
147+
assertThat(stream2Consumers).hasSize(1);
148+
}
149+
150+
@Configuration
151+
@EnableIntegration
152+
public static class TestConfiguration {
153+
154+
@Bean
155+
public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
156+
KclMessageDrivenChannelAdapter adapter = new KclMessageDrivenChannelAdapter(
157+
AMAZON_KINESIS, CLOUD_WATCH, DYNAMO_DB, TEST_STREAM1, TEST_STREAM2);
158+
adapter.setOutputChannel(kinesisReceiveChannel());
159+
adapter.setStreamInitialSequence(
160+
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
161+
adapter.setConverter(String::new);
162+
adapter.setConsumerGroup("multi_stream_group");
163+
return adapter;
164+
}
165+
166+
@Bean
167+
public PollableChannel kinesisReceiveChannel() {
168+
return new QueueChannel();
169+
}
170+
171+
}
172+
173+
}

src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
package org.springframework.integration.aws.kinesis;
1818

19+
import java.util.List;
20+
1921
import org.junit.jupiter.api.AfterAll;
2022
import org.junit.jupiter.api.BeforeAll;
2123
import org.junit.jupiter.api.Test;
2224
import software.amazon.awssdk.core.SdkBytes;
2325
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
2426
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
2527
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
28+
import software.amazon.awssdk.services.kinesis.model.Consumer;
2629
import software.amazon.kinesis.common.InitialPositionInStream;
2730
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
2831

@@ -44,6 +47,7 @@
4447

4548
/**
4649
* @author Artem Bilan
50+
* @author Siddharth Jain
4751
*
4852
* @since 3.0
4953
*/
@@ -74,26 +78,41 @@ static void setup() {
7478
.join();
7579
}
7680

81+
7782
@AfterAll
7883
static void tearDown() {
79-
AMAZON_KINESIS.deleteStream(request -> request.streamName(TEST_STREAM));
84+
AMAZON_KINESIS
85+
.deleteStream(request -> request.streamName(TEST_STREAM).enforceConsumerDeletion(true))
86+
.thenCompose(result -> AMAZON_KINESIS.waiter()
87+
.waitUntilStreamNotExists(request -> request.streamName(TEST_STREAM)))
88+
.join();
8089
}
8190

8291
@Test
8392
void kclChannelAdapterReceivesRecords() {
8493
String testData = "test data";
8594

8695
AMAZON_KINESIS.putRecord(request ->
87-
request.streamName(TEST_STREAM)
88-
.data(SdkBytes.fromUtf8String(testData))
89-
.partitionKey("test"));
96+
request.streamName(TEST_STREAM)
97+
.data(SdkBytes.fromUtf8String(testData))
98+
.partitionKey("test"));
9099

91100
// We need so long delay because KCL has a more than a minute setup phase.
92-
Message<?> receive = this.kinesisReceiveChannel.receive(120_000);
101+
Message<?> receive = this.kinesisReceiveChannel.receive(300_000);
93102
assertThat(receive).isNotNull();
94103
assertThat(receive.getPayload()).isEqualTo(testData);
95104
assertThat(receive.getHeaders()).containsKey(IntegrationMessageHeaderAccessor.SOURCE_DATA);
96105
assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, String.class)).isNotEmpty();
106+
107+
List<Consumer> streamConsumers =
108+
AMAZON_KINESIS.describeStream(r -> r.streamName(TEST_STREAM))
109+
.thenCompose(describeStreamResponse ->
110+
AMAZON_KINESIS.listStreamConsumers(r ->
111+
r.streamARN(describeStreamResponse.streamDescription().streamARN())))
112+
.join()
113+
.consumers();
114+
115+
assertThat(streamConsumers).hasSize(1);
97116
}
98117

99118
@Configuration
@@ -108,6 +127,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
108127
adapter.setStreamInitialSequence(
109128
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
110129
adapter.setConverter(String::new);
130+
adapter.setConsumerGroup("single_stream_group");
111131
adapter.setBindSourceRecord(true);
112132
return adapter;
113133
}

0 commit comments

Comments
 (0)