Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 10d4652

Browse files
committed
Upgrade dependencies; prepare for release
Fixes #162 * Fix deprecations
1 parent 8a959d0 commit 10d4652

File tree

4 files changed

+26
-13
lines changed

4 files changed

+26
-13
lines changed

build.gradle

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ plugins {
55
id 'jacoco'
66
id 'org.sonarqube' version '3.0'
77
id 'checkstyle'
8-
id 'org.ajoberstar.grgit' version '4.0.2'
8+
id 'org.ajoberstar.grgit' version '4.1.0'
99
id "io.spring.dependency-management" version '1.0.10.RELEASE'
10-
id 'com.jfrog.artifactory' version '4.17.0'
10+
id 'com.jfrog.artifactory' version '4.17.2'
1111
}
1212

1313
description = 'Spring Integration AWS Support'
@@ -23,17 +23,17 @@ repositories {
2323
}
2424

2525
ext {
26-
assertjVersion = '3.16.1'
26+
assertjVersion = '3.18.0'
2727
awaitilityVersion = '4.0.3'
2828
dynamodbLockClientVersion = '1.1.0'
29-
jacksonVersion = '2.11.2'
29+
jacksonVersion = '2.11.3'
3030
junitVersion = '5.6.2'
3131
servletApiVersion = '4.0.1'
3232
localstackVersion = '0.1.22'
3333
log4jVersion = '2.13.3'
3434
springCloudAwsVersion = '2.2.4.RELEASE'
35-
springIntegrationVersion = '5.2.8.RELEASE'
36-
kinesisClientVersion = '1.13.3'
35+
springIntegrationVersion = '5.3.3.RELEASE'
36+
kinesisClientVersion = '1.14.0'
3737
kinesisProducerVersion = '0.14.1'
3838

3939
idPrefix = 'aws'
@@ -95,7 +95,7 @@ jacoco {
9595

9696
checkstyle {
9797
configDirectory.set(rootProject.file("src/checkstyle"))
98-
toolVersion = '8.35'
98+
toolVersion = '8.36.2'
9999
}
100100

101101
dependencies {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-6.6-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
6565
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
6666
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
67+
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SimpleRecordsFetcherFactory;
6768
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
6869
import com.amazonaws.services.kinesis.model.Record;
6970

@@ -273,6 +274,7 @@ protected void onInit() {
273274
new KinesisClientLibConfiguration(this.consumerGroup,
274275
this.stream,
275276
null,
277+
null,
276278
this.streamInitialSequence,
277279
this.kinesisProxyCredentialsProvider,
278280
null,
@@ -294,8 +296,11 @@ protected void onInit() {
294296
KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
295297
null,
296298
KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
297-
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE
298-
);
299+
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
300+
new SimpleRecordsFetcherFactory(),
301+
0,
302+
0,
303+
0);
299304
}
300305

301306
this.consumerGroup = this.config.getApplicationName();

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ private List<Shard> readShardList(String stream, int retryCount) {
577577

578578
try {
579579
Thread.sleep(this.describeStreamBackoff);
580-
readShardList(stream, retryCount++);
580+
readShardList(stream, retryCount + 1);
581581
}
582582
catch (InterruptedException ex) {
583583
Thread.currentThread().interrupt();
@@ -653,10 +653,10 @@ private List<Shard> detectShardsToConsume(String stream, int retry) {
653653
String exceptionMessage = "Got an exception when processing shards in stream [" + stream + "]";
654654
logger.info(exceptionMessage + ".\n Retrying... ", e);
655655
if (retry > 5) {
656-
throw new IllegalStateException("Error processing shards in stream [\" + stream + \"].", e);
656+
throw new IllegalStateException(exceptionMessage, e);
657657
}
658658
//Retry
659-
detectShardsToConsume(stream, retry++);
659+
detectShardsToConsume(stream, retry + 1);
660660
sleep(this.describeStreamBackoff, new IllegalStateException(exceptionMessage), false);
661661
}
662662

@@ -694,6 +694,9 @@ private void populateShardsForStream(final String stream, final CountDownLatch s
694694
}
695695
}
696696
}
697+
catch (Exception ex) {
698+
logger.error("Error population shards for stream: " + stream, ex);
699+
}
697700
finally {
698701
if (shardsGatherLatch != null) {
699702
shardsGatherLatch.countDown();
@@ -866,6 +869,7 @@ public void run() {
866869
public boolean isLongLived() {
867870
return true;
868871
}
872+
869873
}
870874

871875
private final class ShardConsumer {
@@ -1268,6 +1272,7 @@ private void checkpointIfPeriodicMode(@Nullable Record record) {
12681272
public String toString() {
12691273
return "ShardConsumer{" + "shardOffset=" + this.shardOffset + ", state=" + this.state + '}';
12701274
}
1275+
12711276
}
12721277

12731278
private final class ConsumerInvoker implements SchedulingAwareRunnable {
@@ -1346,6 +1351,7 @@ public void run() {
13461351
public boolean isLongLived() {
13471352
return true;
13481353
}
1354+
13491355
}
13501356

13511357
private final class ShardConsumerManager implements SchedulingAwareRunnable {
@@ -1445,5 +1451,7 @@ public void run() {
14451451
public boolean isLongLived() {
14461452
return true;
14471453
}
1454+
14481455
}
1456+
14491457
}

0 commit comments

Comments
 (0)