Skip to content

Commit 83b2600

Browse files
committed
Add diagnostic logging to MemoryJournal to track lock contention
Added comprehensive telemetry to investigate intermittent test failures in InMemoryEventsByTagSpec.ReadJournal_live_query_EventsByTag_should_find_events_from_offset_exclusive.

Changes:
- Added Stopwatch-based timing instrumentation to track lock acquisition wait times in WriteMessagesAsync and ReplayTaggedMessagesAsync
- Added DEBUG logging for all journal message types (ReplayTaggedMessages, ReplayAllEvents, SelectCurrentPersistenceIds)
- Logs thread IDs, lock wait times, lock hold times, and event counts
- Enabled DEBUG log level in InMemoryEventsByTagSpec test configuration

Diagnostic output includes:
- "[DIAG] WriteMessagesAsync called on thread X, attempting to acquire lock"
- "[DIAG] Lock acquired after Xms on thread Y"
- "[DIAG] Lock released after holding for Xms on thread Y"
- "[DIAG] Wrote event for {persistenceId}, seq {seqNr}, total events in log: {count}"
- "[DIAG] ReplayTaggedMessages lock acquired after Xms on thread Y"
- "[DIAG] Found X events matching tag 'Y', total tagged events: Z, EventLog size: N"

This instrumentation will capture timing data on CI/CD build servers to determine if the test failures are caused by:
1. Lock contention/fairness issues with Monitor
2. Thread pool starvation
3. Async execution context delays
4. RecoveryPermitter bottlenecks

All diagnostic logging uses the [DIAG] prefix for easy filtering. These changes are temporary for diagnosis and will be reverted once the root cause is identified.
1 parent efb2113 commit 83b2600

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryEventsByTagSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Akka.Persistence.Query.InMemory.Tests
1414
public class InMemoryEventsByTagSpec : EventsByTagSpec
1515
{
1616
private static Config Config() => ConfigurationFactory.ParseString(@"
17-
akka.loglevel = INFO
17+
akka.loglevel = DEBUG
1818
akka.persistence.journal.inmem {
1919
event-adapters {
2020
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""

src/core/Akka.Persistence/Journal/MemoryJournal.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System;
99
using System.Collections.Generic;
1010
using System.Collections.Immutable;
11+
using System.Diagnostics;
1112
using System.Linq;
1213
using System.Threading;
1314
using System.Threading.Tasks;
@@ -43,8 +44,16 @@ public class MemoryJournal : AsyncWriteJournal
4344

4445
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
4546
{
47+
var waitStartTime = Stopwatch.GetTimestamp();
48+
var threadId = Environment.CurrentManagedThreadId;
49+
Context.GetLogger().Debug("[DIAG] WriteMessagesAsync called on thread {0}, attempting to acquire lock", threadId);
50+
4651
lock (Lock)
4752
{
53+
var lockAcquiredTime = Stopwatch.GetTimestamp();
54+
var waitTimeMs = (lockAcquiredTime - waitStartTime) * 1000.0 / Stopwatch.Frequency;
55+
Context.GetLogger().Debug("[DIAG] Lock acquired after {0:F2}ms on thread {1}", waitTimeMs, threadId);
56+
4857
foreach (var w in messages)
4958
{
5059
foreach (var p in (IEnumerable<IPersistentRepresentation>)w.Payload)
@@ -60,8 +69,17 @@ protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerabl
6069
EventsByPersistenceId[persistentRepresentation.PersistenceId] = pidEvents;
6170
}
6271
pidEvents.Add(persistentRepresentation);
72+
73+
Context.GetLogger().Debug("[DIAG] Wrote event for {0}, seq {1}, total events in log: {2}",
74+
persistentRepresentation.PersistenceId,
75+
persistentRepresentation.SequenceNr,
76+
EventLog.Count);
6377
}
6478
}
79+
80+
var lockReleasedTime = Stopwatch.GetTimestamp();
81+
var holdTimeMs = (lockReleasedTime - lockAcquiredTime) * 1000.0 / Stopwatch.Frequency;
82+
Context.GetLogger().Debug("[DIAG] Lock released after holding for {0:F2}ms on thread {1}", holdTimeMs, threadId);
6583
}
6684

6785
return Task.FromResult<IImmutableList<Exception>>(null);
@@ -219,16 +237,21 @@ protected override bool ReceivePluginInternal(object message)
219237
switch (message)
220238
{
221239
case SelectCurrentPersistenceIds request:
240+
Context.GetLogger().Debug("[DIAG] Received SelectCurrentPersistenceIds from {0}", request.ReplyTo);
222241
SelectAllPersistenceIdsAsync(request.Offset)
223242
.PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Item1, result.LastOrdering));
224243
return true;
225244

226245
case ReplayTaggedMessages replay:
246+
Context.GetLogger().Debug("[DIAG] Received ReplayTaggedMessages for tag '{0}', fromOffset={1}, max={2}, toOffset={3}, replyTo={4}",
247+
replay.Tag, replay.FromOffset, replay.Max, replay.ToOffset, replay.ReplyTo);
227248
ReplayTaggedMessagesAsync(replay)
228249
.PipeTo(replay.ReplyTo, success: h => new ReplayTaggedMessagesSuccess(h), failure: e => new ReplayMessagesFailure(e));
229250
return true;
230251

231252
case ReplayAllEvents replay:
253+
Context.GetLogger().Debug("[DIAG] Received ReplayAllEvents fromOffset={0}, max={1}, replyTo={2}",
254+
replay.FromOffset, replay.Max, replay.ReplyTo);
232255
ReplayAllEventsAsync(replay)
233256
.PipeTo(replay.ReplyTo, success: h => new EventReplaySuccess(h),
234257
failure: e => new EventReplayFailure(e));
@@ -258,11 +281,20 @@ protected override bool ReceivePluginInternal(object message)
258281
/// </summary>
259282
private Task<int> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
260283
{
284+
var waitStartTime = Stopwatch.GetTimestamp();
285+
var threadId = Environment.CurrentManagedThreadId;
286+
var log = Context.GetLogger();
287+
log.Debug("[DIAG] ReplayTaggedMessagesAsync starting for tag '{0}' on thread {1}, attempting to acquire lock", replay.Tag, threadId);
288+
261289
IPersistentRepresentation[] snapshot;
262290
int count;
263291

264292
lock (Lock)
265293
{
294+
var lockAcquiredTime = Stopwatch.GetTimestamp();
295+
var waitTimeMs = (lockAcquiredTime - waitStartTime) * 1000.0 / Stopwatch.Frequency;
296+
log.Debug("[DIAG] ReplayTaggedMessages lock acquired after {0:F2}ms on thread {1}", waitTimeMs, threadId);
297+
266298
// Scan for events with matching tag
267299
snapshot = EventLog
268300
.Where(e => e.Payload is Tagged tagged && tagged.Tags.Contains(replay.Tag))
@@ -271,16 +303,22 @@ private Task<int> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
271303
.ToArray();
272304

273305
count = EventLog.Count(e => e.Payload is Tagged tagged && tagged.Tags.Contains(replay.Tag));
306+
307+
log.Debug("[DIAG] Found {0} events matching tag '{1}', total tagged events: {2}, EventLog size: {3}",
308+
snapshot.Length, replay.Tag, count, EventLog.Count);
274309
}
275310

276311
// Send messages outside the lock to avoid potential deadlocks
277312
var index = 0;
278313
foreach (var persistence in snapshot)
279314
{
280315
replay.ReplyTo.Tell(new ReplayedTaggedMessage(persistence, replay.Tag, replay.FromOffset + index), ActorRefs.NoSender);
316+
log.Debug("[DIAG] Sent ReplayedTaggedMessage for {0}, seq {1}, offset {2}",
317+
persistence.PersistenceId, persistence.SequenceNr, replay.FromOffset + index);
281318
index++;
282319
}
283320

321+
log.Debug("[DIAG] ReplayTaggedMessagesAsync completed, returning highestSequenceNr={0}", count - 1);
284322
return Task.FromResult(count - 1);
285323
}
286324

0 commit comments

Comments
 (0)