Skip to content

Commit c743067

Browse files
authored
Audit backend (provectus#3831)
AuditService added to log all API operations
1 parent 7f7242e commit c743067

33 files changed

+1201
-297
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
rules:
2-
- pattern: ".*"
2+
- pattern: ".*"

documentation/compose/kafka-ui-arm64.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ services:
2020
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
2121
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
2222
DYNAMIC_CONFIG_ENABLED: 'true' # not necessary, added for tests
23+
KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
24+
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
2325

2426
kafka0:
2527
image: confluentinc/cp-kafka:7.2.1.arm64

kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

+12
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public static class Cluster {
5151
List<Masking> masking;
5252
Long pollingThrottleRate;
5353
TruststoreConfig ssl;
54+
AuditProperties audit;
5455
}
5556

5657
@Data
@@ -143,6 +144,17 @@ public enum Type {
143144
}
144145
}
145146

147+
@Data
148+
@NoArgsConstructor
149+
@AllArgsConstructor
150+
public static class AuditProperties {
151+
String topic;
152+
Integer auditTopicsPartitions;
153+
Boolean topicAuditEnabled;
154+
Boolean consoleAuditEnabled;
155+
Map<String, String> auditTopicProperties;
156+
}
157+
146158
@PostConstruct
147159
public void validateAndSetDefaults() {
148160
if (clusters != null) {

kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.provectus.kafka.ui.model.rbac.AccessContext;
99
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
1010
import com.provectus.kafka.ui.service.acl.AclsService;
11+
import com.provectus.kafka.ui.service.audit.AuditService;
1112
import com.provectus.kafka.ui.service.rbac.AccessControlService;
1213
import java.util.Optional;
1314
import lombok.RequiredArgsConstructor;
@@ -26,19 +27,22 @@ public class AclsController extends AbstractController implements AclsApi {
2627

2728
private final AclsService aclsService;
2829
private final AccessControlService accessControlService;
30+
private final AuditService auditService;
2931

3032
@Override
3133
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
3234
ServerWebExchange exchange) {
3335
AccessContext context = AccessContext.builder()
3436
.cluster(clusterName)
3537
.aclActions(AclAction.EDIT)
38+
.operationName("createAcl")
3639
.build();
3740

3841
return accessControlService.validateAccess(context)
3942
.then(kafkaAclDto)
4043
.map(ClusterMapper::toAclBinding)
4144
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
45+
.doOnEach(sig -> auditService.audit(context, sig))
4246
.thenReturn(ResponseEntity.ok().build());
4347
}
4448

@@ -48,12 +52,14 @@ public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO
4852
AccessContext context = AccessContext.builder()
4953
.cluster(clusterName)
5054
.aclActions(AclAction.EDIT)
55+
.operationName("deleteAcl")
5156
.build();
5257

5358
return accessControlService.validateAccess(context)
5459
.then(kafkaAclDto)
5560
.map(ClusterMapper::toAclBinding)
5661
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
62+
.doOnEach(sig -> auditService.audit(context, sig))
5763
.thenReturn(ResponseEntity.ok().build());
5864
}
5965

@@ -66,6 +72,7 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
6672
AccessContext context = AccessContext.builder()
6773
.cluster(clusterName)
6874
.aclActions(AclAction.VIEW)
75+
.operationName("listAcls")
6976
.build();
7077

7178
var resourceType = Optional.ofNullable(resourceTypeDto)
@@ -83,20 +90,22 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
8390
ResponseEntity.ok(
8491
aclsService.listAcls(getCluster(clusterName), filter)
8592
.map(ClusterMapper::toKafkaAclDto)))
86-
);
93+
).doOnEach(sig -> auditService.audit(context, sig));
8794
}
8895

8996
@Override
9097
public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
9198
AccessContext context = AccessContext.builder()
9299
.cluster(clusterName)
93100
.aclActions(AclAction.VIEW)
101+
.operationName("getAclAsCsv")
94102
.build();
95103

96104
return accessControlService.validateAccess(context).then(
97105
aclsService.getAclAsCsvString(getCluster(clusterName))
98106
.map(ResponseEntity::ok)
99107
.flatMap(Mono::just)
108+
.doOnEach(sig -> auditService.audit(context, sig))
100109
);
101110
}
102111

@@ -105,11 +114,13 @@ public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> c
105114
AccessContext context = AccessContext.builder()
106115
.cluster(clusterName)
107116
.aclActions(AclAction.EDIT)
117+
.operationName("syncAclsCsv")
108118
.build();
109119

110120
return accessControlService.validateAccess(context)
111121
.then(csvMono)
112122
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
123+
.doOnEach(sig -> auditService.audit(context, sig))
113124
.thenReturn(ResponseEntity.ok().build());
114125
}
115126
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java

+32-24
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.provectus.kafka.ui.model.rbac.AccessContext;
1616
import com.provectus.kafka.ui.service.ApplicationInfoService;
1717
import com.provectus.kafka.ui.service.KafkaClusterFactory;
18+
import com.provectus.kafka.ui.service.audit.AuditService;
1819
import com.provectus.kafka.ui.service.rbac.AccessControlService;
1920
import com.provectus.kafka.ui.util.ApplicationRestarter;
2021
import com.provectus.kafka.ui.util.DynamicConfigOperations;
@@ -55,6 +56,7 @@ interface PropertiesMapper {
5556
private final ApplicationRestarter restarter;
5657
private final KafkaClusterFactory kafkaClusterFactory;
5758
private final ApplicationInfoService applicationInfoService;
59+
private final AuditService auditService;
5860

5961
@Override
6062
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
@@ -63,62 +65,68 @@ public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExch
6365

6466
@Override
6567
public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
66-
return accessControlService
67-
.validateAccess(
68-
AccessContext.builder()
69-
.applicationConfigActions(VIEW)
70-
.build()
71-
)
68+
var context = AccessContext.builder()
69+
.applicationConfigActions(VIEW)
70+
.operationName("getCurrentConfig")
71+
.build();
72+
return accessControlService.validateAccess(context)
7273
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
7374
new ApplicationConfigDTO()
7475
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
75-
)));
76+
)))
77+
.doOnEach(sig -> auditService.audit(context, sig));
7678
}
7779

7880
@Override
7981
public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
8082
ServerWebExchange exchange) {
81-
return accessControlService
82-
.validateAccess(
83-
AccessContext.builder()
84-
.applicationConfigActions(EDIT)
85-
.build()
86-
)
83+
var context = AccessContext.builder()
84+
.applicationConfigActions(EDIT)
85+
.operationName("restartWithConfig")
86+
.build();
87+
return accessControlService.validateAccess(context)
8788
.then(restartRequestDto)
88-
.map(dto -> {
89+
.<ResponseEntity<Void>>map(dto -> {
8990
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
9091
restarter.requestRestart();
9192
return ResponseEntity.ok().build();
92-
});
93+
})
94+
.doOnEach(sig -> auditService.audit(context, sig));
9395
}
9496

9597
@Override
9698
public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
9799
ServerWebExchange exchange) {
98-
return accessControlService
99-
.validateAccess(
100-
AccessContext.builder()
101-
.applicationConfigActions(EDIT)
102-
.build()
103-
)
100+
var context = AccessContext.builder()
101+
.applicationConfigActions(EDIT)
102+
.operationName("uploadConfigRelatedFile")
103+
.build();
104+
return accessControlService.validateAccess(context)
104105
.then(fileFlux.single())
105106
.flatMap(file ->
106107
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
107108
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
108-
.map(ResponseEntity::ok));
109+
.map(ResponseEntity::ok))
110+
.doOnEach(sig -> auditService.audit(context, sig));
109111
}
110112

111113
@Override
112114
public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
113115
ServerWebExchange exchange) {
114-
return configDto
116+
var context = AccessContext.builder()
117+
.applicationConfigActions(EDIT)
118+
.operationName("validateConfig")
119+
.build();
120+
return accessControlService.validateAccess(context)
121+
.then(configDto)
115122
.flatMap(config -> {
116123
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
117124
ClustersProperties clustersProperties = propertiesStructure.getKafka();
118125
return validateClustersConfig(clustersProperties)
119126
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
120127
})
121-
.map(ResponseEntity::ok);
128+
.map(ResponseEntity::ok)
129+
.doOnEach(sig -> auditService.audit(context, sig));
122130
}
123131

124132
private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(

kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

+53-29
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111
import com.provectus.kafka.ui.model.rbac.AccessContext;
1212
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
1313
import com.provectus.kafka.ui.service.BrokerService;
14+
import com.provectus.kafka.ui.service.audit.AuditService;
1415
import com.provectus.kafka.ui.service.rbac.AccessControlService;
1516
import java.util.List;
17+
import java.util.Map;
18+
import javax.annotation.Nullable;
1619
import lombok.RequiredArgsConstructor;
1720
import lombok.extern.slf4j.Slf4j;
1821
import org.springframework.http.ResponseEntity;
@@ -27,78 +30,97 @@
2730
public class BrokersController extends AbstractController implements BrokersApi {
2831
private final BrokerService brokerService;
2932
private final ClusterMapper clusterMapper;
33+
34+
private final AuditService auditService;
3035
private final AccessControlService accessControlService;
3136

3237
@Override
3338
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
3439
ServerWebExchange exchange) {
35-
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
40+
var context = AccessContext.builder()
3641
.cluster(clusterName)
37-
.build());
42+
.operationName("getBrokers")
43+
.build();
3844

3945
var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
40-
41-
return validateAccess.thenReturn(ResponseEntity.ok(job));
46+
return accessControlService.validateAccess(context)
47+
.thenReturn(ResponseEntity.ok(job))
48+
.doOnEach(sig -> auditService.audit(context, sig));
4249
}
4350

4451
@Override
4552
public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
4653
ServerWebExchange exchange) {
47-
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
54+
var context = AccessContext.builder()
4855
.cluster(clusterName)
49-
.build());
56+
.operationName("getBrokersMetrics")
57+
.operationParams(Map.of("id", id))
58+
.build();
5059

51-
return validateAccess.then(
52-
brokerService.getBrokerMetrics(getCluster(clusterName), id)
53-
.map(clusterMapper::toBrokerMetrics)
54-
.map(ResponseEntity::ok)
55-
.onErrorReturn(ResponseEntity.notFound().build())
56-
);
60+
return accessControlService.validateAccess(context)
61+
.then(
62+
brokerService.getBrokerMetrics(getCluster(clusterName), id)
63+
.map(clusterMapper::toBrokerMetrics)
64+
.map(ResponseEntity::ok)
65+
.onErrorReturn(ResponseEntity.notFound().build())
66+
)
67+
.doOnEach(sig -> auditService.audit(context, sig));
5768
}
5869

5970
@Override
6071
public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
61-
List<Integer> brokers,
72+
@Nullable List<Integer> brokers,
6273
ServerWebExchange exchange) {
63-
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
74+
75+
List<Integer> brokerIds = brokers == null ? List.of() : brokers;
76+
77+
var context = AccessContext.builder()
6478
.cluster(clusterName)
65-
.build());
79+
.operationName("getAllBrokersLogdirs")
80+
.operationParams(Map.of("brokerIds", brokerIds))
81+
.build();
6682

67-
return validateAccess.thenReturn(ResponseEntity.ok(
68-
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)));
83+
return accessControlService.validateAccess(context)
84+
.thenReturn(ResponseEntity.ok(
85+
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
86+
.doOnEach(sig -> auditService.audit(context, sig));
6987
}
7088

7189
@Override
7290
public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName,
7391
Integer id,
7492
ServerWebExchange exchange) {
75-
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
93+
var context = AccessContext.builder()
7694
.cluster(clusterName)
7795
.clusterConfigActions(ClusterConfigAction.VIEW)
78-
.build());
96+
.operationName("getBrokerConfig")
97+
.operationParams(Map.of("brokerId", id))
98+
.build();
7999

80-
return validateAccess.thenReturn(
100+
return accessControlService.validateAccess(context).thenReturn(
81101
ResponseEntity.ok(
82102
brokerService.getBrokerConfig(getCluster(clusterName), id)
83103
.map(clusterMapper::toBrokerConfig))
84-
);
104+
).doOnEach(sig -> auditService.audit(context, sig));
85105
}
86106

87107
@Override
88108
public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(String clusterName,
89109
Integer id,
90110
Mono<BrokerLogdirUpdateDTO> brokerLogdir,
91111
ServerWebExchange exchange) {
92-
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
112+
var context = AccessContext.builder()
93113
.cluster(clusterName)
94114
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
95-
.build());
115+
.operationName("updateBrokerTopicPartitionLogDir")
116+
.operationParams(Map.of("brokerId", id))
117+
.build();
96118

97-
return validateAccess.then(
119+
return accessControlService.validateAccess(context).then(
98120
brokerLogdir
99121
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
100122
.map(ResponseEntity::ok)
101-
);
123+
).doOnEach(sig -> auditService.audit(context, sig));
102124
}
103125

104126
@Override
@@ -107,16 +129,18 @@ public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
107129
String name,
108130
Mono<BrokerConfigItemDTO> brokerConfig,
109131
ServerWebExchange exchange) {
110-
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
132+
var context = AccessContext.builder()
111133
.cluster(clusterName)
112134
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
113-
.build());
135+
.operationName("updateBrokerConfigByName")
136+
.operationParams(Map.of("brokerId", id))
137+
.build();
114138

115-
return validateAccess.then(
139+
return accessControlService.validateAccess(context).then(
116140
brokerConfig
117141
.flatMap(bci -> brokerService.updateBrokerConfigByName(
118142
getCluster(clusterName), id, name, bci.getValue()))
119143
.map(ResponseEntity::ok)
120-
);
144+
).doOnEach(sig -> auditService.audit(context, sig));
121145
}
122146
}

0 commit comments

Comments
 (0)