diff --git a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs index 0d763e285..f19f1542c 100644 --- a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs +++ b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs @@ -522,7 +522,7 @@ await messageBus.SubscribeAsync(msg => { Interlocked.Increment(ref messageCount); cancellationTokenSource.Cancel(); countdown.Signal(); - }, cancellationTokenSource.Token); + }, cancellationToken: cancellationTokenSource.Token); await messageBus.SubscribeAsync(msg => countdown.Signal()); diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index 3afebc009..0db1b2bbc 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -553,6 +553,7 @@ public virtual async Task WillWaitForItemAsync() { return; try { + Log.MinimumLevel = LogLevel.Trace; await queue.DeleteQueueAsync(); await AssertEmptyQueueAsync(queue); @@ -564,14 +565,16 @@ public virtual async Task WillWaitForItemAsync() { Assert.InRange(sw.Elapsed, TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(5000)); _ = Task.Run(async () => { - await SystemClock.SleepAsync(500); + await Task.Delay(500); + _logger.LogInformation("Enqueuing async message"); await queue.EnqueueAsync(new SimpleWorkItem { Data = "Hello" }); }); sw.Restart(); - workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(1)); + _logger.LogInformation("Dequeuing message with timeout"); + workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(10)); sw.Stop(); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Time {Elapsed:g}", sw.Elapsed); Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(400)); diff --git a/src/Foundatio/Foundatio.csproj b/src/Foundatio/Foundatio.csproj index 91824d2ee..2b33bb216 100644 --- a/src/Foundatio/Foundatio.csproj +++ b/src/Foundatio/Foundatio.csproj @@ -5,5 +5,6 @@ + diff --git a/src/Foundatio/Messaging/IMessageBus.cs b/src/Foundatio/Messaging/IMessageBus.cs index f1eeca8df..aa9ad9ac5 100644 --- a/src/Foundatio/Messaging/IMessageBus.cs +++ b/src/Foundatio/Messaging/IMessageBus.cs @@ -7,6 +7,8 @@ public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposabl public class MessageOptions { public string UniqueId { get; set; } public string CorrelationId { get; set; } + public string Topic { get; set; } + public string MessageType { get; set; } public TimeSpan? DeliveryDelay { get; set; } public IDictionary Properties { get; set; } = new Dictionary(); } diff --git a/src/Foundatio/Messaging/IMessageSubscriber.cs b/src/Foundatio/Messaging/IMessageSubscriber.cs index e7517d958..557873d22 100644 --- a/src/Foundatio/Messaging/IMessageSubscriber.cs +++ b/src/Foundatio/Messaging/IMessageSubscriber.cs @@ -1,37 +1,68 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace Foundatio.Messaging { public interface IMessageSubscriber { - Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class; + Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class; } public static class MessageBusExtensions { - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) where T : class { - return subscriber.SubscribeAsync((msg, token) => handler(msg), cancellationToken); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class { + return subscriber.SubscribeAsync((msg, token) => handler(msg), options, cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, CancellationToken cancellationToken = default) where T : class { + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class { return subscriber.SubscribeAsync((msg, token) => { handler(msg); return Task.CompletedTask; - }, cancellationToken); + }, options, cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) { - return subscriber.SubscribeAsync((msg, token) => handler(msg, token), cancellationToken); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) { + return subscriber.SubscribeAsync((msg, token) => handler(msg, token), options, cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) { - return subscriber.SubscribeAsync((msg, token) => handler(msg), cancellationToken); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) { + return subscriber.SubscribeAsync((msg, token) => handler(msg), options, cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, CancellationToken cancellationToken = default) { + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) { return subscriber.SubscribeAsync((msg, token) => { handler(msg); return Task.CompletedTask; - }, cancellationToken); + }, options, cancellationToken); } } + + public interface IMessageSubscription : IAsyncDisposable { + string SubscriptionId { get; } + IDictionary Properties { get; } + } + + public class MessageSubscription : IMessageSubscription { + private readonly Func _disposeSubscriptionFunc; + + public MessageSubscription(string subscriptionId, Func disposeSubscriptionFunc) { + SubscriptionId = subscriptionId; + _disposeSubscriptionFunc = disposeSubscriptionFunc; + } + + public string SubscriptionId { get; } + public IDictionary Properties { get; set; } = new Dictionary(); + + public ValueTask DisposeAsync() { + return _disposeSubscriptionFunc?.Invoke() ?? new ValueTask(); + } + } + + public class MessageSubscriptionOptions { + /// + /// The topic name + /// + public string Topic { get; set; } + + public IDictionary Properties { get; set; } = new Dictionary(); + } } diff --git a/src/Foundatio/Messaging/Message.cs b/src/Foundatio/Messaging/Message.cs index 0b41b4590..4ca2feb43 100644 --- a/src/Foundatio/Messaging/Message.cs +++ b/src/Foundatio/Messaging/Message.cs @@ -1,17 +1,22 @@ using System; using System.Collections.Generic; using System.Diagnostics; -using Foundatio.Serializer; +using System.Threading.Tasks; namespace Foundatio.Messaging { - public interface IMessage { + public interface IMessage : IAsyncDisposable { string UniqueId { get; } string CorrelationId { get; } + string Topic { get; } string Type { get; } Type ClrType { get; } byte[] Data { get; } object GetBody(); IDictionary Properties { get; } + + Task RenewLockAsync(); + Task AbandonAsync(); + Task CompleteAsync(); } public interface IMessage : IMessage where T: class { @@ -29,11 +34,29 @@ public Message(byte[] data, Func getBody) { public string UniqueId { get; set; } public string CorrelationId { get; set; } + public string Topic { get; set; } public string Type { get; set; } public Type ClrType { get; set; } public IDictionary Properties { get; set; } = new Dictionary(); public byte[] Data { get; set; } + public object GetBody() => _getBody(this); + + public virtual Task AbandonAsync() { + return Task.CompletedTask; + } + + public virtual Task CompleteAsync() { + return Task.CompletedTask; + } + + public virtual Task RenewLockAsync() { + return Task.CompletedTask; + } + + public virtual ValueTask DisposeAsync() { + return new ValueTask(AbandonAsync()); + } } public class Message : IMessage where T : class { @@ -44,19 +67,19 @@ public Message(IMessage message) { } public byte[] Data => _message.Data; - public T Body => (T)GetBody(); - public string UniqueId => _message.UniqueId; - public string CorrelationId => _message.CorrelationId; - + public string Topic => _message.Topic; public string Type => _message.Type; - public Type ClrType => _message.ClrType; - public IDictionary Properties => _message.Properties; public object GetBody() => _message.GetBody(); + + public Task AbandonAsync() => _message.AbandonAsync(); + public Task CompleteAsync() => _message.CompleteAsync(); + public Task RenewLockAsync() => _message.RenewLockAsync(); + public ValueTask DisposeAsync() => _message.DisposeAsync(); } } \ No newline at end of file diff --git a/src/Foundatio/Messaging/MessageBusBase.cs b/src/Foundatio/Messaging/MessageBusBase.cs index 56a17d3a1..bce9f2c51 100644 --- a/src/Foundatio/Messaging/MessageBusBase.cs +++ b/src/Foundatio/Messaging/MessageBusBase.cs @@ -6,6 +6,7 @@ using System.Reflection; using System.Threading; using System.Threading.Tasks; +using System.Threading.Channels; using Foundatio.Serializer; using Foundatio.Utility; using Microsoft.Extensions.Logging; @@ -13,6 +14,7 @@ namespace Foundatio.Messaging { public abstract class MessageBusBase : IMessageBus, IDisposable where TOptions : SharedMessageBusOptions { + protected readonly ConcurrentDictionary> _topics = new(); private readonly CancellationTokenSource _messageBusDisposedCancellationTokenSource; protected readonly ConcurrentDictionary _subscribers = new(); protected readonly TOptions _options; @@ -25,7 +27,7 @@ public MessageBusBase(TOptions options) { var loggerFactory = options?.LoggerFactory ?? NullLoggerFactory.Instance; _logger = loggerFactory.CreateLogger(GetType()); _serializer = options.Serializer ?? DefaultSerializer.Instance; - MessageBusId = _options.Topic + Guid.NewGuid().ToString("N").Substring(10); + MessageBusId = _options.DefaultTopic + Guid.NewGuid().ToString("N").Substring(10); _messageBusDisposedCancellationTokenSource = new CancellationTokenSource(); } @@ -59,11 +61,11 @@ protected string GetMappedMessageType(Type messageType) { } private readonly ConcurrentDictionary _knownMessageTypesCache = new(); - protected virtual Type GetMappedMessageType(string messageType) { - if (String.IsNullOrEmpty(messageType)) + protected virtual Type GetMappedMessageType(IConsumeMessageContext context) { + if (context == null || String.IsNullOrEmpty(context.MessageType)) return null; - return _knownMessageTypesCache.GetOrAdd(messageType, type => { + return _knownMessageTypesCache.GetOrAdd(context.MessageType, type => { if (_options.MessageTypeMappings != null && _options.MessageTypeMappings.ContainsKey(type)) return _options.MessageTypeMappings[type]; @@ -86,11 +88,14 @@ protected virtual Type GetMappedMessageType(string messageType) { } }); } + protected Type GetMappedMessageType(string messageType) { + return GetMappedMessageType(new ConsumeMessageContext { MessageType = messageType }); + } protected virtual Task RemoveTopicSubscriptionAsync() => Task.CompletedTask; protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask; - protected virtual Task SubscribeImplAsync(Func handler, CancellationToken cancellationToken) where T : class { + protected virtual Task SubscribeImplAsync(Func handler, MessageSubscriptionOptions options, CancellationToken cancellationToken = default) where T : class { var subscriber = new Subscriber { CancellationToken = cancellationToken, Type = typeof(T), @@ -105,14 +110,6 @@ protected virtual Task SubscribeImplAsync(Func ha } }; - if (cancellationToken != CancellationToken.None) { - cancellationToken.Register(() => { - _subscribers.TryRemove(subscriber.Id, out _); - if (_subscribers.Count == 0) - RemoveTopicSubscriptionAsync().GetAwaiter().GetResult(); - }); - } - if (subscriber.Type.Name == "IMessage`1" && subscriber.Type.GenericTypeArguments.Length == 1) { var modelType = subscriber.Type.GenericTypeArguments.Single(); subscriber.GenericType = typeof(Message<>).MakeGenericType(modelType); @@ -121,15 +118,26 @@ protected virtual Task SubscribeImplAsync(Func ha if (!_subscribers.TryAdd(subscriber.Id, subscriber) && _logger.IsEnabled(LogLevel.Error)) _logger.LogError("Unable to add subscriber {SubscriberId}", subscriber.Id); - return Task.CompletedTask; + return Task.FromResult(new MessageSubscription(Guid.NewGuid().ToString("N"), async () => { + _subscribers.TryRemove(subscriber.Id, out _); + if (_subscribers.Count == 0) + await RemoveTopicSubscriptionAsync().AnyContext(); + })); } - public async Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class { + public async Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Adding subscriber for {MessageType}.", typeof(T).FullName); - await SubscribeImplAsync(handler, cancellationToken).AnyContext(); + options ??= new MessageSubscriptionOptions(); + + var sub = await SubscribeImplAsync(handler, options).AnyContext(); await EnsureTopicSubscriptionAsync(cancellationToken).AnyContext(); + + if (cancellationToken != CancellationToken.None) + cancellationToken.Register(() => sub.DisposeAsync()); + + return sub; } protected List GetMessageSubscribers(IMessage message) { diff --git a/src/Foundatio/Messaging/NullMessageBus.cs b/src/Foundatio/Messaging/NullMessageBus.cs index a7bce4390..b79505a7c 100644 --- a/src/Foundatio/Messaging/NullMessageBus.cs +++ b/src/Foundatio/Messaging/NullMessageBus.cs @@ -10,8 +10,8 @@ public Task PublishAsync(Type messageType, object message, MessageOptions option return Task.CompletedTask; } - public Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class { - return Task.CompletedTask; + public Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class { + return Task.FromResult(new MessageSubscription(Guid.NewGuid().ToString(), () => new ValueTask())); } public void Dispose() {} diff --git a/src/Foundatio/Messaging/SharedMessageBusOptions.cs b/src/Foundatio/Messaging/SharedMessageBusOptions.cs index b7146a1a6..4f0cb2142 100644 --- a/src/Foundatio/Messaging/SharedMessageBusOptions.cs +++ b/src/Foundatio/Messaging/SharedMessageBusOptions.cs @@ -1,26 +1,58 @@ -using System; +using System; using System.Collections.Generic; namespace Foundatio.Messaging { public class SharedMessageBusOptions : SharedOptions { /// - /// The topic name + /// The default topic name /// - public string Topic { get; set; } = "messages"; + public string DefaultTopic { get; set; } = "messages"; /// - /// Controls which types messages are mapped to. + /// Resolves message types + /// + public IMessageRouter Router { get; set; } + + /// + /// Statically configured message type mappings. will be run first and then this dictionary will be checked. /// public Dictionary MessageTypeMappings { get; set; } = new Dictionary(); } + public interface IMessageRouter { + // get topic from bus options, message and message options + // get message type from message and options + // get .net type from topic, message type and properties (headers) + IConsumeMessageContext ToMessageType(Type messageType); + Type ToClrType(IConsumeMessageContext context); + } + + public interface IConsumeMessageContext { + string Topic { get; set; } + string MessageType { get; set; } + IDictionary Properties { get; } + } + + public class ConsumeMessageContext : IConsumeMessageContext { + public string Topic { get; set; } + public string MessageType { get; set; } + public IDictionary Properties { get; set; } = new Dictionary(); + } + public class SharedMessageBusOptionsBuilder : SharedOptionsBuilder where TOptions : SharedMessageBusOptions, new() where TBuilder : SharedMessageBusOptionsBuilder { public TBuilder Topic(string topic) { if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic)); - Target.Topic = topic; + Target.DefaultTopic = topic; + return (TBuilder)this; + } + + public TBuilder MessageTypeResolver(IMessageRouter resolver) { + if (resolver == null) + throw new ArgumentNullException(nameof(resolver)); + Target.Router = resolver; return (TBuilder)this; } diff --git a/src/Foundatio/Queues/IQueueEntry.cs b/src/Foundatio/Queues/IQueueEntry.cs index a159cbe08..a3b57ab84 100644 --- a/src/Foundatio/Queues/IQueueEntry.cs +++ b/src/Foundatio/Queues/IQueueEntry.cs @@ -4,7 +4,7 @@ using Foundatio.Utility; namespace Foundatio.Queues { - public interface IQueueEntry { + public interface IQueueEntry : IAsyncDisposable { string Id { get; } string CorrelationId { get; } IDictionary Properties { get; } @@ -18,7 +18,6 @@ public interface IQueueEntry { Task RenewLockAsync(); Task AbandonAsync(); Task CompleteAsync(); - ValueTask DisposeAsync(); } public interface IQueueEntry : IQueueEntry where T : class { diff --git a/src/Foundatio/Serializer/ISerializer.cs b/src/Foundatio/Serializer/ISerializer.cs index 25f022b23..bec99a225 100644 --- a/src/Foundatio/Serializer/ISerializer.cs +++ b/src/Foundatio/Serializer/ISerializer.cs @@ -14,6 +14,8 @@ public static class DefaultSerializer { public static ISerializer Instance { get; set; } = new SystemTextJsonSerializer(); } + // TODO: Add a wrapper that adds activities for serialization + public static class SerializerExtensions { public static T Deserialize(this ISerializer serializer, Stream data) { return (T)serializer.Deserialize(data, typeof(T));