-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathKafkaCollection.cs
99 lines (86 loc) · 3.7 KB
/
KafkaCollection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;
using static IntegrationTests.Helpers.DockerFileHelper;
namespace IntegrationTests;
[CollectionDefinition(Name)]
public class KafkaCollection : ICollectionFixture<KafkaFixture>
{
public const string Name = nameof(KafkaCollection);
}
/// <summary>
/// Container setup based on https://github.com/confluentinc/kafka-images/blob/83f57e511aead515822334ef28da6872d127c6a2/examples/kafka-single-node/docker-compose.yml
/// </summary>
public class KafkaFixture : IAsyncLifetime
{
private const int KafkaPort = 9092;
private const int ZookeeperClientPort = 2181;
private const string KafkaContainerName = "integration-test-kafka";
private const string TestNetworkName = $"{KafkaContainerName}-network";
private const string ZookeeperContainerName = $"{KafkaContainerName}-zookeeper";
private static readonly string ZooKeeperImage = ReadImageFrom("zookeeper.Dockerfile");
private static readonly string KafkaImage = ReadImageFrom("kafka.Dockerfile");
private IContainer? _kafkaContainer;
private IContainer? _zooKeeperContainer;
private INetwork? _containerNetwork;
public async Task InitializeAsync()
{
_containerNetwork = new NetworkBuilder()
.WithName(TestNetworkName)
.Build();
await _containerNetwork.CreateAsync();
_zooKeeperContainer = await LaunchZookeeper(_containerNetwork);
_kafkaContainer = await LaunchKafkaContainer(_containerNetwork, _zooKeeperContainer);
}
public async Task DisposeAsync()
{
if (_kafkaContainer != null)
{
await _kafkaContainer.DisposeAsync();
}
if (_zooKeeperContainer != null)
{
await _zooKeeperContainer.DisposeAsync();
}
if (_containerNetwork != null)
{
await _containerNetwork.DisposeAsync();
}
}
private static async Task<IContainer?> LaunchZookeeper(INetwork? containerNetwork)
{
var container = new ContainerBuilder()
.WithImage(ZooKeeperImage)
.WithName(ZookeeperContainerName)
.WithEnvironment("ZOOKEEPER_CLIENT_PORT", ZookeeperClientPort.ToString())
.WithEnvironment("ZOOKEEPER_TICK_TIME", "2000")
.WithNetwork(containerNetwork)
.Build();
await container.StartAsync();
return container;
}
private static async Task<IContainer?> LaunchKafkaContainer(
INetwork? containerNetwork,
IContainer? zooKeeperContainer)
{
// returned container name starts with '/'
var zookeeperContainerName = zooKeeperContainer?.Name.Substring(1);
var container = new ContainerBuilder()
.WithImage(KafkaImage)
.WithName(KafkaContainerName)
.WithPortBinding(KafkaPort)
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", $"{zookeeperContainerName}:{ZookeeperClientPort}")
.WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://{KafkaContainerName}:29092,PLAINTEXT_HOST://localhost:{KafkaPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithNetwork(containerNetwork)
.Build();
await container.StartAsync();
return container;
}
}