From d0d2b6fb135eb8ff23f3ed424ce7759767f75c16 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Sat, 22 Nov 2025 11:41:36 -0600 Subject: [PATCH] [AMQ-9811] Harden SendDuplicateFromStoreToDLQTest --- .../SendDuplicateFromStoreToDLQTest.java | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SendDuplicateFromStoreToDLQTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SendDuplicateFromStoreToDLQTest.java index 14b2b22e966..a96b337fcb0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SendDuplicateFromStoreToDLQTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SendDuplicateFromStoreToDLQTest.java @@ -54,6 +54,9 @@ @RunWith(value = Parameterized.class) public class SendDuplicateFromStoreToDLQTest { + // [AMQ-9811] Async ack can lead to test instability + private static final Boolean FORCE_CONCURRENTSTOREANDDISPATCHQUEUES = Boolean.FALSE; + @Parameterized.Parameters(name="sendDupToDLQ={1},cacheEnable={2},auditEnabled={3},optimizedDispatch={4}") public static Collection data() { return Arrays.asList(new Object[][] { @@ -108,6 +111,9 @@ public void setUp() throws Exception { broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64); KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); persistenceAdapter.setDirectory(new File(testDataDir, "kahadb")); + if(FORCE_CONCURRENTSTOREANDDISPATCHQUEUES != null) { + persistenceAdapter.setConcurrentStoreAndDispatchQueues(FORCE_CONCURRENTSTOREANDDISPATCHQUEUES); + } broker.setPersistenceAdapter(persistenceAdapter); broker.addConnector("tcp://localhost:0"); broker.start(); @@ -127,7 +133,7 @@ public void tearDown() throws Exception { broker.stop(); } - @Test + @Test(timeout = 15000) public void testSendDuplicateFromStoreToDLQScenario() throws Exception { applyPolicy(sendDuplicateFromStoreToDLQEnabled, useCacheEnabled, auditEnabled, optimizedDispatch); boolean sendExpected = (sendDuplicateFromStoreToDLQEnabled != null ? sendDuplicateFromStoreToDLQEnabled : new PolicyEntry().isSendDuplicateFromStoreToDLQ()); @@ -137,8 +143,8 @@ public void testSendDuplicateFromStoreToDLQScenario() throws Exception { protected void doProcessSendDuplicateFromStoreToDLQ(boolean sendExpected) throws Exception { createQueue("AMQ.8397"); org.apache.activemq.broker.region.Queue queue = (org.apache.activemq.broker.region.Queue)broker.getDestination(new ActiveMQQueue("AMQ.8397")); - assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); - assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); + assertEquals(Long.valueOf(0L), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(0L), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); MessageConsumer messageConsumer = session.createConsumer(session.createQueue("AMQ.8397")); producer.send(session.createTextMessage("Hello world!")); @@ -148,7 +154,7 @@ protected void doProcessSendDuplicateFromStoreToDLQ(boolean sendExpected) throws boolean found=false; Message recvMessage = null; do { - recvMessage = messageConsumer.receive(200l); + recvMessage = messageConsumer.receive(200L); if(recvMessage != null) { found = true; } @@ -164,19 +170,27 @@ protected void doProcessSendDuplicateFromStoreToDLQ(boolean sendExpected) throws org.apache.activemq.broker.region.Queue dlq = (org.apache.activemq.broker.region.Queue)broker.getDestination(new ActiveMQQueue("ActiveMQ.DLQ.Queue.AMQ.8397")); if(sendExpected) { - assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); - assertEquals(Long.valueOf(1l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); - assertEquals(Long.valueOf(1l), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount())); - assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount())); + assertEquals(Long.valueOf(0L), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(1L), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); + assertEquals(Long.valueOf(1L), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(0L), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount())); } else { - assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); - assertEquals(Long.valueOf(1l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); - assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount())); - assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount())); + // [AMQ-9811] Retry and delay a bit to allow value to be reflected + int maxRetries = 200; + int retryCount = 0; + + while(queue.getDestinationStatistics().getMessages().getCount() != 0L && retryCount < maxRetries) { + retryCount++; + Thread.sleep(50L); + } + assertEquals(Long.valueOf(0L), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(1L), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); + assertEquals(Long.valueOf(0L), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(0L), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount())); } } - private PolicyMap applyPolicy(Boolean sendDuplicateFromStoreToDLQEnabled, Boolean useCacheEnabled, Boolean auditEnabled, Boolean optimizedDispatch) { + private void applyPolicy(Boolean sendDuplicateFromStoreToDLQEnabled, Boolean useCacheEnabled, Boolean auditEnabled, Boolean optimizedDispatch) { PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); @@ -196,7 +210,6 @@ private PolicyMap applyPolicy(Boolean sendDuplicateFromStoreToDLQEnabled, Boolea defaultEntry.setDeadLetterStrategy(new IndividualDeadLetterStrategy()); policyMap.setDefaultEntry(defaultEntry); broker.setDestinationPolicy(policyMap); - return policyMap; } private void createQueue(String queueName) throws Exception {