Skip to content

Commit

Permalink
[Improvement][Registry] Add ZK authorization yaml control & add diges…
Browse files Browse the repository at this point in the history
…t auth UT

Update dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java

Co-authored-by: Wenjun Ruan <[email protected]>
  • Loading branch information
pegasas and ruanwenjun committed Aug 20, 2024
1 parent 66548a2 commit faaac71
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ registry:
session-timeout: 60s
connection-timeout: 15s
block-until-connected: 15s
digest: ~
authorization:
digest: ~


metrics:
enabled: true
Expand Down
3 changes: 2 additions & 1 deletion dolphinscheduler-api/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ registry:
session-timeout: 60s
connection-timeout: 15s
block-until-connected: 15s
digest: ~
authorization:
digest: ~

api:
audit-enable: false
Expand Down
3 changes: 2 additions & 1 deletion dolphinscheduler-master/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ registry:
session-timeout: 60s
connection-timeout: 15s
block-until-connected: 15s
digest: ~
authorization:
digest: ~

master:
listen-port: 5678
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ registry:
connection-timeout: 9s
block-until-connected: 600ms
# The following options are set according to personal needs
digest: ~
authorization:
digest: ~
```
After do this config, you can start your DolphinScheduler cluster, your cluster will use zookeeper as registry center to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import com.google.common.base.Strings;

@Slf4j
final class ZookeeperRegistry implements Registry {

Expand All @@ -80,9 +78,10 @@ final class ZookeeperRegistry implements Registry {
.sessionTimeoutMs(DurationUtils.toMillisInt(properties.getSessionTimeout()))
.connectionTimeoutMs(DurationUtils.toMillisInt(properties.getConnectionTimeout()));

final String digest = properties.getDigest();
if (!Strings.isNullOrEmpty(digest)) {
builder.authorization("digest", digest.getBytes(StandardCharsets.UTF_8))
if (properties.getAuthorization().size() > 0) {
final String schema = properties.getAuthorization().keySet().stream().findFirst().get();
final String schemaValue = properties.getAuthorization().get(schema);
builder.authorization(schema.toLowerCase(), schemaValue.getBytes(StandardCharsets.UTF_8))
.aclProvider(new ACLProvider() {

@Override
Expand All @@ -96,6 +95,7 @@ public List<ACL> getAclForPath(final String path) {
}
});
}

client = builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down Expand Up @@ -79,6 +81,9 @@ public void validate(Object target, Errors errors) {
|| zookeeper.getBlockUntilConnected().isNegative()) {
errors.rejectValue("zookeeper.blockUntilConnected", "", "zookeeper.blockUntilConnected should be positive");
}
if (zookeeper.getAuthorization() != null && zookeeper.getAuthorization().size() != 1) {
errors.rejectValue("zookeeper.authorization", "", "zookeeper.authorization should be unique");
}
printConfig();
}

Expand All @@ -88,10 +93,11 @@ private void printConfig() {
"\n namespace -> " + zookeeper.getNamespace() +
"\n connectString -> " + zookeeper.getConnectString() +
"\n retryPolicy -> " + zookeeper.getRetryPolicy() +
"\n digest -> " + zookeeper.getDigest() +
"\n authorization -> " + zookeeper.getAuthorization() +
"\n sessionTimeout -> " + zookeeper.getSessionTimeout() +
"\n connectionTimeout -> " + zookeeper.getConnectionTimeout() +
"\n blockUntilConnected -> " + zookeeper.getBlockUntilConnected() +
"\n authorization -> " + zookeeper.getAuthorization() +
"\n****************************ZookeeperRegistryProperties**************************************";
log.info(config);
}
Expand All @@ -102,7 +108,7 @@ public static final class ZookeeperProperties {
private String namespace = "dolphinscheduler";
private String connectString;
private RetryPolicy retryPolicy = new RetryPolicy();
private String digest;
private Map<String, String> authorization = new HashMap<>();
private Duration sessionTimeout = Duration.ofSeconds(60);
private Duration connectionTimeout = Duration.ofSeconds(15);
private Duration blockUntilConnected = Duration.ofSeconds(15);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.zookeeper;

import org.apache.dolphinscheduler.plugin.registry.RegistryTestCase;

import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.DumbWatcher;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.util.Collections;
import java.util.stream.Stream;

import lombok.SneakyThrows;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

@ActiveProfiles("digest")
@SpringBootTest(classes = ZookeeperRegistryProperties.class)
@SpringBootApplication(scanBasePackageClasses = ZookeeperRegistryProperties.class)
public class ZookeeperRegistryDigestTestCase extends RegistryTestCase<ZookeeperRegistry> {

@Autowired
private ZookeeperRegistryProperties zookeeperRegistryProperties;

private static GenericContainer<?> zookeeperContainer;

private static final Network NETWORK = Network.newNetwork();

private static ZooKeeper zk;

private static final String ROOT_USER = "root";

private static final String ROOT_PASSWORD = "root_passwd";

private static final String ID_PASSWORD = String.format("%s:%s", ROOT_USER, ROOT_PASSWORD);

private static void setupRootACLForDigest(final ZooKeeper zk) throws Exception {
final String digest = DigestAuthenticationProvider.generateDigest(ID_PASSWORD);
final ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("digest", digest));
zk.setACL("/", Collections.singletonList(acl), -1);
}

@SneakyThrows
@BeforeAll
public static void setUpTestingServer() {
zookeeperContainer = new GenericContainer<>(DockerImageName.parse("zookeeper:3.8"))
.withNetwork(NETWORK)
.withExposedPorts(2181);
Startables.deepStart(Stream.of(zookeeperContainer)).join();
System.clearProperty("registry.zookeeper.connect-string");
System.setProperty("registry.zookeeper.connect-string", "localhost:" + zookeeperContainer.getMappedPort(2181));
zk = new ZooKeeper("localhost:" + zookeeperContainer.getMappedPort(2181),
30000, new DumbWatcher(), new ZKClientConfig());
setupRootACLForDigest(zk);
}

@SneakyThrows
@Override
public ZookeeperRegistry createRegistry() {
return new ZookeeperRegistry(zookeeperRegistryProperties);
}

@SneakyThrows
@AfterAll
public static void tearDownTestingServer() {
zk.close();
zookeeperContainer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

registry:
type: zookeeper
zookeeper:
namespace: dolphinscheduler
connect-string: 127.0.0.1:2181
retry-policy:
base-sleep-time: 60ms
max-sleep: 300ms
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 3s
authorization:
digest: root:root_passwd
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ registry:
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 3s
digest: ~
authorization:
digest: ~
3 changes: 2 additions & 1 deletion dolphinscheduler-worker/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ registry:
session-timeout: 60s
connection-timeout: 15s
block-until-connected: 15s
digest: ~
authorization:
digest: ~

worker:
# worker listener port
Expand Down

0 comments on commit faaac71

Please sign in to comment.