forked from akkadotnet/Akka.Streams.Kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCommittableSourceIntegrationTests.cs
128 lines (103 loc) · 4.98 KB
/
CommittableSourceIntegrationTests.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.TestKit;
using Confluent.Kafka;
using Xunit;
using Xunit.Abstractions;
using Config = Akka.Configuration.Config;
namespace Akka.Streams.Kafka.Tests.Integration
{
public class CommittableSourceIntegrationTests : KafkaIntegrationTests
{
public CommittableSourceIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
: base(nameof(CommittableSourceIntegrationTests), output, fixture)
{
}
[Fact]
public async Task CommitableSource_consumes_messages_from_Producer_without_commits()
{
int elementsCount = 100;
var topic1 = CreateTopic(1);
var group1 = CreateGroup(1);
var topicPartition1 = new TopicPartition(topic1, 0);
await GivenInitializedTopic(topicPartition1);
await Source
.From(Enumerable.Range(1, elementsCount))
.Select(elem => new ProducerRecord<Null, string>(topicPartition1, elem.ToString()))
.RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);
var consumerSettings = CreateConsumerSettings<string>(group1);
var probe = KafkaConsumer
.CommittableSource(consumerSettings, Subscriptions.Assignment(topicPartition1))
.Where(c => !c.Record.Value.Equals(InitialMsg))
.Select(c => c.Record.Value)
.RunWith(this.SinkProbe<string>(), Materializer);
probe.Request(elementsCount);
foreach (var i in Enumerable.Range(1, elementsCount).Select(c => c.ToString()))
probe.ExpectNext(i, TimeSpan.FromSeconds(10));
probe.Cancel();
}
[Fact]
public async Task CommitableSource_resume_from_commited_offset()
{
var topic1 = CreateTopic(1);
var topicPartition1 = new TopicPartition(topic1, 0);
var group1 = CreateGroup(1);
var group2 = CreateGroup(2);
await GivenInitializedTopic(topicPartition1);
await Source
.From(Enumerable.Range(1, 100))
.Select(elem => new ProducerRecord<Null, string>(topicPartition1, elem.ToString()))
.RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);
var consumerSettings = CreateConsumerSettings<string>(group1);
var committedElements = new ConcurrentQueue<string>();
var (task, probe1) = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Assignment(topicPartition1))
.WhereNot(c => c.Record.Value == InitialMsg)
.SelectAsync(10, async elem =>
{
await elem.CommitableOffset.Commit();
committedElements.Enqueue(elem.Record.Value);
return Done.Instance;
})
.ToMaterialized(this.SinkProbe<Done>(), Keep.Both)
.Run(Materializer);
probe1.Request(25);
foreach (var _ in Enumerable.Range(1, 25))
{
probe1.ExpectNext(Done.Instance, TimeSpan.FromSeconds(10));
}
probe1.Cancel();
AwaitCondition(() => task.IsShutdown.IsCompletedSuccessfully);
var probe2 = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Assignment(new TopicPartition(topic1, 0)))
.Select(_ => _.Record.Value)
.RunWith(this.SinkProbe<string>(), Materializer);
// Note that due to buffers and SelectAsync(10) the committed offset is more
// than 26, and that is not wrong
// some concurrent publish
await Source
.From(Enumerable.Range(101, 100))
.Select(elem => new ProducerRecord<Null, string>(topicPartition1, elem.ToString()))
.RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);
probe2.Request(100);
foreach (var i in Enumerable.Range(committedElements.Count + 1, 100).Select(c => c.ToString()))
probe2.ExpectNext(i, TimeSpan.FromSeconds(10));
probe2.Cancel();
// another consumer should see all
var probe3 = KafkaConsumer.CommittableSource(consumerSettings.WithGroupId(group2), Subscriptions.Assignment(new TopicPartition(topic1, 0)))
.WhereNot(c => c.Record.Value == InitialMsg)
.Select(_ => _.Record.Value)
.RunWith(this.SinkProbe<string>(), Materializer);
probe3.Request(100);
foreach (var i in Enumerable.Range(1, 100).Select(c => c.ToString()))
probe3.ExpectNext(i, TimeSpan.FromSeconds(10));
probe3.Cancel();
}
}
}