Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit aefb5c2

Browse files
javarnoartembilan
authored andcommitted
GH-142: Configure workerId in KCL adapter
* Allow to modify the worker identifier used in KCL adapter. Fixes #142
1 parent f635861 commit aefb5c2

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport {
103103

104104
private CheckpointMode checkpointMode = CheckpointMode.batch;
105105

106+
private String workerId = UUID.randomUUID().toString();
107+
106108
public KclMessageDrivenChannelAdapter(String streams) {
107109
this(streams, AmazonKinesisClientBuilder.defaultClient(),
108110
AmazonCloudWatchClientBuilder.defaultClient(), AmazonDynamoDBClientBuilder.defaultClient(),
@@ -176,6 +178,16 @@ public void setCheckpointMode(CheckpointMode checkpointMode) {
176178
this.checkpointMode = checkpointMode;
177179
}
178180

181+
/**
182+
* Sets the worker identifier used to distinguish different
183+
* workers/processes of a Kinesis application.
184+
* @param workerId the worker identifier to use
185+
*/
186+
public void setWorkerId(String workerId) {
187+
Assert.hasText(workerId, "'workerId' must not be null or empty");
188+
this.workerId = workerId;
189+
}
190+
179191
@Override
180192
protected void onInit() {
181193
super.onInit();
@@ -189,7 +201,7 @@ protected void onInit() {
189201
this.kinesisProxyCredentialsProvider,
190202
null, null,
191203
KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
192-
UUID.randomUUID().toString(),
204+
this.workerId,
193205
KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
194206
this.idleBetweenPolls,
195207
false,

0 commit comments

Comments
 (0)