6
6
import java .util .List ;
7
7
import java .util .Map ;
8
8
import java .util .Optional ;
9
- import java .util .concurrent .ConcurrentHashMap ;
10
- import java .util .concurrent .Executors ;
11
- import java .util .concurrent .Future ;
12
- import java .util .concurrent .ScheduledExecutorService ;
13
- import java .util .concurrent .TimeUnit ;
9
+ import java .util .concurrent .*;
14
10
import java .util .function .Consumer ;
15
11
16
12
import com .amazonaws .services .sqs .model .QueueNameExistsException ;
@@ -190,17 +186,17 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
190
186
throw new IllegalArgumentException ();
191
187
}
192
188
189
+ String retentionPeriodString = retentionPeriod .get ().toString ();
190
+ long currentTimestamp = System .currentTimeMillis ();
193
191
CreateQueueRequest superRequest = request .clone ()
194
192
.withQueueName (queueName )
195
- .withAttributes (attributes );
193
+ .withAttributes (attributes )
194
+ .addTagsEntry (IDLE_QUEUE_RETENTION_PERIOD_TAG , retentionPeriodString )
195
+ .addTagsEntry (LAST_HEARTBEAT_TIMESTAMP_TAG , String .valueOf (currentTimestamp ));
196
196
197
197
CreateQueueResult result = super .createQueue (superRequest );
198
198
String queueUrl = result .getQueueUrl ();
199
199
200
- String retentionPeriodString = retentionPeriod .get ().toString ();
201
- amazonSqsToBeExtended .tagQueue (queueUrl ,
202
- Collections .singletonMap (IDLE_QUEUE_RETENTION_PERIOD_TAG , retentionPeriodString ));
203
-
204
200
// TODO-RS: Filter more carefully to all attributes valid for createQueue
205
201
List <String > attributeNames = Arrays .asList (QueueAttributeName .ReceiveMessageWaitTimeSeconds .toString (),
206
202
QueueAttributeName .VisibilityTimeout .toString ());
@@ -210,8 +206,9 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
210
206
QueueMetadata metadata = new QueueMetadata (queueName , queueUrl , createdAttributes );
211
207
queues .put (queueUrl , metadata );
212
208
213
- metadata .heartbeater = executor .scheduleAtFixedRate (() -> heartbeatToQueue (queueUrl ),
214
- 0 , heartbeatIntervalSeconds , TimeUnit .SECONDS );
209
+ long initialDelay = ThreadLocalRandom .current ().nextLong (heartbeatIntervalSeconds );
210
+ metadata .heartbeater = executor .scheduleAtFixedRate (() -> heartbeatToQueue (queueUrl ),
211
+ initialDelay , heartbeatIntervalSeconds , TimeUnit .SECONDS );
215
212
216
213
return result ;
217
214
}
0 commit comments