2
2
3
3
import static org .junit .Assert .assertEquals ;
4
4
import static org .junit .Assert .fail ;
5
+ import static org .mockito .ArgumentMatchers .any ;
6
+ import static org .mockito .Mockito .mock ;
7
+ import static org .mockito .Mockito .verify ;
5
8
6
- import java .util .Optional ;
7
9
import java .util .concurrent .TimeUnit ;
10
+ import java .util .function .BiConsumer ;
8
11
12
+ import com .amazonaws .services .sqs .model .CreateQueueRequest ;
13
+ import com .amazonaws .services .sqs .model .Message ;
14
+ import com .amazonaws .services .sqs .model .QueueDoesNotExistException ;
15
+ import com .amazonaws .services .sqs .model .ReceiveMessageRequest ;
16
+ import com .amazonaws .services .sqs .model .SendMessageRequest ;
9
17
import org .junit .After ;
10
18
import org .junit .Before ;
11
19
import org .junit .Test ;
12
20
13
- import com .amazonaws .services .sqs .model .CreateQueueRequest ;
14
- import com .amazonaws .services .sqs .model .QueueDoesNotExistException ;
15
- import com .amazonaws .services .sqs .model .ReceiveMessageRequest ;
16
21
import com .amazonaws .services .sqs .util .IntegrationTest ;
17
22
18
23
public class AmazonSQSVirtualQueuesClientIT extends IntegrationTest {
19
24
20
25
private static String hostQueueUrl ;
21
26
private static AmazonSQS client ;
22
27
28
+ BiConsumer <String , Message > orphanedMessageHandlerMock ;
29
+
23
30
@ Before
24
31
public void setup () {
25
- client = AmazonSQSVirtualQueuesClientBuilder .standard ().withAmazonSQS (sqs ).build ();
32
+ orphanedMessageHandlerMock = mock (BiConsumer .class );
33
+ client = AmazonSQSVirtualQueuesClientBuilder .standard ().withAmazonSQS (sqs ).withOrphanedMessageHandler (orphanedMessageHandlerMock ).build ();
26
34
hostQueueUrl = client .createQueue (queueNamePrefix + "-HostQueue" ).getQueueUrl ();
27
35
}
28
36
@@ -43,7 +51,7 @@ public void expiringVirtualQueue() throws InterruptedException {
43
51
.addAttributesEntry (AmazonSQSVirtualQueuesClient .VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE , hostQueueUrl )
44
52
.addAttributesEntry (AmazonSQSIdleQueueDeletingClient .IDLE_QUEUE_RETENTION_PERIOD , "10" );
45
53
String virtualQueueUrl = client .createQueue (request ).getQueueUrl ();
46
-
54
+
47
55
// Do a few long poll receives and validate the queue stays alive.
48
56
// We expect empty receives but not errors.
49
57
ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest ()
@@ -108,4 +116,26 @@ public void virtualQueueShouldNotExpireDuringLongReceive() throws InterruptedExc
108
116
// Delete the queue so we don't get a spurious message about it expiring during the test shutdown
109
117
client .deleteQueue (virtualQueueUrl );
110
118
}
119
+
120
+ @ Test
121
+ public void missingMessageAttributeIsReceivedAndDeleted () throws InterruptedException {
122
+ CreateQueueRequest request = new CreateQueueRequest ()
123
+ .withQueueName ("ShortLived" )
124
+ .addAttributesEntry (AmazonSQSVirtualQueuesClient .VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE , hostQueueUrl )
125
+ .addAttributesEntry (AmazonSQSIdleQueueDeletingClient .IDLE_QUEUE_RETENTION_PERIOD , "10" );
126
+ String virtualQueueUrl = client .createQueue (request ).getQueueUrl ();
127
+
128
+ ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest ()
129
+ .withQueueUrl (virtualQueueUrl )
130
+ .withWaitTimeSeconds (20 );
131
+ SendMessageRequest sendMessageRequest = new SendMessageRequest ()
132
+ .withQueueUrl (hostQueueUrl )
133
+ .withMessageBody ("Missing Message attributes!" )
134
+ .withDelaySeconds (5 );
135
+
136
+ client .sendMessage (sendMessageRequest );
137
+ // Message sent with missing attribute is deleted
138
+ assertEquals (0 , client .receiveMessage (receiveRequest ).getMessages ().size ());
139
+ verify (orphanedMessageHandlerMock ).accept (any (), any ());
140
+ }
111
141
}
0 commit comments