Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
@RunWith(value = Parameterized.class)
public class SendDuplicateFromStoreToDLQTest {

// [AMQ-9811] Async ack can lead to test instability
private static final Boolean FORCE_CONCURRENTSTOREANDDISPATCHQUEUES = Boolean.FALSE;

@Parameterized.Parameters(name="sendDupToDLQ={1},cacheEnable={2},auditEnabled={3},optimizedDispatch={4}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
Expand Down Expand Up @@ -108,6 +111,9 @@ public void setUp() throws Exception {
broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
if(FORCE_CONCURRENTSTOREANDDISPATCHQUEUES != null) {
persistenceAdapter.setConcurrentStoreAndDispatchQueues(FORCE_CONCURRENTSTOREANDDISPATCHQUEUES);
}
broker.setPersistenceAdapter(persistenceAdapter);
broker.addConnector("tcp://localhost:0");
broker.start();
Expand All @@ -127,7 +133,7 @@ public void tearDown() throws Exception {
broker.stop();
}

@Test
@Test(timeout = 15000)
public void testSendDuplicateFromStoreToDLQScenario() throws Exception {
applyPolicy(sendDuplicateFromStoreToDLQEnabled, useCacheEnabled, auditEnabled, optimizedDispatch);
boolean sendExpected = (sendDuplicateFromStoreToDLQEnabled != null ? sendDuplicateFromStoreToDLQEnabled : new PolicyEntry().isSendDuplicateFromStoreToDLQ());
Expand All @@ -137,8 +143,8 @@ public void testSendDuplicateFromStoreToDLQScenario() throws Exception {
protected void doProcessSendDuplicateFromStoreToDLQ(boolean sendExpected) throws Exception {
createQueue("AMQ.8397");
org.apache.activemq.broker.region.Queue queue = (org.apache.activemq.broker.region.Queue)broker.getDestination(new ActiveMQQueue("AMQ.8397"));
assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount()));
assertEquals(Long.valueOf(0L), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(0L), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount()));

MessageConsumer messageConsumer = session.createConsumer(session.createQueue("AMQ.8397"));
producer.send(session.createTextMessage("Hello world!"));
Expand All @@ -148,7 +154,7 @@ protected void doProcessSendDuplicateFromStoreToDLQ(boolean sendExpected) throws
boolean found=false;
Message recvMessage = null;
do {
recvMessage = messageConsumer.receive(200l);
recvMessage = messageConsumer.receive(200L);
if(recvMessage != null) {
found = true;
}
Expand All @@ -164,19 +170,27 @@ protected void doProcessSendDuplicateFromStoreToDLQ(boolean sendExpected) throws
org.apache.activemq.broker.region.Queue dlq = (org.apache.activemq.broker.region.Queue)broker.getDestination(new ActiveMQQueue("ActiveMQ.DLQ.Queue.AMQ.8397"));

if(sendExpected) {
assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(1l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount()));
assertEquals(Long.valueOf(1l), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount()));
assertEquals(Long.valueOf(0L), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(1L), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount()));
assertEquals(Long.valueOf(1L), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(0L), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount()));
} else {
assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(1l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount()));
assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount()));
// [AMQ-9811] Retry and delay a bit to allow value to be reflected
int maxRetries = 200;
int retryCount = 0;

while(queue.getDestinationStatistics().getMessages().getCount() != 0L && retryCount < maxRetries) {
retryCount++;
Thread.sleep(50L);
}
assertEquals(Long.valueOf(0L), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(1L), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount()));
assertEquals(Long.valueOf(0L), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount()));
assertEquals(Long.valueOf(0L), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount()));
}
}

private PolicyMap applyPolicy(Boolean sendDuplicateFromStoreToDLQEnabled, Boolean useCacheEnabled, Boolean auditEnabled, Boolean optimizedDispatch) {
private void applyPolicy(Boolean sendDuplicateFromStoreToDLQEnabled, Boolean useCacheEnabled, Boolean auditEnabled, Boolean optimizedDispatch) {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();

Expand All @@ -196,7 +210,6 @@ private PolicyMap applyPolicy(Boolean sendDuplicateFromStoreToDLQEnabled, Boolea
defaultEntry.setDeadLetterStrategy(new IndividualDeadLetterStrategy());
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
return policyMap;
}

private void createQueue(String queueName) throws Exception {
Expand Down