Skip to content

Commit f603d7b

Browse files
committed
Add diagnostic logging to MemoryJournal to track lock contention
Added comprehensive telemetry to investigate intermittent test failures in InMemoryEventsByTagSpec.ReadJournal_live_query_EventsByTag_should_find_events_from_offset_exclusive. Changes: - Added Stopwatch-based timing instrumentation to track lock acquisition wait times in WriteMessagesAsync and ReplayTaggedMessagesAsync - Added DEBUG logging for all journal message types (ReplayTaggedMessages, ReplayAllEvents, SelectCurrentPersistenceIds) - Logs thread IDs, lock wait times, lock hold times, and event counts - Enabled DEBUG log level in InMemoryEventsByTagSpec test configuration Diagnostic output includes: - "[DIAG] WriteMessagesAsync called on thread X, attempting to acquire lock" - "[DIAG] Lock acquired after Xms on thread Y" - "[DIAG] Lock released after holding for Xms" - "[DIAG] Wrote event for {persistenceId}, seq {seqNr}, total events: {count}" - "[DIAG] ReplayTaggedMessages lock acquired after Xms" - "[DIAG] Found X events matching tag 'Y'" This instrumentation will capture timing data on CI/CD build servers to determine if the test failures are caused by: 1. Lock contention/fairness issues with Monitor 2. Thread pool starvation 3. Async execution context delays 4. RecoveryPermitter bottlenecks All diagnostic logging uses [DIAG] prefix for easy filtering. These changes are temporary for diagnosis and will be reverted once root cause is identified.
1 parent efb2113 commit f603d7b

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryEventsByTagSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Akka.Persistence.Query.InMemory.Tests
1414
public class InMemoryEventsByTagSpec : EventsByTagSpec
1515
{
1616
private static Config Config() => ConfigurationFactory.ParseString(@"
17-
akka.loglevel = INFO
17+
akka.loglevel = DEBUG
1818
akka.persistence.journal.inmem {
1919
event-adapters {
2020
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""

src/core/Akka.Persistence/Journal/MemoryJournal.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System;
99
using System.Collections.Generic;
1010
using System.Collections.Immutable;
11+
using System.Diagnostics;
1112
using System.Linq;
1213
using System.Threading;
1314
using System.Threading.Tasks;
@@ -35,6 +36,9 @@ public class MemoryJournal : AsyncWriteJournal
3536

3637
private readonly object _lock = new();
3738
private readonly Dictionary<string, long> _deletedTo = new();
39+
private ILoggingAdapter _log;
40+
41+
protected ILoggingAdapter Log => _log ??= Context.GetLogger();
3842

3943
protected virtual List<IPersistentRepresentation> EventLog => _eventLog;
4044
protected virtual Dictionary<string, List<IPersistentRepresentation>> EventsByPersistenceId => _eventsByPersistenceId;
@@ -43,8 +47,16 @@ public class MemoryJournal : AsyncWriteJournal
4347

4448
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
4549
{
50+
var waitStartTime = Stopwatch.GetTimestamp();
51+
var threadId = Environment.CurrentManagedThreadId;
52+
Log.Debug("[DIAG] WriteMessagesAsync called on thread {0}, attempting to acquire lock", threadId);
53+
4654
lock (Lock)
4755
{
56+
var lockAcquiredTime = Stopwatch.GetTimestamp();
57+
var waitTimeMs = (lockAcquiredTime - waitStartTime) * 1000.0 / Stopwatch.Frequency;
58+
Log.Debug("[DIAG] Lock acquired after {0:F2}ms on thread {1}", waitTimeMs, threadId);
59+
4860
foreach (var w in messages)
4961
{
5062
foreach (var p in (IEnumerable<IPersistentRepresentation>)w.Payload)
@@ -60,8 +72,17 @@ protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerabl
6072
EventsByPersistenceId[persistentRepresentation.PersistenceId] = pidEvents;
6173
}
6274
pidEvents.Add(persistentRepresentation);
75+
76+
Log.Debug("[DIAG] Wrote event for {0}, seq {1}, total events in log: {2}",
77+
persistentRepresentation.PersistenceId,
78+
persistentRepresentation.SequenceNr,
79+
EventLog.Count);
6380
}
6481
}
82+
83+
var lockReleasedTime = Stopwatch.GetTimestamp();
84+
var holdTimeMs = (lockReleasedTime - lockAcquiredTime) * 1000.0 / Stopwatch.Frequency;
85+
Log.Debug("[DIAG] Lock released after holding for {0:F2}ms on thread {1}", holdTimeMs, threadId);
6586
}
6687

6788
return Task.FromResult<IImmutableList<Exception>>(null);
@@ -219,16 +240,21 @@ protected override bool ReceivePluginInternal(object message)
219240
switch (message)
220241
{
221242
case SelectCurrentPersistenceIds request:
243+
Log.Debug("[DIAG] Received SelectCurrentPersistenceIds from {0}", request.ReplyTo);
222244
SelectAllPersistenceIdsAsync(request.Offset)
223245
.PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Item1, result.LastOrdering));
224246
return true;
225247

226248
case ReplayTaggedMessages replay:
249+
Log.Debug("[DIAG] Received ReplayTaggedMessages for tag '{0}', fromOffset={1}, max={2}, toOffset={3}, replyTo={4}",
250+
replay.Tag, replay.FromOffset, replay.Max, replay.ToOffset, replay.ReplyTo);
227251
ReplayTaggedMessagesAsync(replay)
228252
.PipeTo(replay.ReplyTo, success: h => new ReplayTaggedMessagesSuccess(h), failure: e => new ReplayMessagesFailure(e));
229253
return true;
230254

231255
case ReplayAllEvents replay:
256+
Log.Debug("[DIAG] Received ReplayAllEvents fromOffset={0}, max={1}, replyTo={2}",
257+
replay.FromOffset, replay.Max, replay.ReplyTo);
232258
ReplayAllEventsAsync(replay)
233259
.PipeTo(replay.ReplyTo, success: h => new EventReplaySuccess(h),
234260
failure: e => new EventReplayFailure(e));
@@ -258,11 +284,19 @@ protected override bool ReceivePluginInternal(object message)
258284
/// </summary>
259285
private Task<int> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
260286
{
287+
var waitStartTime = Stopwatch.GetTimestamp();
288+
var threadId = Environment.CurrentManagedThreadId;
289+
Log.Debug("[DIAG] ReplayTaggedMessagesAsync starting for tag '{0}' on thread {1}, attempting to acquire lock", replay.Tag, threadId);
290+
261291
IPersistentRepresentation[] snapshot;
262292
int count;
263293

264294
lock (Lock)
265295
{
296+
var lockAcquiredTime = Stopwatch.GetTimestamp();
297+
var waitTimeMs = (lockAcquiredTime - waitStartTime) * 1000.0 / Stopwatch.Frequency;
298+
Log.Debug("[DIAG] ReplayTaggedMessages lock acquired after {0:F2}ms on thread {1}", waitTimeMs, threadId);
299+
266300
// Scan for events with matching tag
267301
snapshot = EventLog
268302
.Where(e => e.Payload is Tagged tagged && tagged.Tags.Contains(replay.Tag))
@@ -271,16 +305,22 @@ private Task<int> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
271305
.ToArray();
272306

273307
count = EventLog.Count(e => e.Payload is Tagged tagged && tagged.Tags.Contains(replay.Tag));
308+
309+
Log.Debug("[DIAG] Found {0} events matching tag '{1}', total tagged events: {2}, EventLog size: {3}",
310+
snapshot.Length, replay.Tag, count, EventLog.Count);
274311
}
275312

276313
// Send messages outside the lock to avoid potential deadlocks
277314
var index = 0;
278315
foreach (var persistence in snapshot)
279316
{
280317
replay.ReplyTo.Tell(new ReplayedTaggedMessage(persistence, replay.Tag, replay.FromOffset + index), ActorRefs.NoSender);
318+
Log.Debug("[DIAG] Sent ReplayedTaggedMessage for {0}, seq {1}, offset {2}",
319+
persistence.PersistenceId, persistence.SequenceNr, replay.FromOffset + index);
281320
index++;
282321
}
283322

323+
Log.Debug("[DIAG] ReplayTaggedMessagesAsync completed, returning highestSequenceNr={0}", count - 1);
284324
return Task.FromResult(count - 1);
285325
}
286326

0 commit comments

Comments
 (0)