Skip to content

Commit bd91d97

Browse files
authored
Merge pull request #1355 from cshannon/AMQ-9625
AMQ-9625 - Prevent queue messages from becoming stuck
2 parents 3400983 + 7f218fe commit bd91d97

File tree

1 file changed

+48
-1
lines changed

1 file changed

+48
-1
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,20 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long wait)
255255
disableCache = true;
256256
}
257257

258-
if (disableCache && isCacheEnabled()) {
258+
// AMQ-9625 - use this.cacheEnabled directly because the method isCacheEnabled() is overriden
259+
// to try to re-enable the cache which we don't want at this point as we already skipped
260+
// adding it to the cache
261+
if (disableCache && this.cacheEnabled) {
259262
if (LOG.isTraceEnabled()) {
260263
LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
261264
}
262265
syncWithStore(node.getMessage());
263266
setCacheEnabled(false);
267+
} else if (!this.cacheEnabled) {
268+
// AMQ-9625 - Verify and wait on previous in flight async messages here if another
269+
// thread triggered the cache to be disabled
270+
// see the waitForAsyncMessage() method and Jira for more info
271+
waitForAsyncMessage(node.getMessage());
264272
}
265273
size++;
266274
return true;
@@ -319,6 +327,11 @@ private void syncWithStore(Message currentAdd) throws Exception {
319327
break;
320328
}
321329

330+
// AMQ-9625 - If we are disabling the cache and syncing the store then
331+
// we need to wait for task to finish before updating the store batch
332+
// see the waitForAsyncMessage() method and Jira for more info
333+
waitForAsyncMessage(currentAdd);
334+
322335
MessageId candidate = lastCachedIds[ASYNC_ADD];
323336
if (candidate != null) {
324337
// ensure we don't skip current possibly sync add b/c we waited on the future
@@ -530,4 +543,38 @@ public String toString() {
530543
public Subscription getSubscription() {
531544
return null;
532545
}
546+
547+
// AMQ-9625 - If the cache is disabled check if we need to wait for an async message
548+
// to finish its task because the message is not being added to the cache.
549+
// Normally, async messages will only be used if the cache is enabled so most of the time
550+
// this check should not find any async messages to wait on if the cache is disabled
551+
// and is basically a noop.
552+
//
553+
// However, while messages are being published, if the memory limit is reached the first
554+
// thread that is adding the message that reaches the limit will disable the cache.
555+
// This means there will be 1 or more potentially outstanding in flight adds that are
556+
// queued up as async writes to the store.
557+
//
558+
// If the cache is disabled, we need to wait for any async message tasks to be
559+
// finished otherwise there is a chance of missing the messages on dispatch
560+
// when the queue pages in the next batch because store writes will finish after
561+
// the store cursor has already moved ahead leading to a stuck message.
562+
private void waitForAsyncMessage(Message node) {
563+
// Note: isRecievedByDFBridge() was repurposed to be used to mark messages that
564+
// are added to the store as async
565+
if (node.getMessage().isRecievedByDFBridge()) {
566+
final Object futureOrLong = node.getMessageId().getFutureOrSequenceLong();
567+
if (futureOrLong instanceof Future) {
568+
try {
569+
((Future<?>) futureOrLong).get();
570+
} catch (Exception exceptionOk) {
571+
// We don't care if we get an exception (cancelled, etc) we just want
572+
// to ensure the task is finished and not pending.
573+
} finally {
574+
LOG.trace("{} - future finished inside waitForAsyncMessage {} {}", this,
575+
node.getMessageId(), futureOrLong);
576+
}
577+
}
578+
}
579+
}
533580
}

0 commit comments

Comments
 (0)