-
Notifications
You must be signed in to change notification settings - Fork 21
Add Akka.Streams.Kafka.Testkit to support other specs #263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
Arkatufus
wants to merge
2
commits into
akkadotnet:dev
Choose a base branch
from
Arkatufus:Add_Kafka_Testkit
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
<Import Project="..\common.props" /> | ||
|
||
<PropertyGroup> | ||
<TargetFramework>netstandard2.0</TargetFramework> | ||
<LangVersion>8.0</LangVersion> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Akka.Streams.TestKit" Version="$(AkkaVersion)" /> | ||
<PackageReference Include="Akka.TestKit.Xunit2" Version="$(AkkaVersion)" /> | ||
<PackageReference Include="Docker.DotNet" Version="3.125.5" /> | ||
<PackageReference Include="DotNet.Testcontainers" Version="1.6.0-beta.2028" /> | ||
<PackageReference Include="FakeItEasy" Version="7.2.0" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\Akka.Streams.Kafka\Akka.Streams.Kafka.csproj" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<None Remove="Resources\reference.conf" /> | ||
<EmbeddedResource Include="Resources\reference.conf" /> | ||
</ItemGroup> | ||
</Project> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 55 additions & 0 deletions
55
src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
using System.Threading.Tasks; | ||
using Akka.Streams.Dsl; | ||
using Akka.Streams.Kafka.Helpers; | ||
|
||
namespace Akka.Streams.Kafka.Testkit.Dsl | ||
{ | ||
public static class ConsumerControlFactory | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Helper functions for consumer controls |
||
{ | ||
public static Source<TA, IControl> AttachControl<TA, TB>(Source<TA, TB> source) | ||
=> source.ViaMaterialized(ControlFlow<TA>(), Keep.Right); | ||
|
||
public static Flow<TA, TA, IControl> ControlFlow<TA>() | ||
=> Flow.Create<TA>() | ||
.ViaMaterialized(KillSwitches.Single<TA>(), Keep.Right) | ||
.MapMaterializedValue(Control); | ||
|
||
public static IControl Control(IKillSwitch killSwitch) | ||
=> new FakeControl(killSwitch); | ||
|
||
public class FakeControl : IControl | ||
{ | ||
private readonly IKillSwitch _killSwitch; | ||
private readonly TaskCompletionSource<Done> _shutdownPromise; | ||
|
||
public FakeControl(IKillSwitch killSwitch) | ||
{ | ||
_killSwitch = killSwitch; | ||
_shutdownPromise = new TaskCompletionSource<Done>(); | ||
} | ||
|
||
public Task Stop() | ||
{ | ||
_killSwitch.Shutdown(); | ||
_shutdownPromise.SetResult(Done.Instance); | ||
return _shutdownPromise.Task; | ||
} | ||
|
||
public Task Shutdown() | ||
{ | ||
_killSwitch.Shutdown(); | ||
_shutdownPromise.SetResult(Done.Instance); | ||
return _shutdownPromise.Task; | ||
} | ||
|
||
public Task IsShutdown => _shutdownPromise.Task; | ||
|
||
public Task<TResult> DrainAndShutdown<TResult>(Task<TResult> streamCompletion) | ||
{ | ||
_killSwitch.Shutdown(); | ||
_shutdownPromise.SetResult(Done.Instance); | ||
return Task.FromResult(default(TResult)); | ||
} | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Collections.Immutable; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Akka.Actor.Setup; | ||
using Akka.Streams.Dsl; | ||
using Akka.Streams.Kafka.Dsl; | ||
using Akka.Streams.Kafka.Helpers; | ||
using Akka.Streams.Kafka.Messages; | ||
using Akka.Streams.Kafka.Settings; | ||
using Akka.Streams.Kafka.Testkit.Internal; | ||
using Akka.Streams.TestKit; | ||
using Akka.Util; | ||
using Confluent.Kafka; | ||
using Confluent.Kafka.Admin; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
using Config = Akka.Configuration.Config; | ||
|
||
namespace Akka.Streams.Kafka.Testkit.Dsl | ||
{ | ||
public abstract class KafkaSpec : KafkaTestKit, IAsyncLifetime | ||
{ | ||
protected KafkaSpec(string config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output) | ||
{ | ||
} | ||
|
||
protected KafkaSpec(Config config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output) | ||
{ | ||
} | ||
|
||
protected KafkaSpec(ActorSystemSetup config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output) | ||
{ | ||
} | ||
|
||
protected IProducer<string, string> TestProducer { get; private set; } | ||
|
||
|
||
public virtual Task InitializeAsync() | ||
{ | ||
TestProducer = ProducerDefaults().CreateKafkaProducer(); | ||
SetUpAdminClient(); | ||
return Task.CompletedTask; | ||
} | ||
|
||
public virtual Task DisposeAsync() | ||
{ | ||
TestProducer?.Dispose(); | ||
CleanUpAdminClient(); | ||
Shutdown(); | ||
return Task.CompletedTask; | ||
} | ||
|
||
protected void Sleep(TimeSpan time, string msg) | ||
{ | ||
Log.Debug($"Sleeping {time}: {msg}"); | ||
Thread.Sleep(time); | ||
} | ||
|
||
protected List<T> AwaitMultiple<T>(TimeSpan timeout, IEnumerable<Task<T>> tasks) | ||
{ | ||
var completedTasks = new List<Task<T>>(); | ||
using (var cts = new CancellationTokenSource(timeout)) | ||
{ | ||
var waitingTasks = tasks.ToList(); | ||
while (waitingTasks.Count > 0) | ||
{ | ||
var anyTask = Task.WhenAny(waitingTasks); | ||
try | ||
{ | ||
anyTask.Wait(cts.Token); | ||
} | ||
catch (Exception e) | ||
{ | ||
throw new Exception($"AwaitMultiple failed. Exception: {e.Message}", e); | ||
} | ||
|
||
var completedTask = anyTask.Result; | ||
waitingTasks.Remove(completedTask); | ||
completedTasks.Add(completedTask); | ||
} | ||
} | ||
|
||
return completedTasks.Select(t => t.Result).ToList(); | ||
} | ||
|
||
protected TimeSpan SleepAfterProduce => TimeSpan.FromSeconds(4); | ||
|
||
protected void AwaitProduce(IEnumerable<Task<Done>> tasks) | ||
{ | ||
AwaitMultiple(TimeSpan.FromSeconds(4), tasks); | ||
Sleep(SleepAfterProduce, "to be sure producing has happened"); | ||
} | ||
|
||
protected readonly Partition Partition0 = new Partition(0); | ||
|
||
// Not implemented | ||
[Obsolete("Kafka DescribeCluster API isn't supported by the .NET driver")] | ||
protected void WaitUntilCluster(Func<object, bool> predicate) | ||
=> Checks.WaitUntilCluster(Settings.ClusterTimeout, Settings.CheckInterval, AdminClient, predicate, Log); | ||
|
||
protected void WaitUntilConsumerGroup(string groupId, Func<GroupInfo, bool> predicate) | ||
=> Checks.WaitUntilConsumerGroup( | ||
groupId: groupId, | ||
timeout: Settings.ConsumerGroupTimeout, | ||
sleepInBetween: Settings.CheckInterval, | ||
adminClient: AdminClient, | ||
predicate: predicate, | ||
log: Log); | ||
|
||
protected void WaitUntilConsumerSummary(string groupId, Func<List<GroupMemberInfo>, bool> predicate) | ||
=> WaitUntilConsumerGroup(groupId, info => | ||
{ | ||
return info.State == "Stable" && Try<bool>.From(() => predicate(info.Members)).OrElse(false).Success.Value; | ||
}); | ||
|
||
protected ImmutableList<string> CreateTopics(IEnumerable<int> topics) | ||
=> CreateTopicsAsync(topics).Result; | ||
|
||
protected async Task<ImmutableList<string>> CreateTopicsAsync(IEnumerable<int> topics) | ||
{ | ||
var topicNames = topics.Select(CreateTopicName).ToImmutableList(); | ||
var configs = new Dictionary<string, string>(); | ||
var newTopics = topicNames.Select(topic => | ||
new TopicSpecification | ||
{ | ||
Name = topic, | ||
NumPartitions = 1, | ||
ReplicationFactor = 1, | ||
Configs = configs | ||
}); | ||
await AdminClient.CreateTopicsAsync( | ||
topics: newTopics, | ||
options: new CreateTopicsOptions {RequestTimeout = TimeSpan.FromSeconds(10)}); | ||
return topicNames; | ||
} | ||
|
||
protected void PeriodicalCheck<T>(string description, int maxTries, TimeSpan sleepInBetween, Func<T> data, Func<T, bool> predicate) | ||
=> Checks.PeriodicalCheck(description, new TimeSpan(sleepInBetween.Ticks * maxTries), sleepInBetween, data, predicate, Log); | ||
|
||
/// <summary> | ||
/// Produce messages to topic using specified range and return a Future so the caller can synchronize consumption. | ||
/// </summary> | ||
protected Task Produce(string topic, IEnumerable<int> range, int? partition = null) | ||
=> ProduceString(topic, range.Select(i => i.ToString()), partition); | ||
|
||
protected Task ProduceString(string topic, IEnumerable<string> range, int? partition = null) | ||
{ | ||
partition ??= Partition0; | ||
return Source.From(range) | ||
// NOTE: If no partition is specified but a key is present a partition will be chosen | ||
// using a hash of the key. If neither key nor partition is present a partition | ||
// will be assigned in a round-robin fashion. | ||
.Select(n => new ProducerRecord<string, string>(topic, partition, DefaultKey, n)) | ||
.RunWith(KafkaProducer.PlainSink(ProducerDefaults().WithProducer(TestProducer)), Sys.Materializer()); | ||
} | ||
|
||
protected Task ProduceTimestamped(string topic, IEnumerable<(int, long)> timestampedRange) | ||
=> Source.From(timestampedRange) | ||
.Select( tuple => | ||
{ | ||
var (n, ts) = tuple; | ||
return new ProducerRecord<string, string>(topic, Partition0, ts, DefaultKey, n.ToString()); | ||
}) | ||
.RunWith(KafkaProducer.PlainSink(ProducerDefaults().WithProducer(TestProducer)), Sys.Materializer()); | ||
|
||
protected (IControl, TestSubscriber.Probe<string>) CreateProbe( | ||
ConsumerSettings<string, string> consumerSettings, | ||
string[] topics) | ||
=> KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics(topics)) | ||
.Select(s => s.Message.Value) | ||
.ToMaterialized(this.SinkProbe<string>(), Keep.Both) | ||
.Run(Sys.Materializer()); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Collections.Immutable; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Streams.Implementation; | ||
using Akka.Util.Internal; | ||
|
||
namespace Akka.Streams.Kafka.Testkit | ||
{ | ||
public static class Extensions | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved from Kafka.Test |
||
{ | ||
public static async Task WithTimeout(this Task task, TimeSpan timeout) | ||
{ | ||
using (var cts = new CancellationTokenSource()) | ||
{ | ||
var timeoutTask = Task.Delay(timeout, cts.Token); | ||
var completed = await Task.WhenAny(task, timeoutTask); | ||
if (completed == timeoutTask) | ||
throw new OperationCanceledException("Operation timed out"); | ||
else | ||
cts.Cancel(); | ||
} | ||
} | ||
|
||
public static List<List<T>> Grouped<T>(this IEnumerable<T> messages, int size) | ||
{ | ||
var groups = new List<List<T>>(); | ||
var list = new List<T>(); | ||
var index = 0; | ||
foreach (var message in messages) | ||
{ | ||
list.Add(message); | ||
if(index != 0 && index % size == 0) | ||
{ | ||
groups.Add(list); | ||
list = new List<T>(); | ||
} | ||
|
||
index++; | ||
} | ||
if(list.Count > 0) | ||
groups.Add(list); | ||
return groups; | ||
} | ||
|
||
public static void AssertAllStagesStopped(this Akka.TestKit.Xunit2.TestKit spec, Action block, IMaterializer materializer) | ||
{ | ||
AssertAllStagesStopped(spec, () => | ||
{ | ||
block(); | ||
return NotUsed.Instance; | ||
}, materializer); | ||
} | ||
|
||
public static T AssertAllStagesStopped<T>(this Akka.TestKit.Xunit2.TestKit spec, Func<T> block, IMaterializer materializer) | ||
{ | ||
if (!(materializer is ActorMaterializerImpl impl)) | ||
return block(); | ||
|
||
var probe = spec.CreateTestProbe(impl.System); | ||
probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance); | ||
probe.ExpectMsg<StreamSupervisor.StoppedChildren>(); | ||
var result = block(); | ||
|
||
probe.Within(TimeSpan.FromSeconds(5), () => | ||
{ | ||
IImmutableSet<IActorRef> children = ImmutableHashSet<IActorRef>.Empty; | ||
try | ||
{ | ||
probe.AwaitAssert(() => | ||
{ | ||
impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref); | ||
children = probe.ExpectMsg<StreamSupervisor.Children>().Refs; | ||
if (children.Count != 0) | ||
throw new Exception($"expected no StreamSupervisor children, but got {children.Aggregate("", (s, @ref) => s + @ref + ", ")}"); | ||
}); | ||
} | ||
catch | ||
{ | ||
children.ForEach(c=>c.Tell(StreamSupervisor.PrintDebugDump.Instance)); | ||
throw; | ||
} | ||
}); | ||
|
||
return result; | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.