1
1
package com .amazonaws .services .sqs ;
2
2
3
+ import java .util .Arrays ;
3
4
import java .util .HashMap ;
5
+ import java .util .HashSet ;
4
6
import java .util .Map ;
7
+ import java .util .Set ;
5
8
import java .util .UUID ;
6
9
import java .util .concurrent .ConcurrentHashMap ;
7
10
import java .util .concurrent .ConcurrentMap ;
8
11
import java .util .concurrent .TimeUnit ;
9
12
10
13
import com .amazonaws .services .sqs .model .CreateQueueRequest ;
11
14
import com .amazonaws .services .sqs .model .CreateQueueResult ;
15
+ import com .amazonaws .services .sqs .model .QueueAttributeName ;
12
16
import com .amazonaws .services .sqs .util .AbstractAmazonSQSClientWrapper ;
13
17
import com .amazonaws .services .sqs .util .SQSQueueUtils ;
14
18
@@ -30,7 +34,20 @@ class AmazonSQSTemporaryQueuesClient extends AbstractAmazonSQSClientWrapper {
30
34
31
35
// TODO-RS: Expose configuration
32
36
private final static String QUEUE_RETENTION_PERIOD_SECONDS = Long .toString (TimeUnit .MINUTES .toSeconds (5 ));
33
-
37
+
38
+ // We don't necessary support all queue attributes - some will behave differently on a virtual queue
39
+ // In particular, a virtual FIFO queue will deduplicate at the scope of its host queue!
40
+ private final static Set <String > SUPPORTED_QUEUE_ATTRIBUTES = new HashSet <>(Arrays .asList (
41
+ QueueAttributeName .DelaySeconds .name (),
42
+ QueueAttributeName .MaximumMessageSize .name (),
43
+ QueueAttributeName .MessageRetentionPeriod .name (),
44
+ QueueAttributeName .Policy .name (),
45
+ QueueAttributeName .ReceiveMessageWaitTimeSeconds .name (),
46
+ QueueAttributeName .RedrivePolicy .name (),
47
+ QueueAttributeName .VisibilityTimeout .name (),
48
+ QueueAttributeName .KmsMasterKeyId .name (),
49
+ QueueAttributeName .KmsDataKeyReusePeriodSeconds .name ()));
50
+
34
51
// These clients are owned by this one, and need to be shutdown when this client is.
35
52
private final AmazonSQSIdleQueueDeletingClient deleter ;
36
53
private final AmazonSQS virtualizer ;
@@ -85,6 +102,14 @@ AmazonSQSRequester getRequester() {
85
102
86
103
@ Override
87
104
public CreateQueueResult createQueue (CreateQueueRequest request ) {
105
+ // Check for unsupported queue attributes first
106
+ Set <String > unsupportedQueueAttributes = new HashSet <>(request .getAttributes ().keySet ());
107
+ unsupportedQueueAttributes .removeAll (SUPPORTED_QUEUE_ATTRIBUTES );
108
+ if (!unsupportedQueueAttributes .isEmpty ()) {
109
+ throw new IllegalArgumentException ("Cannot create a temporary queue with the following attributes: "
110
+ + String .join (", " , unsupportedQueueAttributes ));
111
+ }
112
+
88
113
Map <String , String > extraQueueAttributes = new HashMap <>();
89
114
// Add the retention period to both the host queue and each virtual queue
90
115
extraQueueAttributes .put (AmazonSQSIdleQueueDeletingClient .IDLE_QUEUE_RETENTION_PERIOD , QUEUE_RETENTION_PERIOD_SECONDS );
@@ -95,7 +120,11 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
95
120
});
96
121
97
122
extraQueueAttributes .put (AmazonSQSVirtualQueuesClient .VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE , hostQueueUrl );
98
- CreateQueueRequest createVirtualQueueRequest = SQSQueueUtils .copyWithExtraAttributes (request , extraQueueAttributes );
123
+ // The host queue takes care of all the other queue attributes, so don't specify them when creating the virtual
124
+ // queue or else the client may think we're trying to set them independently!
125
+ CreateQueueRequest createVirtualQueueRequest = new CreateQueueRequest ()
126
+ .withQueueName (request .getQueueName ())
127
+ .withAttributes (extraQueueAttributes );
99
128
return amazonSqsToBeExtended .createQueue (createVirtualQueueRequest );
100
129
}
101
130
0 commit comments