Skip to content

Commit 66faaac

Browse files
authored
feat: add multi-file support and cce user accounts (#6)
1 parent 9d9a7fb commit 66faaac

File tree

11 files changed

+209
-8
lines changed

11 files changed

+209
-8
lines changed

docs/_coverpage.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@
66
- Automated Topic & ACL Management
77
- Manage topics, services, and ACLs with desired state files
88

9-
[Documentation](/documentation.md)
9+
[Documentation](/documentation.md)
10+
[GitHub](https://github.com/devshawn/kafka-gitops)

src/main/java/com/devshawn/kafka/gitops/StateManager.java

+34-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.devshawn.kafka.gitops.service.ConfluentCloudService;
2020
import com.devshawn.kafka.gitops.service.KafkaService;
2121
import com.devshawn.kafka.gitops.service.ParserService;
22+
import com.devshawn.kafka.gitops.service.RoleService;
2223
import com.devshawn.kafka.gitops.util.LogUtil;
2324
import com.fasterxml.jackson.core.JsonParser;
2425
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -38,6 +39,7 @@ public class StateManager {
3839
private final ObjectMapper objectMapper;
3940
private final ParserService parserService;
4041
private final KafkaService kafkaService;
42+
private final RoleService roleService;
4143
private final ConfluentCloudService confluentCloudService;
4244

4345
private PlanManager planManager;
@@ -49,6 +51,7 @@ public StateManager(ManagerConfig managerConfig, ParserService parserService) {
4951
this.objectMapper = initializeObjectMapper();
5052
this.kafkaService = new KafkaService(KafkaGitopsConfigLoader.load());
5153
this.parserService = parserService;
54+
this.roleService = new RoleService();
5255
this.confluentCloudService = new ConfluentCloudService(objectMapper);
5356
this.planManager = new PlanManager(managerConfig, kafkaService, objectMapper);
5457
this.applyManager = new ApplyManager(managerConfig, kafkaService);
@@ -93,11 +96,12 @@ public void createServiceAccounts() {
9396
AtomicInteger count = new AtomicInteger();
9497
if (isConfluentCloudEnabled(desiredStateFile)) {
9598
desiredStateFile.getServices().forEach((name, service) -> {
96-
if (serviceAccounts.stream().noneMatch(it -> it.getName().equals(name))) {
97-
confluentCloudService.createServiceAccount(name);
98-
LogUtil.printSimpleSuccess(String.format("Successfully created service account: %s", name));
99-
count.getAndIncrement();
100-
}
99+
createServiceAccount(name, serviceAccounts, count);
100+
});
101+
102+
desiredStateFile.getUsers().forEach((name, user) -> {
103+
String serviceAccountName = String.format("user-%s", name);
104+
createServiceAccount(serviceAccountName, serviceAccounts, count);
101105
});
102106
} else {
103107
throw new ConfluentCloudException("Confluent Cloud must be enabled in the state file to use this command.");
@@ -108,6 +112,14 @@ public void createServiceAccounts() {
108112
}
109113
}
110114

115+
private void createServiceAccount(String name, List<ServiceAccount> serviceAccounts, AtomicInteger count) {
116+
if (serviceAccounts.stream().noneMatch(it -> it.getName().equals(name))) {
117+
confluentCloudService.createServiceAccount(name);
118+
LogUtil.printSimpleSuccess(String.format("Successfully created service account: %s", name));
119+
count.getAndIncrement();
120+
}
121+
}
122+
111123
private DesiredState getDesiredState() {
112124
DesiredStateFile desiredStateFile = parserService.parseStateFile();
113125
DesiredState.Builder desiredState = new DesiredState.Builder()
@@ -116,6 +128,7 @@ private DesiredState getDesiredState() {
116128

117129
if (isConfluentCloudEnabled(desiredStateFile)) {
118130
generateConfluentCloudServiceAcls(desiredState, desiredStateFile);
131+
generateConfluentCloudUserAcls(desiredState, desiredStateFile);
119132
} else {
120133
generateServiceAcls(desiredState, desiredStateFile);
121134
}
@@ -147,6 +160,22 @@ private void generateConfluentCloudServiceAcls(DesiredState.Builder desiredState
147160
});
148161
}
149162

163+
private void generateConfluentCloudUserAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
164+
List<ServiceAccount> serviceAccounts = confluentCloudService.getServiceAccounts();
165+
desiredStateFile.getUsers().forEach((name, user) -> {
166+
AtomicReference<Integer> index = new AtomicReference<>(0);
167+
String serviceAccountName = String.format("user-%s", name);
168+
169+
Optional<ServiceAccount> serviceAccount = serviceAccounts.stream().filter(it -> it.getName().equals(serviceAccountName)).findFirst();
170+
String serviceAccountId = serviceAccount.orElseThrow(() -> new ServiceAccountNotFoundException(serviceAccountName)).getId();
171+
172+
user.getRoles().forEach(role -> {
173+
List<AclDetails.Builder> acls = roleService.getAcls(role, String.format("User:%s", serviceAccountId));
174+
acls.forEach(acl -> desiredState.putAcls(String.format("%s-%s", name, index.getAndSet(index.get() + 1)), acl.build()));
175+
});
176+
});
177+
}
178+
150179
private void generateServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
151180
desiredStateFile.getServices().forEach((name, service) -> {
152181
AtomicReference<Integer> index = new AtomicReference<>(0);

src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredStateFile.java

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ public interface DesiredStateFile {
1818

1919
Map<String, TopicDetails> getTopics();
2020

21+
Map<String, UserDetails> getUsers();
22+
2123
Map<String, Map<String, CustomAclDetails>> getCustomServiceAcls();
2224

2325
class Builder extends DesiredStateFile_Builder {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.gitops.domain.state;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.List;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = UserDetails.Builder.class)
10+
public interface UserDetails {
11+
12+
List<String> getRoles();
13+
14+
class Builder extends UserDetails_Builder {
15+
}
16+
}

src/main/java/com/devshawn/kafka/gitops/domain/state/settings/Settings.java

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public interface Settings {
1313

1414
Optional<SettingsTopics> getTopics();
1515

16+
Optional<SettingsFiles> getFiles();
17+
1618
class Builder extends Settings_Builder {
1719
}
1820
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.devshawn.kafka.gitops.domain.state.settings;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.Optional;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = SettingsFiles.Builder.class)
10+
public interface SettingsFiles {
11+
12+
Optional<String> getServices();
13+
14+
Optional<String> getTopics();
15+
16+
Optional<String> getUsers();
17+
18+
class Builder extends SettingsFiles_Builder {
19+
}
20+
}

src/main/java/com/devshawn/kafka/gitops/service/ParserService.java

+53-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.devshawn.kafka.gitops.service;
22

33
import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;
4+
import com.devshawn.kafka.gitops.domain.state.settings.SettingsFiles;
45
import com.devshawn.kafka.gitops.exception.ValidationException;
56
import com.fasterxml.jackson.core.JsonParser;
67
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -16,6 +17,7 @@
1617

1718
import java.io.File;
1819
import java.io.IOException;
20+
import java.nio.file.Paths;
1921
import java.util.List;
2022
import java.util.stream.Collectors;
2123

@@ -36,10 +38,32 @@ public ParserService(File file) {
3638
}
3739

3840
public DesiredStateFile parseStateFile() {
41+
DesiredStateFile desiredStateFile = parseStateFile(file);
42+
if (desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getFiles().isPresent()) {
43+
DesiredStateFile.Builder builder = new DesiredStateFile.Builder().mergeFrom(desiredStateFile);
44+
SettingsFiles settingsFiles = desiredStateFile.getSettings().get().getFiles().get();
45+
if (settingsFiles.getServices().isPresent()) {
46+
DesiredStateFile servicesFile = loadServiceFile(settingsFiles.getServices().get());
47+
builder.putAllServices(servicesFile.getServices());
48+
}
49+
if (settingsFiles.getTopics().isPresent()) {
50+
DesiredStateFile topicsFile = loadTopicsFile(settingsFiles.getTopics().get());
51+
builder.putAllTopics(topicsFile.getTopics());
52+
}
53+
if (settingsFiles.getUsers().isPresent()) {
54+
DesiredStateFile usersFile = loadUsersFile(settingsFiles.getUsers().get());
55+
builder.putAllUsers(usersFile.getUsers());
56+
}
57+
return builder.build();
58+
}
59+
return desiredStateFile;
60+
}
61+
62+
public DesiredStateFile parseStateFile(File stateFile) {
3963
log.info("Parsing desired state file...");
4064

4165
try {
42-
return objectMapper.readValue(file, DesiredStateFile.class);
66+
return objectMapper.readValue(stateFile, DesiredStateFile.class);
4367
} catch (ValueInstantiationException ex) {
4468
List<String> fields = getYamlFields(ex);
4569
String joinedFields = String.join(" -> ", fields);
@@ -64,6 +88,34 @@ public DesiredStateFile parseStateFile() {
6488
}
6589
}
6690

91+
private DesiredStateFile loadServiceFile(String servicesFileName) {
92+
File servicesFile = getAdditionalFile(servicesFileName);
93+
if (!servicesFile.exists()) {
94+
throw new ValidationException(String.format("Services file '%s' could not be found.", servicesFileName));
95+
}
96+
return parseStateFile(servicesFile);
97+
}
98+
99+
private DesiredStateFile loadTopicsFile(String topicsFileName) {
100+
File topicsFile = getAdditionalFile(topicsFileName);
101+
if (!topicsFile.exists()) {
102+
throw new ValidationException(String.format("Topics file '%s' could not be found.", topicsFileName));
103+
}
104+
return parseStateFile(topicsFile);
105+
}
106+
107+
private DesiredStateFile loadUsersFile(String usersFileName) {
108+
File usersFile = getAdditionalFile(usersFileName);
109+
if (!usersFile.exists()) {
110+
throw new ValidationException(String.format("Users file '%s' could not be found.", usersFileName));
111+
}
112+
return parseStateFile(usersFile);
113+
}
114+
115+
private File getAdditionalFile(String fileName) {
116+
return new File(Paths.get(file.getAbsoluteFile().getParent(), fileName).toString());
117+
}
118+
67119
private List<String> getYamlFields(JsonMappingException ex) {
68120
return ex.getPath().stream()
69121
.map(JsonMappingException.Reference::getFieldName)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.devshawn.kafka.gitops.service;
2+
3+
import com.devshawn.kafka.gitops.domain.state.AclDetails;
4+
import com.devshawn.kafka.gitops.domain.state.ServiceDetails;
5+
import com.devshawn.kafka.gitops.exception.ValidationException;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.Optional;
10+
11+
public class RoleService extends ServiceDetails {
12+
13+
public List<AclDetails.Builder> getAcls(String role, String principal) {
14+
switch (role.toLowerCase()) {
15+
case "reader":
16+
return getReaderAcls(principal);
17+
case "writer":
18+
return getWriterAcls(principal);
19+
case "operator":
20+
return getOperatorAcls(principal);
21+
default:
22+
throw new ValidationException(String.format("Role '%s' does not exist. Supported roles: 'reader', 'writer', 'operator'.", role));
23+
}
24+
}
25+
26+
private List<AclDetails.Builder> getReaderAcls(String principal) {
27+
List<AclDetails.Builder> acls = new ArrayList<>();
28+
acls.add(generateReadAcl("*", Optional.of(principal)));
29+
acls.add(generateConsumerGroupAcl("*", Optional.of(principal), "READ"));
30+
return acls;
31+
}
32+
33+
private List<AclDetails.Builder> getWriterAcls(String principal) {
34+
List<AclDetails.Builder> acls = new ArrayList<>();
35+
acls.add(generateWriteACL("*", Optional.of(principal)));
36+
return acls;
37+
}
38+
39+
private List<AclDetails.Builder> getOperatorAcls(String principal) {
40+
List<AclDetails.Builder> acls = new ArrayList<>();
41+
acls.add(getClusterDescribeAcl(principal));
42+
acls.add(getWildcardTopicAcl(principal, "DESCRIBE"));
43+
acls.add(getWildcardTopicAcl(principal, "DESCRIBE_CONFIGS"));
44+
acls.add(generateConsumerGroupAcl("*", Optional.of(principal), "READ"));
45+
acls.add(generateConsumerGroupAcl("*", Optional.of(principal), "DESCRIBE"));
46+
return acls;
47+
}
48+
49+
private AclDetails.Builder getWildcardTopicAcl(String principal, String operation) {
50+
return new AclDetails.Builder()
51+
.setHost("*")
52+
.setType("TOPIC")
53+
.setPermission("ALLOW")
54+
.setPrincipal(principal)
55+
.setOperation(operation)
56+
.setPattern("LITERAL")
57+
.setName("*");
58+
}
59+
60+
private AclDetails.Builder getClusterDescribeAcl(String principal) {
61+
return new AclDetails.Builder()
62+
.setHost("*")
63+
.setType("CLUSTER")
64+
.setPermission("ALLOW")
65+
.setPrincipal(principal)
66+
.setOperation("DESCRIBE")
67+
.setPattern("LITERAL")
68+
.setName("kafka-cluster");
69+
}
70+
}

src/test/groovy/com/devshawn/kafka/gitops/PlanCommandIntegrationSpec.groovy

+2-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ class PlanCommandIntegrationSpec extends Specification {
117117
where:
118118
planName << [
119119
"invalid-missing-principal",
120-
"invalid-topic"
120+
"invalid-topic",
121+
"unrecognized-property"
121122
]
122123
}
123124
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Generating execution plan...
2+
3+
[INVALID] Unrecognized field: [test] in state file definition: topics -> test-topic
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
topics:
2+
test-topic:
3+
test: invalid
4+
partitions: 1
5+
replication: 1

0 commit comments

Comments
 (0)