Skip to content

Conversation

@igormq
Copy link

@igormq igormq commented Nov 24, 2025

📢 Type of change

  • Bugfix
  • New feature
  • Enhancement
  • Refactoring

📜 Description

Fixes trace context propagation for FIFO queues when using SqsTemplate.sendAsync(). The issue occurred on the first call to a FIFO queue where trace headers (traceparent, tracestate, etc.) were not propagated due to observation creation happening on the wrong thread after an async operation.

💡 Motivation and Context

When using SqsTemplate.sendAsync() to send messages to a FIFO queue (.fifo), the trace context (traceId/spanId from Micrometer Observation or distributed tracing) was not propagated on the first call, but worked correctly on subsequent calls when queue attributes were cached.

Observed Behavior

// Test starts with traceId: 32dc16b916b8efd6d701100c325a157e

// First message - WRONG traceId (trace headers missing or new trace generated)
thread=[sdk-async-thread] traceId=e9e60bf0ee86c01ff6fcc78339dd9145 message=Processing message...

// Second message - CORRECT traceId (matches calling thread's context)
thread=[main] traceId=32dc16b916b8efd6d701100c325a157e message=Processing message...

Root Cause Analysis

Since Spring Cloud AWS 3.0.1 (PR #799), SqsTemplate automatically detects whether a FIFO queue has ContentBasedDeduplication enabled. This detection is asynchronous and involves calling the AWS SQS API to fetch queue attributes.

The Bug

The observation (which captures trace context and adds trace headers) was created in AbstractMessagingTemplate.observeAndSendAsync(), which is called in a thenCompose callback:

// BEFORE (Broken)
return preProcessMessageForSendAsync(endpointToUse, message)  // Async for FIFO queues
        .thenCompose(messageToUse -> observeAndSendAsync(messageToUse, endpointToUse));
        //                           ↑ Observation created HERE - wrong thread!

Why It Failed on First Call But Worked on Second Call

CompletableFuture.thenCompose() behavior:

  • First call (queue attributes not cached): The future completes on the SDK async thread (sdk-async-*), so the thenCompose callback runs on that thread. The observation is created on the SDK thread, which doesn't have the calling thread's trace context.
  • Second call (queue attributes cached): The future is already completed, so thenCompose runs synchronously on the calling thread, which has the correct trace context.

Move observation creation and trace header capture to before the async preprocessing, ensuring it always happens on the calling thread with the correct trace context:

// AFTER (Fixed)
// Step 1: Create observation and capture trace headers on calling thread
AbstractTemplateObservation.Context context = this.observationSpecifics.createContext(message, endpointToUse);
Observation observation = startObservation(context);
Map<String, Object> carrier = Objects.requireNonNull(context.getCarrier(), "No carrier found in context.");
Message<T> messageWithObservationHeaders = MessageHeaderUtils.addHeadersIfAbsent(message, carrier);

// Step 2: Pass message with trace headers through async preprocessing
return preProcessMessageForSendAsync(endpointToUse, messageWithObservationHeaders)
        .thenCompose(messageToUse -> doSendAndCompleteObservation(messageToUse, endpointToUse, context, observation));

Key Changes

File: spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/AbstractMessagingTemplate.java

  1. Moved observation creation (startObservation()) to execute before preProcessMessageForSendAsync()
  2. Added trace headers to the message before async operations
  3. Introduced doSendAndCompleteObservation() helper method that sends and completes an existing observation (rather than creating a new one)
  4. Removed observeAndSendAsync()

💚 How did you test it?

  • Added SqsTemplateFifoTracingIntegrationTest. If you run this test against the old code, the first one sendAsync_toFifoQueue_shouldPropagateObservationScopeOnFirstCall will fail.

📝 Checklist

  • I reviewed submitted code
  • I added tests to verify changes
  • I updated reference documentation to reflect the change - Not needed
  • All tests passing
  • No breaking changes

🔮 Next steps

@github-actions github-actions bot added the component: sqs SQS integration related issue label Nov 24, 2025
@igormq igormq force-pushed the fix/context-propagation-fifo branch from e25fafe to 2bc3c08 Compare November 24, 2025 11:03
@igormq igormq force-pushed the fix/context-propagation-fifo branch from 2bc3c08 to f793125 Compare November 24, 2025 11:05
@igormq
Copy link
Author

igormq commented Nov 24, 2025

If you agree to this pr, can we put this in the next release?

@MatejNedic
Copy link
Member

@tomazfernandes after your review we can include this in 4.0.0-M2

4.0.0-M1 is to be released really soon and we sadly won't have time to review and merge before release.

@tomazfernandes
Copy link
Contributor

Hey @igormq, thanks for the detailed analysis and PR.

You're correct that, when TemplateContentBasedDeduplication is set to AUTO, the first call to fetch ContentBasedDeduplication settings changes threads and hence loses tracing / observation information.

The implementation you propose addresses that, but given Message is an immutable object, the ObservationContext is created without the FIFO-specific headers that are generated by the framework. Please check the failing observesMessageFifo test.

A short-term workaround is to explicitly set TemplateContentBasedDeduplication to ENABLED and supply your own MessageDeduplicationId header, or to DISABLED and let the framework one.

For context, are you using sendAsync as part of a CompletableFuture chain, or as a fire-and-forget inside a synchronous flow?

Let me know your thoughts, thanks.

@igormq
Copy link
Author

igormq commented Dec 4, 2025

Hey @tomazfernandes , thanks for the detailed review and for identifying the issue!

You're absolutely right about the problem - my implementation creates the ObservationContext before the FIFO-specific headers are added by the framework, which means those headers are missing from the observation.

To answer your question about the use case: I'm using send inside a synchronous flow where I care about whether the message was sent successfully (so not fire-and-forget).

I understand the short-term workaround you suggested (explicitly setting TemplateContentBasedDeduplication to ENABLED or DISABLED), but this would require users to be aware of this limitation and work around it, which isn't ideal.

The proper fix is to:

  1. Capture the parent observation on the calling thread
  2. Allow preProcessMessageForSendAsync to add the FIFO headers asynchronously
  3. Create the observation context AFTER the FIFO headers are added (so it captures them correctly)
  4. Link the new observation to the parent observation to maintain trace continuity

I'll update the PR with this approach. Let me know if you have any concerns with this solution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

component: sqs SQS integration related issue

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants