Skip to content

Commit ed5dab8

Browse files
IgorFedchenkoAaronontheweb
authored andcommitted
FlowWithContext (#83)
* Major update of producers core to match alpakka codebase and add features * Added FlowWithContext implementation * Implemented source/flow with context integration test
1 parent f418e7d commit ed5dab8

20 files changed

+1108
-266
lines changed

examples/SimpleProducer/Program.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Linq;
33
using System.Text;
4+
using System.Threading.Tasks;
5+
using Akka;
46
using Akka.Actor;
57
using Akka.Configuration;
68
using Akka.Streams;
@@ -31,14 +33,15 @@ public static void Main(string[] args)
3133
Source
3234
.Cycle(() => Enumerable.Range(1, 100).GetEnumerator())
3335
.Select(c => c.ToString())
34-
.Select(elem => new MessageAndMeta<Null, string> { Topic = "akka100", Message = new Message<Null, string> { Value = elem }})
35-
.Via(KafkaProducer.PlainFlow(producerSettings))
36-
.Select(record =>
36+
.Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>("akka100", elem)))
37+
.Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
38+
.Select(result =>
3739
{
38-
Console.WriteLine($"Producer: {record.Topic}/{record.Partition} {record.Offset}: {record.Value}");
39-
return record;
40+
var response = result as Result<Null, string, NotUsed>;
41+
Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}");
42+
return result;
4043
})
41-
.RunWith(Sink.Ignore<DeliveryReport<Null, string>>(), materializer);
44+
.RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer);
4245

4346
// TODO: producer as a Commitable Sink
4447

src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public async Task CommittablePartitionedSource_Should_handle_exceptions_in_strea
8383
return t.Result;
8484
});
8585
})
86-
.MergeSubstreams().As<Source<int, IControl>>()
86+
.MergeSubstreams()
87+
.As<Source<int, IControl>>()
8788
.Scan(0, (c, n) => c + n)
8889
.ToMaterialized(Sink.Last<int>(), Keep.Both)
8990
.MapMaterializedValue(tuple => DrainingControl<int>.Create(tuple.Item1, tuple.Item2))

src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public async Task CommitableSource_consumes_messages_from_Producer_without_commi
3535

3636
await Source
3737
.From(Enumerable.Range(1, elementsCount))
38-
.Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition1, Message = new Message<Null, string> { Value = elem.ToString() } })
38+
.Select(elem => new ProducerRecord<Null, string>(topicPartition1, elem.ToString()))
3939
.RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);
4040

4141
var consumerSettings = CreateConsumerSettings<string>(group1);
@@ -65,7 +65,7 @@ public async Task CommitableSource_resume_from_commited_offset()
6565

6666
await Source
6767
.From(Enumerable.Range(1, 100))
68-
.Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition1, Message = new Message<Null, string> { Value = elem.ToString() } })
68+
.Select(elem => new ProducerRecord<Null, string>(topicPartition1, elem.ToString()))
6969
.RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);
7070

7171
var consumerSettings = CreateConsumerSettings<string>(group1);
@@ -103,7 +103,7 @@ await Source
103103
// some concurrent publish
104104
await Source
105105
.From(Enumerable.Range(101, 100))
106-
.Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition1, Message = new Message<Null, string> { Value = elem.ToString() } })
106+
.Select(elem => new ProducerRecord<Null, string>(topicPartition1, elem.ToString()))
107107
.RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);
108108

109109
probe2.Request(100);
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Collections.Immutable;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Akka.Streams.Dsl;
6+
using Akka.Streams.Implementation.Fusing;
7+
using Akka.Streams.Kafka.Dsl;
8+
using Akka.Streams.Kafka.Extensions;
9+
using Akka.Streams.Kafka.Helpers;
10+
using Akka.Streams.Kafka.Messages;
11+
using Akka.Streams.Kafka.Settings;
12+
using Confluent.Kafka;
13+
using FluentAssertions;
14+
using Xunit;
15+
using Xunit.Abstractions;
16+
17+
namespace Akka.Streams.Kafka.Tests.Integration
18+
{
19+
public class FlowWithContextIntegrationTests : KafkaIntegrationTests
20+
{
21+
public FlowWithContextIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
22+
: base(nameof(FlowWithContextIntegrationTests), output, fixture)
23+
{
24+
}
25+
26+
[Fact]
27+
public async Task ProducerFlowWithContext_should_work_with_source_with_context()
28+
{
29+
bool Duplicate(string value) => value == "1";
30+
bool Ignore(string value) => value == "2";
31+
32+
var consumerSettings = CreateConsumerSettings<string, string>(CreateGroup(1));
33+
var topic1 = CreateTopic(1);
34+
var topic2 = CreateTopic(2);
35+
var topic3 = CreateTopic(3);
36+
var topic4 = CreateTopic(4);
37+
var producerSettings = BuildProducerSettings<string, string>();
38+
var committerSettings = CommitterSettings;
39+
var totalMessages = 10;
40+
var totalConsumed = 0;
41+
42+
await ProduceStrings(topic1, Enumerable.Range(1, totalMessages), producerSettings);
43+
44+
var control = KafkaConsumer.SourceWithOffsetContext(consumerSettings, Subscriptions.Topics(topic1))
45+
.Select(record =>
46+
{
47+
IEnvelope<string, string, NotUsed> output;
48+
if (Duplicate(record.Value))
49+
{
50+
output = ProducerMessage.Multi(new[]
51+
{
52+
new ProducerRecord<string, string>(topic2, record.Key, record.Value),
53+
new ProducerRecord<string, string>(topic3, record.Key, record.Value)
54+
}.ToImmutableSet());
55+
}
56+
else if (Ignore(record.Value))
57+
{
58+
output = ProducerMessage.PassThrough<string, string>();
59+
}
60+
else
61+
{
62+
output = ProducerMessage.Single(new ProducerRecord<string, string>(topic4, record.Key, record.Value));
63+
}
64+
65+
Log.Debug($"Giving message of type {output.GetType().Name}");
66+
return output;
67+
})
68+
.Via(KafkaProducer.FlowWithContext<string, string, ICommittableOffset>(producerSettings))
69+
.AsSource()
70+
.Log("Produced messages", r => $"Committing {r.Item2.Offset.Topic}:{r.Item2.Offset.Partition}[{r.Item2.Offset.Offset}]")
71+
.ToMaterialized(Committer.SinkWithOffsetContext<IResults<string, string, ICommittableOffset>>(committerSettings), Keep.Both)
72+
.MapMaterializedValue(tuple => DrainingControl<NotUsed>.Create(tuple.Item1, tuple.Item2.ContinueWith(t => NotUsed.Instance)))
73+
.Run(Materializer);
74+
75+
var (control2, result) = KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics(topic2, topic3, topic4))
76+
.Scan(0, (c, _) => c + 1)
77+
.Select(consumed =>
78+
{
79+
totalConsumed = consumed;
80+
return consumed;
81+
})
82+
.ToMaterialized(Sink.Last<int>(), Keep.Both)
83+
.Run(Materializer);
84+
85+
AwaitCondition(() => totalConsumed == totalMessages, TimeSpan.FromSeconds(30));
86+
87+
AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), control.DrainAndShutdown());
88+
AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), control2.Shutdown());
89+
AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), result).Should().Be(totalConsumed);
90+
}
91+
}
92+
}

src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public async Task PlainSink_should_publish_100_elements_to_Kafka_producer()
4242
await Source
4343
.From(Enumerable.Range(1, 100))
4444
.Select(c => c.ToString())
45-
.Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition1, Message = new Message<Null, string> { Value = elem } })
45+
.Select(elem => new ProducerRecord<Null, string>(topicPartition1, elem.ToString()))
4646
.RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);
4747

4848
var dateTimeStart = DateTime.UtcNow;
@@ -80,9 +80,10 @@ public async Task PlainSink_should_fail_stage_if_broker_unavailable()
8080
var probe = Source
8181
.From(Enumerable.Range(1, 100))
8282
.Select(c => c.ToString())
83-
.Select(elem => new MessageAndMeta<Null, string> { Topic = topic1, Message = new Message<Null, string> { Value = elem } })
84-
.Via(KafkaProducer.PlainFlow(config))
85-
.RunWith(this.SinkProbe<DeliveryReport<Null, string>>(), Materializer);
83+
.Select(elem => new ProducerRecord<Null, string>(topic1, elem.ToString()))
84+
.Select(record => new Message<Null, string, NotUsed>(record, NotUsed.Instance) as IEnvelope<Null, string, NotUsed>)
85+
.Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(config))
86+
.RunWith(this.SinkProbe<IResults<Null, string, NotUsed>>(), Materializer);
8687

8788
probe.ExpectSubscription();
8889
probe.OnError(new KafkaException(ErrorCode.Local_Transport));

src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,27 @@ public KafkaIntegrationTests(string actorSystemName, ITestOutputHelper output, K
3838

3939
protected string CreateTopic(int number) => $"topic-{number}-{Uuid}";
4040
protected string CreateGroup(int number) => $"group-{number}-{Uuid}";
41+
42+
protected ProducerSettings<Null, string> ProducerSettings => BuildProducerSettings<Null, string>();
4143

42-
protected ProducerSettings<Null, string> ProducerSettings
44+
protected ProducerSettings<TKey, TValue> BuildProducerSettings<TKey, TValue>()
4345
{
44-
get => ProducerSettings<Null, string>.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer);
46+
return ProducerSettings<TKey, TValue>.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer);
4547
}
4648

4749
protected CommitterSettings CommitterSettings
4850
{
4951
get => CommitterSettings.Create(Sys);
5052
}
53+
54+
protected ConsumerSettings<TKey, TValue> CreateConsumerSettings<TKey, TValue>(string group)
55+
{
56+
return ConsumerSettings<TKey, TValue>.Create(Sys, null, null)
57+
.WithBootstrapServers(_fixture.KafkaServer)
58+
.WithStopTimeout(TimeSpan.FromSeconds(1))
59+
.WithProperty("auto.offset.reset", "earliest")
60+
.WithGroupId(group);
61+
}
5162

5263
protected ConsumerSettings<Null, TValue> CreateConsumerSettings<TValue>(string group)
5364
{
@@ -58,41 +69,54 @@ protected ConsumerSettings<Null, TValue> CreateConsumerSettings<TValue>(string g
5869
.WithGroupId(group);
5970
}
6071

61-
protected async Task ProduceStrings(string topic, IEnumerable<int> range, ProducerSettings<Null, string> producerSettings)
72+
protected async Task ProduceStrings<TKey>(string topic, IEnumerable<int> range, ProducerSettings<TKey, string> producerSettings)
6273
{
6374
await Source
6475
.From(range)
65-
.Select(elem => new MessageAndMeta<Null, string> { Topic = topic, Message = new Message<Null, string> { Value = elem.ToString() } })
76+
.Select(elem => new ProducerRecord<TKey, string>(topic, elem.ToString()))
6677
.RunWith(KafkaProducer.PlainSink(producerSettings), Materializer);
6778
}
6879

69-
protected async Task ProduceStrings(Func<int, TopicPartition> partitionSelector, IEnumerable<int> range, ProducerSettings<Null, string> producerSettings)
80+
protected async Task ProduceStrings<TKey>(Func<int, TopicPartition> partitionSelector, IEnumerable<int> range, ProducerSettings<TKey, string> producerSettings)
7081
{
7182
await Source
7283
.From(range)
73-
.Select(elem => new MessageAndMeta<Null, string> { TopicPartition = partitionSelector(elem), Message = new Message<Null, string> { Value = elem.ToString() } })
84+
.Select(elem => new ProducerRecord<TKey, string>(partitionSelector(elem), elem.ToString()))
7485
.RunWith(KafkaProducer.PlainSink(producerSettings), Materializer);
7586
}
7687

77-
protected async Task ProduceStrings(TopicPartition topicPartition, IEnumerable<int> range, ProducerSettings<Null, string> producerSettings)
88+
protected async Task ProduceStrings<TKey>(TopicPartition topicPartition, IEnumerable<int> range, ProducerSettings<TKey, string> producerSettings)
7889
{
7990
await Source
8091
.From(range)
81-
.Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition, Message = new Message<Null, string> { Value = elem.ToString() } })
92+
.Select(elem => new ProducerRecord<TKey, string>(topicPartition, elem.ToString()))
8293
.RunWith(KafkaProducer.PlainSink(producerSettings), Materializer);
8394
}
8495

8596
/// <summary>
8697
/// Asserts that task will finish successfully until specified timeout.
8798
/// Throws task exception if task failes
8899
/// </summary>
89-
protected async Task AssertCompletesSuccessfullyWithin(TimeSpan timeout, Task task)
100+
protected void AssertTaskCompletesWithin(TimeSpan timeout, Task task, bool assertIsSuccessful = true)
90101
{
91-
var timeoutTask = Task.Delay(timeout);
92-
93-
await Task.WhenAny(timeoutTask, task);
102+
AwaitCondition(() => task.IsCompleted, timeout, $"task should complete within {timeout} timeout");
103+
104+
if (assertIsSuccessful)
105+
task.IsCompletedSuccessfully.Should().Be(true, "task should compete successfully");
106+
}
107+
108+
/// <summary>
109+
/// Asserts that task will finish successfully until specified timeout.
110+
/// Throws task exception if task failes
111+
/// </summary>
112+
protected TResult AssertTaskCompletesWithin<TResult>(TimeSpan timeout, Task<TResult> task, bool assertIsSuccessful = true)
113+
{
114+
AwaitCondition(() => task.IsCompleted, timeout, $"task should complete within {timeout} timeout");
115+
116+
if (assertIsSuccessful)
117+
task.IsCompletedSuccessfully.Should().Be(true, "task should compete successfully");
94118

95-
task.IsCompletedSuccessfully.Should().Be(true, $"Timeout {timeout} while waitilng task finish successfully");
119+
return task.Result;
96120
}
97121

98122
protected async Task GivenInitializedTopic(string topic)

0 commit comments

Comments
 (0)