Skip to content

CleanOnionArchitecture/EventBus.Kafka

Repository files navigation

EventBus.Kafka

EventBus.Kafka is a Kafka implementation of the Clean Onion Architecture Event Bus
This nuget package provides a quick and clean solution to messaging with Kafka and currently supports only .NET 6 and .NET 7

Quick Start

To installing a package run command or install from nuget store.

dotnet add package CleanOnionArchitecture.EventBus.Kafka

Register your event bus like this
builder.Services
    .AddEventBus()
    .AddKafkaEventBus(myKafkaConfiguration =>
    {
        myKafkaConfiguration.Server = "myKafkaServer";
        myKafkaConfiguration.Port = "myPort";
        //And other fields you desire
    });

or like this

builder.Services
    .AddEventBus()
    .AddKafkaEventBus("appsettings section goes here!");

An example appsettings section without authentication

{
    "MyKafkaConfiguration": {
        "Server": "localhost",
        "Port": 9092,
        "ConsumerGroupId": "kafka-consumers",
        "IsUsingAuthentication" : false,
        "Username": "these",
        "Password": "values",
        "SaslMechanism" : "are",
        "SecurityProtocol" : "optional",
        "EnableFlush" : "true",
        "FlushTimeout" : 10
    }
}

An example appsettings section with authentication
{
    "MyKafkaConfiguration": {
        "Server": "localhost",
        "Port": 9092,
        "ConsumerGroupId": "kafka-consumers",
        "IsUsingAuthentication" : true,
        "Username": "admin",
        "Password": "admin-secret",
        "SaslMechanism" : "PLAIN",
        "SecurityProtocol" : "Plaintext",
        "EnableFlush" : "true",
        "FlushTimeout" : 10
    }
}

Note

Supported Sasl mechanisms values are GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
Supported Security Protocol values are Plaintext, Ssl, SaslPlaintext, SaslSsl
These registration types are also supporting generic arguments

EnableFlush
Flush status Kafka Event Bus. If enabled producer connection closes itself after given FlushTimeout seconds.
Default value is true

FlushTimeout
Flush Timeout value for Kafka Event Bus. This value timeouts the Flush method as given seconds
Default value is 10


public interface INotificationServiceEventBus : IKafkaEventBus
{
}

public class NotificationServiceEventBus : KafkaEventBus, INotificationServiceEventBus
{
    public NotificationServiceEventBus(ILogger<IEventBus> logger, ISubscriptionManager eventBusSubscriptionManager, IServiceScopeFactory serviceScopeFactory, KafkaServiceConfiguration kafkaServiceConfiguration) 
        : base(logger, eventBusSubscriptionManager, serviceScopeFactory, kafkaServiceConfiguration)
    {
    }
}

builder.Services
    .AddEventBus()
    .AddKafkaEventBus<INotificationServiceEventBus, NotificationServiceEventBus>("Integrations:NotificationService:Kafka");

In order to messaging with Kafka objects should be inherited from Event class
public record DummyEvent : Event
{
    public string DummyMessage { get; set; }
}

Warning


Curently Kafka topic names are created from class names, make sure producer and consumer have the same class name in both projects


Producer

For producer, once event bus has registered, it can be simply published

    private readonly IKafkaEventBus _eventBus; //This can be another implementation which inherits IKafkaEventBus

    public EventPublisherProvider(
        IKafkaEventBus eventBus
    )
    {
        this._eventBus = eventBus;
    }

    public async Task PublishDummy(DummyEvent @event)
    {
        await this._eventBus.PublishAsync(@event);
    }

Consumer

Consumer side must contain an event handler that inherits IEventHandler for every event

public class DummyEventHandler : IEventHandler<DummyEvent>
{
    public Task HandleEvent(DummyEvent @event, CancellationToken cancellationToken)
    {
        //logic here 
    }
}

Note

When EnableDeadLetter configuration is set to true, events that throws an exception when handled will be send to the topic named DeadLetter


On service registration part event handlers must be registered

builder.Services.AddScoped<DummyEventHandler>();

Note

Currently this registration step is manual, further improvements will focus on this situtation


Consumer must subscribe every event that needs to processed

WebApplication app = builder.Build();
IKafkaEventBus eventBus = app.Services.GetRequiredService<IKafkaEventBus>();
eventBus.SubscribeAsync<DummyEvent, DummyEventHandler>(CancellationToken.None).ConfigureAwait(false);

Further improvements are on the way! 👨‍💻
Feel free to share your thoughts or contribute to our project

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages