Skip to content

Commit e7529f0

Browse files
authored
Merge pull request #29 from WB-Tooling/generic-payload
generic payload
2 parents bd099b6 + 9e6d863 commit e7529f0

13 files changed

Lines changed: 173 additions & 64 deletions

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
│ WB Packages │
1111
╰──────────────────────────────────────────────────────────────────────╯ -->
1212
<ItemGroup>
13-
<PackageVersion Include="WB.Logging.Base" Version="[1.0.0-preview.15, 2.0.0)" />
13+
<PackageVersion Include="WB.Logging.Base" Version="[1.0.0-preview.16, 2.0.0)" />
1414
</ItemGroup>
1515

1616
<!-- ╭──────────────────────────────────────────────────────────────────────╮

src/Extensions/IAsyncLogSinkExtensions.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace WB.Logging;
@@ -16,15 +17,18 @@ internal static class IAsyncLogSinkExtensions
1617
/// Submits the <paramref name="logMessage"/> to <paramref name="this"/> <see cref="IAsyncLogSink"/> safely,
1718
/// catching any exceptions that occur and logging them to the console or a fallback logger instead of throwing.
1819
/// </summary>
20+
/// <typeparam name="TPayload">The <see cref="Type"/> of the payload of the log message.</typeparam>
1921
/// <param name="this">The <see cref="IAsyncLogSink"/> to submit the log message to.</param>
2022
/// <param name="logMessage">The log message to submit.</param>
23+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe while waiting for the task to complete.</param>
2124
/// <returns>A <see cref="Task"/> that represents the asynchronous operation.</returns>
2225
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "We want to catch all exceptions to prevent logging failures from crashing the application.")]
23-
internal static async Task SubmitSafeAsync(this IAsyncLogSink @this, LogMessage logMessage)
26+
internal static async Task SubmitSafeAsync<TPayload>(this IAsyncLogSink @this, ILogMessage<TPayload> logMessage, CancellationToken cancellationToken = default)
27+
where TPayload : notnull
2428
{
2529
try
2630
{
27-
await @this.SubmitAsync(logMessage).ConfigureAwait(false);
31+
await @this.SubmitAsync(logMessage, cancellationToken).ConfigureAwait(false);
2832
}
2933
catch (Exception ex)
3034
{

src/Extensions/ILogSinkExtensions.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@ internal static class ILogSinkExtensions
1616
/// Submits the <paramref name="logMessage"/> to <paramref name="this"/> <see cref="ILogSink"/> safely,
1717
/// catching any exceptions that occur and logging them to the console or a fallback logger instead of throwing.
1818
/// </summary>
19+
/// <typeparam name="TPayload">The <see cref="Type"/> of the payload of the log message.</typeparam>
1920
/// <param name="this">The <see cref="ILogSink"/> to submit the log message to.</param>
2021
/// <param name="logMessage">The log message to submit.</param>
2122
/// <returns>A <see cref="Task"/> that represents the asynchronous operation.</returns>
2223
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "We want to catch all exceptions to prevent logging failures from crashing the application.")]
23-
internal static Task SubmitSafeAsync(this ILogSink @this, LogMessage logMessage)
24+
internal static Task WriteSafeAsync<TPayload>(this ILogSink @this, ILogMessage<TPayload> logMessage)
25+
where TPayload : notnull
2426
{
2527
return Task.Run(() =>
2628
{

src/LogMessage.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace WB.Logging;
5+
6+
internal sealed record LogMessage<TPayload> : ILogMessage<TPayload>
7+
where TPayload : notnull
8+
{
9+
// ┌─────────────────────────────────────────────────────────────────────────────┐
10+
// │ Private Fields │
11+
// └─────────────────────────────────────────────────────────────────────────────┘
12+
private readonly List<string> senders = [];
13+
14+
// ┌─────────────────────────────────────────────────────────────────────────────┐
15+
// │ Public Properties │
16+
// └─────────────────────────────────────────────────────────────────────────────┘
17+
18+
/// <inheritdoc/>
19+
public required TPayload Payload { get; init; }
20+
21+
/// <inheritdoc/>
22+
public required DateTimeOffset Timestamp { get; init; }
23+
24+
/// <inheritdoc/>
25+
public IReadOnlyList<string> Senders => senders;
26+
27+
/// <inheritdoc/>
28+
public LogLevel? LogLevel { get; init; }
29+
30+
/// <inheritdoc/>
31+
object ILogMessage.Payload => Payload;
32+
33+
// ┌─────────────────────────────────────────────────────────────────────────────┐
34+
// │ Public Methods │
35+
// └─────────────────────────────────────────────────────────────────────────────┘
36+
37+
/// <summary>
38+
/// Adds the <paramref name="sender"/> to the list of senders of this log message.
39+
/// </summary>
40+
/// <param name="sender">The sender to add.</param>
41+
internal void AddSender(string sender)
42+
=> senders.Add(sender);
43+
}

src/Logger.cs

Lines changed: 87 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Linq.Expressions;
6+
using System.Reflection;
57
using System.Threading;
8+
using System.Threading.Channels;
69
using System.Threading.Tasks;
7-
using System.Threading.Tasks.Dataflow;
810

911
namespace WB.Logging;
1012

13+
internal delegate Task Dispatcher(ILogMessage logMessage, CancellationToken cancellationToken);
14+
1115
/// <inheritdoc/>
1216
/// <summary>
1317
/// Initializes a new instance of the <see cref="Logger"/> class.
@@ -19,27 +23,28 @@ public sealed class Logger : ILogger
1923
// └─────────────────────────────────────────────────────────────────────────────┘
2024
private readonly Logger? parent;
2125

22-
private readonly ActionBlock<Func<Task>> logMessageQueue = new(
23-
async logMessageAction =>
24-
{
25-
await logMessageAction().ConfigureAwait(false);
26-
},
27-
new ExecutionDataflowBlockOptions
28-
{
29-
MaxDegreeOfParallelism = 1,
30-
EnsureOrdered = true
31-
});
26+
private readonly Channel<ILogMessage> logMessageQueue = Channel.CreateUnbounded<ILogMessage>(new UnboundedChannelOptions
27+
{
28+
SingleReader = true,
29+
SingleWriter = true,
30+
});
31+
32+
private readonly Task logMessageProcessingTask;
33+
34+
private readonly CancellationTokenSource cancellationTokenSource = new();
3235

3336
private readonly ConcurrentBag<ILogger> childLoggers = [];
3437

3538
private readonly ConcurrentBag<ILogSink> logSinks = [];
3639

3740
private readonly ConcurrentBag<IAsyncLogSink> asyncLogSinks = [];
3841

39-
private readonly LogMessageFilters logMessageFilterRegistry = new();
42+
private readonly LogMessageFilterPipeline logMessageFilterPipeline = new();
4043

4144
private readonly IDisposable minimumLogLevelFilter;
4245

46+
private readonly ConcurrentDictionary<Type, Dispatcher> dispatchCache = new();
47+
4348
private int isDisposed;
4449

4550
// ┌─────────────────────────────────────────────────────────────────────────────┐
@@ -56,6 +61,7 @@ public Logger(string name)
5661
Name = name ?? throw new ArgumentNullException(nameof(name));
5762

5863
minimumLogLevelFilter = AddLogMessageFilter(logMessage => logMessage.LogLevel is null || logMessage.LogLevel >= MinimumLogLevel);
64+
logMessageProcessingTask = ExecuteAsync(cancellationTokenSource.Token);
5965
}
6066

6167
// ┌─────────────────────────────────────────────────────────────────────────────┐
@@ -125,8 +131,9 @@ public async ValueTask DisposeAsync()
125131
return;
126132
}
127133

128-
logMessageQueue.Complete();
129-
await logMessageQueue.Completion.ConfigureAwait(false);
134+
await cancellationTokenSource.CancelAsync().ConfigureAwait(false);
135+
cancellationTokenSource.Dispose();
136+
await logMessageProcessingTask.ConfigureAwait(false);
130137

131138
foreach (ILogger childLogger in childLoggers)
132139
{
@@ -181,14 +188,16 @@ public async Task FlushAsync(CancellationToken cancellationToken = default)
181188
public void Log<TPayload>(LogLevel? logLevel, TPayload payload)
182189
where TPayload : notnull
183190
{
184-
LogMessage logMessage = new()
191+
LogMessage<TPayload> logMessage = new()
185192
{
186193
Timestamp = TimestampProvider.CurrentTimestamp,
187194
LogLevel = logLevel,
188-
Payload = payload
195+
Payload = payload,
189196
};
190197

191198
Log(logMessage);
199+
200+
parent?.Log(logMessage);
192201
}
193202

194203
/// <inheritdoc/>
@@ -219,43 +228,81 @@ public ILogger CreateChildLogger(string name)
219228

220229
/// <inheritdoc/>
221230
public IDisposable AddLogMessageFilter(LogMessageFilter filter)
222-
=> logMessageFilterRegistry.Add(filter);
231+
=> logMessageFilterPipeline.Add(filter);
223232

224233
// ┌─────────────────────────────────────────────────────────────────────────────┐
225234
// │ Private Methods │
226235
// └─────────────────────────────────────────────────────────────────────────────┘
227-
private void Log(LogMessage logMessage)
236+
237+
private void Log<TPayload>(LogMessage<TPayload> logMessage)
238+
where TPayload : notnull
228239
{
229240
logMessage.AddSender(Name);
230241

231-
// Apply filters to the log message
232-
if (logMessage.Payload is not FlushItem)
242+
logMessageQueue.Writer.TryWrite(logMessage);
243+
}
244+
245+
private async Task ExecuteAsync(CancellationToken cancellationToken)
246+
{
247+
try
233248
{
234-
if (!logMessageFilterRegistry.IsMatch(logMessage))
249+
await foreach (ILogMessage logMessage in logMessageQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
235250
{
236-
return;
251+
if (logMessage.Payload is FlushItem flushItem)
252+
{
253+
flushItem.Complete();
254+
}
255+
else if (logMessageFilterPipeline.IsMatch(logMessage))
256+
{
257+
await DispatchAsync(logMessage, cancellationToken).ConfigureAwait(false);
258+
}
237259
}
238-
239-
parent?.Log(logMessage);
240260
}
241-
242-
logMessageQueue.Post(() =>
261+
catch (OperationCanceledException)
243262
{
244-
if (logMessage.Payload is FlushItem flushItem)
245-
{
246-
flushItem.Complete();
263+
// normal shutdown
264+
}
265+
}
247266

248-
return Task.CompletedTask;
249-
}
250-
else
251-
{
252-
Task[] submitTasks = [
253-
.. logSinks.Select(logSink => logSink.SubmitSafeAsync(logMessage)),
254-
.. asyncLogSinks.Select(asyncLogSink => asyncLogSink.SubmitSafeAsync(logMessage))
255-
];
267+
private Task DispatchAsync(ILogMessage logMessage, CancellationToken cancellationToken)
268+
{
269+
Type payloadType = logMessage.Payload.GetType();
256270

257-
return Task.WhenAll(submitTasks);
258-
}
259-
});
271+
Dispatcher dispatcher = dispatchCache.GetOrAdd(
272+
payloadType,
273+
static (t, self) => self.CreateDispatcher(t),
274+
this);
275+
276+
return dispatcher(logMessage, cancellationToken);
277+
}
278+
279+
private Dispatcher CreateDispatcher(Type payloadType)
280+
{
281+
ParameterExpression logMessageParameter = Expression.Parameter(typeof(ILogMessage), "msg");
282+
ParameterExpression cancellationTokenParameter = Expression.Parameter(typeof(CancellationToken), "ct");
283+
284+
UnaryExpression cast = Expression.Convert(logMessageParameter, typeof(ILogMessage<>).MakeGenericType(payloadType));
285+
286+
MethodCallExpression methodCall = Expression.Call(
287+
Expression.Constant(this),
288+
typeof(Logger)
289+
.GetMethod(nameof(DispatchTyped), BindingFlags.NonPublic | BindingFlags.Instance)!
290+
.MakeGenericMethod(payloadType),
291+
cast,
292+
cancellationTokenParameter);
293+
294+
Expression<Dispatcher> lambda = Expression.Lambda<Dispatcher>(methodCall, logMessageParameter, cancellationTokenParameter);
295+
return lambda.Compile();
296+
}
297+
298+
private Task DispatchTyped<TPayload>(ILogMessage logMessage, CancellationToken cancellationToken)
299+
where TPayload : notnull
300+
{
301+
ILogMessage<TPayload> typedLogMessage = (ILogMessage<TPayload>)logMessage;
302+
303+
Task[] tasks = [.. logSinks.Select(logSink => logSink.WriteSafeAsync(typedLogMessage)),
304+
.. asyncLogSinks.Select(asyncLogSink => asyncLogSink.SubmitSafeAsync(typedLogMessage, cancellationToken))];
305+
306+
return Task.WhenAll(tasks);
260307
}
261308
}

tests/LoggerTests/src/MethodTests/AttachLogMessageFilter.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ namespace LoggerTests.MethodTests.AttachLogMessageFilterMethodTests;
88

99
internal sealed class TestLogSink : ILogSink
1010
{
11-
public readonly List<LogMessage> ReceivedMessages = [];
11+
public readonly List<ILogMessage> ReceivedMessages = [];
1212

13-
public void Submit(LogMessage logMessage)
13+
public void Submit<TPayload>(ILogMessage<TPayload> logMessage)
14+
where TPayload : notnull
1415
=> ReceivedMessages.Add(logMessage);
1516
}
1617

@@ -98,6 +99,7 @@ public async Task ShouldDetachFilterWhenDisposableIsDisposed()
9899

99100
// Act
100101
logger.Log(LogLevel.Info, "INCLUDE message 1");
102+
await logger.FlushAsync().ConfigureAwait(false);
101103
filterDisposable.Dispose();
102104
logger.Log(LogLevel.Info, "EXCLUDE message 2");
103105
await logger.FlushAsync().ConfigureAwait(false);

tests/LoggerTests/src/MethodTests/AttachLogSink.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using AwesomeAssertions;
56
using WB.Logging;
@@ -8,17 +9,19 @@ namespace LoggerTests.MethodTests.AttachLogSinkMethodTests;
89

910
internal sealed class TestLogSink : ILogSink
1011
{
11-
public readonly List<LogMessage> ReceivedMessages = [];
12+
public readonly List<ILogMessage> ReceivedMessages = [];
1213

13-
public void Submit(LogMessage logMessage)
14+
public void Submit<TPayload>(ILogMessage<TPayload> logMessage)
15+
where TPayload : notnull
1416
=> ReceivedMessages.Add(logMessage);
1517
}
1618

1719
internal sealed class TestAsyncLogSink : IAsyncLogSink
1820
{
19-
public readonly List<LogMessage> ReceivedMessages = [];
21+
public readonly List<ILogMessage> ReceivedMessages = [];
2022

21-
public ValueTask SubmitAsync(LogMessage logMessage)
23+
public ValueTask SubmitAsync<TPayload>(ILogMessage<TPayload> logMessage, CancellationToken cancellationToken)
24+
where TPayload : notnull
2225
{
2326
ReceivedMessages.Add(logMessage);
2427

tests/LoggerTests/src/MethodTests/DisposeAsyncMethodTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using AwesomeAssertions;
45
using WB.Logging;
@@ -12,11 +13,12 @@ internal sealed class TestLogSink : ILogSink, IDisposable
1213
public void Dispose()
1314
=> IsDisposed = true;
1415

15-
public void Submit(LogMessage logMessage)
16+
public void Submit<TPayload>(ILogMessage<TPayload> logMessage)
17+
where TPayload : notnull
1618
=> throw new NotImplementedException("This test log sink should not be used to submit log messages.");
1719
}
1820

19-
internal sealed class AsyncTestLogSink : ILogSink, IAsyncDisposable
21+
internal sealed class AsyncTestLogSink : IAsyncLogSink, IAsyncDisposable
2022
{
2123
public bool IsDisposed { get; private set; }
2224

@@ -26,7 +28,8 @@ public ValueTask DisposeAsync()
2628
return ValueTask.CompletedTask;
2729
}
2830

29-
public void Submit(LogMessage logMessage)
31+
public ValueTask SubmitAsync<TPayload>(ILogMessage<TPayload> logMessage, CancellationToken cancellationToken)
32+
where TPayload : notnull
3033
=> throw new NotImplementedException("This test log sink should not be used to submit log messages.");
3134
}
3235

tests/LoggerTests/src/MethodTests/FlushAsyncMethodTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ namespace LoggerTests.MethodTests.FlushAsyncMethodTests;
99

1010
internal sealed class TestLogSink : ILogSink
1111
{
12-
public readonly List<LogMessage> ReceivedMessages = [];
12+
public readonly List<ILogMessage> ReceivedMessages = [];
1313

14-
public void Submit(LogMessage logMessage)
14+
public void Submit<TPayload>(ILogMessage<TPayload> logMessage)
15+
where TPayload : notnull
1516
=> ReceivedMessages.Add(logMessage);
1617
}
1718

0 commit comments

Comments
 (0)