Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 993982d

Browse files
chrylisartembilan
authored andcommitted
GH-196: Add SNS FIFO support
Fixes #196 * Add support for SNS FIFO message group and deduplication IDs * Add Javadoc, clean up formatting * Fix Javadoc formatting * Add SNS FIFO information to README * Change XML snippet indentation to tabs * Code samples throughout the README mix tabs and spaces, but this snippet was using tabs, so this updates the new lines to match. * Minor code style clean up **Cherry-pick to `main`**
1 parent 484ac67 commit 993982d

File tree

6 files changed

+129
-8
lines changed

6 files changed

+129
-8
lines changed

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,14 @@ The Java Config looks like:
437437
@Bean
438438
public MessageHandler snsMessageHandler() {
439439
SnsMessageHandler handler = new SnsMessageHandler(amazonSns());
440-
adapter.setTopicArn("arn:aws:sns:eu-west:123456789012:test);
440+
adapter.setTopicArn("arn:aws:sns:eu-west:123456789012:test");
441441
String bodyExpression = "SnsBodyBuilder.withDefault(payload).forProtocols(payload.substring(0, 140), 'sms')";
442442
handler.setBodyExpression(spelExpressionParser.parseExpression(bodyExpression));
443+
444+
// message-group ID and deduplication ID are used for FIFO topics
445+
handler.setMessageGroupId("foo-messages");
446+
String deduplicationExpression = "headers.id";
447+
handler.setMessageDeduplicationIdExpression(spelExpressionParser.parseExpression(deduplicationExpression))''
443448
return handler;
444449
}
445450
````
@@ -457,12 +462,17 @@ The XML variant may look like:
457462
channel="notificationChannel"
458463
topic-arn="foo"
459464
subject="bar"
465+
message-group-id="foo-messages"
466+
message-deduplication-id-expression="headers.id"
460467
body-expression="payload.toUpperCase()"/>
461468
````
462469

463470
Starting with _version 2.0_, the `SnsMessageHandler` can be configured with the `HeaderMapper` to map message headers to the SNS message attributes.
464471
See `SnsHeaderMapper` implementation for more information and also consult with [Amazon SNS Message Attributes][] about value types and restrictions.
465472

473+
Starting with _version 2.5.3_, the `SnsMessageHandler` supports sending to SNS FIFO topics using the `messageGroupId`/`messageGroupIdExpression`
474+
and `messageDeduplicationIdExpression` properties.
475+
466476
## Metadata Store for Amazon DynamoDB
467477

468478
The `DynamoDbMetadataStore`, a `ConcurrentMetadataStore` implementation, is provided to keep the metadata for Spring Integration components in the distributed Amazon DynamoDB store.

src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
3030
* The parser for the {@code <int-aws:sns-outbound-channel-adapter>}.
3131
*
3232
* @author Artem Bilan
33+
* @author Christopher Smith
3334
*/
3435
public class SnsOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
3536

@@ -42,6 +43,8 @@ protected AbstractBeanDefinition parseConsumer(Element element, ParserContext pa
4243

4344
AwsParserUtils.populateExpressionAttribute("topic-arn", builder, element, parserContext);
4445
AwsParserUtils.populateExpressionAttribute("subject", builder, element, parserContext);
46+
AwsParserUtils.populateExpressionAttribute("message-group-id", builder, element, parserContext);
47+
AwsParserUtils.populateExpressionAttribute("message-deduplication-id", builder, element, parserContext);
4548

4649
BeanDefinition message = IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("body-expression",
4750
element);

src/main/java/org/springframework/integration/aws/outbound/SnsMessageHandler.java

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.Map;
2121
import java.util.concurrent.Future;
2222

23+
import org.springframework.core.log.LogMessage;
2324
import org.springframework.expression.Expression;
2425
import org.springframework.expression.TypeLocator;
2526
import org.springframework.expression.common.LiteralExpression;
@@ -71,6 +72,8 @@
7172
* </ul>
7273
*
7374
* @author Artem Bilan
75+
* @author Christopher Smith
76+
*
7477
* @see AmazonSNSAsync
7578
* @see PublishRequest
7679
* @see SnsBodyBuilder
@@ -83,6 +86,10 @@ public class SnsMessageHandler extends AbstractAwsMessageHandler<Map<String, Mes
8386

8487
private Expression subjectExpression;
8588

89+
private Expression messageGroupIdExpression;
90+
91+
private Expression messageDeduplicationIdExpression;
92+
8693
private Expression bodyExpression;
8794

8895
private ResourceIdResolver resourceIdResolver;
@@ -113,10 +120,51 @@ public void setSubjectExpression(Expression subjectExpression) {
113120
this.subjectExpression = subjectExpression;
114121
}
115122

123+
/**
124+
* A fixed message-group ID to be set for messages sent to an SNS FIFO topic
125+
* from this handler.
126+
* Equivalent to calling {{@link #setMessageGroupIdExpression(Expression)} with
127+
* a literal string expression.
128+
* @param messageGroupId the group ID to be used for all messages sent from this handler
129+
* @since 2.5.3
130+
*/
131+
public void setMessageGroupId(String messageGroupId) {
132+
Assert.hasText(messageGroupId, "messageGroupId must not be empty.");
133+
this.messageGroupIdExpression = new LiteralExpression(messageGroupId);
134+
}
135+
136+
137+
/**
138+
* The {@link Expression} to determine the
139+
* <a href="https://docs.aws.amazon.com/sns/latest/dg/fifo-message-grouping.html">message group</a>
140+
* for messages sent to an SNS FIFO topic from this handler.
141+
* @param messageGroupIdExpression the {@link Expression} to produce the message-group ID
142+
* @since 2.5.3
143+
*/
144+
public void setMessageGroupIdExpression(Expression messageGroupIdExpression) {
145+
Assert.notNull(messageGroupIdExpression, "messageGroupIdExpression must not be null.");
146+
this.messageGroupIdExpression = messageGroupIdExpression;
147+
}
148+
149+
/**
150+
* The {@link Expression} to determine the deduplication ID for this message.
151+
* SNS FIFO topics
152+
* <a href="https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html">require a message deduplication ID to be specified</a>,
153+
* either in the adapter configuration or on a {@link PublishRequest} payload
154+
* of the request {@link Message}, unless content-based deduplication is enabled
155+
* on the topic.
156+
* @param messageDeduplicationIdExpression the {@link Expression} to produce the message deduplication ID
157+
* @since 2.5.3
158+
*/
159+
public void setMessageDeduplicationIdExpression(Expression messageDeduplicationIdExpression) {
160+
Assert.notNull(messageDeduplicationIdExpression, "messageDeduplicationIdExpression must not be null.");
161+
this.messageDeduplicationIdExpression = messageDeduplicationIdExpression;
162+
}
163+
116164
/**
117165
* The {@link Expression} to produce the SNS notification message. If it evaluates to
118166
* the {@link SnsBodyBuilder} the {@code messageStructure} of the
119-
* {@link PublishRequest} is set to {@code json}. Otherwise the
167+
* {@link PublishRequest} is set to {@code json}. Otherwise, the
120168
* {@link #getConversionService()} is used to convert the evaluation result to the
121169
* {@link String} without setting the {@code messageStructure}.
122170
* @param bodyExpression the {@link Expression} to produce the SNS notification
@@ -172,6 +220,25 @@ protected Future<?> handleMessageToAws(Message<?> message) {
172220
publishRequest.setSubject(subject);
173221
}
174222

223+
if (this.messageGroupIdExpression != null) {
224+
if (!topicArn.endsWith(".fifo")) {
225+
logger.warn(LogMessage.format("a messageGroupId will be set for non-FIFO topic '%s'", topicArn));
226+
}
227+
String messageGroupId =
228+
this.messageGroupIdExpression.getValue(getEvaluationContext(), message, String.class);
229+
publishRequest.setMessageGroupId(messageGroupId);
230+
}
231+
232+
if (this.messageDeduplicationIdExpression != null) {
233+
if (!topicArn.endsWith(".fifo")) {
234+
logger.warn(
235+
LogMessage.format("a messageDeduplicationId will be set for non-FIFO topic '%s'", topicArn));
236+
}
237+
String messageDeduplicationId =
238+
this.messageDeduplicationIdExpression.getValue(getEvaluationContext(), message, String.class);
239+
publishRequest.setMessageDeduplicationId(messageDeduplicationId);
240+
}
241+
175242
Object snsMessage = message.getPayload();
176243

177244
if (this.bodyExpression != null) {

src/main/resources/org/springframework/integration/aws/config/spring-integration-aws.xsd

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@
531531
Asynchronous callback handler for events in the lifecycle of the request. Users can provide an
532532
implementation of the callback methods in this interface to receive notification of successful or
533533
unsuccessful completion of the operation.
534-
By default successful reply is sent to the 'success-channel' and error message to the
534+
By default, successful reply is sent to the 'success-channel' and error message to the
535535
'failure-channel' if they are provided.
536536
</xsd:documentation>
537537
</xsd:annotation>
@@ -960,7 +960,7 @@
960960
A SpEL expression that resolves to an Amazon SNS Topic ARN.
961961
The 'requestMessage' is the root object for evaluation context.
962962
Mutually exclusive with 'topic-arn'.
963-
This attribute isn't mandatory and the the topic can be specified on the
963+
This attribute isn't mandatory and the topic can be specified on the
964964
'com.amazonaws.services.sns.model.PublishRequest'
965965
payload of the request Message.
966966
</xsd:documentation>
@@ -985,6 +985,39 @@
985985
</xsd:documentation>
986986
</xsd:annotation>
987987
</xsd:attribute>
988+
<xsd:attribute name="message-group-id">
989+
<xsd:annotation>
990+
<xsd:documentation>
991+
The message group ID.
992+
Mutually exclusive with 'message-group-id-expression'.
993+
SNS FIFO topics require a message group to be specified, either in
994+
the adapter configuration or on a 'PublishRequest' payload
995+
of the request Message.
996+
</xsd:documentation>
997+
</xsd:annotation>
998+
</xsd:attribute>
999+
<xsd:attribute name="message-group-id-expression">
1000+
<xsd:annotation>
1001+
<xsd:documentation>
1002+
The SpEL expression for the message group ID.
1003+
Mutually exclusive with 'message-group-id'.
1004+
SNS FIFO topics require a message group to be specified, either in
1005+
the adapter configuration or on a 'PublishRequest' payload
1006+
of the request Message.
1007+
</xsd:documentation>
1008+
</xsd:annotation>
1009+
</xsd:attribute>
1010+
<xsd:attribute name="message-deduplication-id-expression">
1011+
<xsd:annotation>
1012+
<xsd:documentation>
1013+
The SpEL expression for the message deduplication ID.
1014+
SNS FIFO topics require a message deduplication ID to be specified, either in
1015+
the adapter configuration or on a 'PublishRequest' payload
1016+
of the request Message, unless content-based deduplication is enabled
1017+
on the topic.
1018+
</xsd:documentation>
1019+
</xsd:annotation>
1020+
</xsd:attribute>
9881021
<xsd:attribute name="body-expression">
9891022
<xsd:annotation>
9901023
<xsd:documentation>

src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636

3737
/**
3838
* @author Artem Bilan
39+
* @author Christopher Smith
3940
*/
4041
@SpringJUnitConfig
4142
@DirtiesContext
@@ -79,6 +80,8 @@ void testSnsOutboundChannelAdapterDefaultParser() {
7980
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "amazonSns")).isSameAs(this.amazonSns);
8081
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "evaluationContext")).isNotNull();
8182
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "topicArnExpression")).isNull();
83+
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "messageGroupIdExpression")).isNull();
84+
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "messageDeduplicationIdExpression")).isNull();
8285
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "subjectExpression")).isNull();
8386
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "bodyExpression")).isNull();
8487
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "resourceIdResolver"))

src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,6 +55,7 @@
5555

5656
/**
5757
* @author Artem Bilan
58+
* @author Christopher Smith
5859
*/
5960
@SpringJUnitConfig
6061
@DirtiesContext
@@ -92,6 +93,8 @@ void testSnsMessageHandler() {
9293
assertThat(publishRequest.getMessageStructure()).isEqualTo("json");
9394
assertThat(publishRequest.getTopicArn()).isEqualTo("topic");
9495
assertThat(publishRequest.getSubject()).isEqualTo("subject");
96+
assertThat(publishRequest.getMessageGroupId()).isEqualTo("SUBJECT");
97+
assertThat(publishRequest.getMessageDeduplicationId()).isEqualTo("BAR");
9598
assertThat(publishRequest.getMessage())
9699
.isEqualTo("{\"default\":\"foo\",\"sms\":\"{\\\"foo\\\" : \\\"bar\\\"}\"}");
97100

@@ -136,6 +139,8 @@ public PollableChannel resultChannel() {
136139
public MessageHandler snsMessageHandler() {
137140
SnsMessageHandler snsMessageHandler = new SnsMessageHandler(amazonSNS());
138141
snsMessageHandler.setTopicArnExpression(PARSER.parseExpression("headers.topic"));
142+
snsMessageHandler.setMessageGroupIdExpression(PARSER.parseExpression("headers.subject.toUpperCase()"));
143+
snsMessageHandler.setMessageDeduplicationIdExpression(PARSER.parseExpression("headers.foo.toUpperCase()"));
139144
snsMessageHandler.setSubjectExpression(PARSER.parseExpression("headers.subject"));
140145
snsMessageHandler.setBodyExpression(PARSER.parseExpression("payload"));
141146
snsMessageHandler.setOutputChannel(resultChannel());

0 commit comments

Comments
 (0)