3232package com .rabbitmq .client .test .functional ;
3333
3434import com .rabbitmq .client .AMQP ;
35+ import com .rabbitmq .client .GetResponse ;
3536import com .rabbitmq .client .test .BrokerTestCase ;
3637
3738import java .io .IOException ;
4344 */
4445public class PerQueueTTL extends BrokerTestCase {
4546
47+ private static final String TTL_EXCHANGE = "ttl.exchange" ;
48+
4649 private static final String TTL_ARG = "x-message-ttl" ;
4750
4851 private static final String TTL_QUEUE_NAME = "queue.ttl" ;
4952
5053 private static final String TTL_INVALID_QUEUE_NAME = "invalid.queue.ttl" ;
5154
55+ @ Override
56+ protected void createResources () throws IOException {
57+ this .channel .exchangeDeclare (TTL_EXCHANGE , "direct" );
58+ }
59+
60+ @ Override
61+ protected void releaseResources () throws IOException {
62+ this .channel .exchangeDelete (TTL_EXCHANGE );
63+ }
64+
5265 public void testCreateQueueWithTTL () throws IOException {
53- AMQP .Queue .DeclareOk declareOk = declareQueue (TTL_QUEUE_NAME , 2000 );
66+ AMQP .Queue .DeclareOk declareOk = declareQueue (TTL_QUEUE_NAME , 2000L );
5467 assertNotNull (declareOk );
5568 }
5669
@@ -72,9 +85,70 @@ public void testCreateQueueWithZeroTTL() throws Exception {
7285 }
7386 }
7487
88+ /*
89+ * Test messages expire when using basic get.
90+ */
91+ public void testPublishAndGetWithExpiry () throws Exception {
92+ long ttl = 2000 ;
93+ declareQueue (TTL_QUEUE_NAME , ttl );
94+ this .channel .queueBind (TTL_QUEUE_NAME , TTL_EXCHANGE , TTL_QUEUE_NAME );
95+
96+ byte [] msg1 = "one" .getBytes ();
97+ byte [] msg2 = "two" .getBytes ();
98+ byte [] msg3 = "three" .getBytes ();
99+
100+ this .channel .basicPublish (TTL_EXCHANGE , TTL_QUEUE_NAME , null , msg1 );
101+ Thread .sleep (1500 );
102+
103+ this .channel .basicPublish (TTL_EXCHANGE , TTL_QUEUE_NAME , null , msg2 );
104+ Thread .sleep (1000 );
105+
106+ this .channel .basicPublish (TTL_EXCHANGE , TTL_QUEUE_NAME , null , msg3 );
107+
108+ assertEquals ("two" , new String (get ()));
109+ assertEquals ("three" , new String (get ()));
110+
111+ }
112+
113+ /*
114+ * Test get expiry for messages sent under a transaction
115+ */
116+ public void testTransactionalPublishWithGet () throws Exception {
117+ long ttl = 1000 ;
118+ declareQueue (TTL_QUEUE_NAME , ttl );
119+ this .channel .queueBind (TTL_QUEUE_NAME , TTL_EXCHANGE , TTL_QUEUE_NAME );
120+
121+ byte [] msg1 = "one" .getBytes ();
122+ byte [] msg2 = "two" .getBytes ();
123+
124+ this .channel .txSelect ();
125+
126+ this .channel .basicPublish (TTL_EXCHANGE , TTL_QUEUE_NAME , null , msg1 );
127+ Thread .sleep (1500 );
128+
129+ this .channel .basicPublish (TTL_EXCHANGE , TTL_QUEUE_NAME , null , msg2 );
130+ this .channel .txCommit ();
131+ Thread .sleep (500 );
132+
133+ assertEquals ("one" , new String (get ()));
134+ Thread .sleep (800 );
135+
136+ assertNull (get ());
137+ }
138+
139+
140+ private byte [] get () throws IOException {
141+ GetResponse response = this .channel .basicGet (TTL_QUEUE_NAME , false );
142+ if (response == null ) {
143+ return null ;
144+ }
145+ return response .getBody ();
146+ }
147+
75148 private AMQP .Queue .DeclareOk declareQueue (String name , Object ttlValue ) throws IOException {
76- AMQP .Queue .DeclareOk declareOk = this .channel .queueDeclare (name , false , false , false ,
77- Collections .<String , Object >singletonMap (TTL_ARG , ttlValue ));
78- return declareOk ;
149+ Map <String , Object > argMap = Collections .singletonMap (TTL_ARG , ttlValue );
150+ return this .channel .queueDeclare (name , false , true , false , argMap );
79151 }
152+
153+
80154}
0 commit comments