From cf3dd43e02f5857dde39fb92b4c1de45152823a4 Mon Sep 17 00:00:00 2001 From: Lokesh Khurana Date: Wed, 22 Jan 2025 14:01:21 -0800 Subject: [PATCH] PHOENIX-7502 :- Decouple principal from HAGroupInfo (#2053) Co-authored-by: lokesh-khurana --- .../jdbc/FailoverPhoenixConnection.java | 50 ++-- .../phoenix/jdbc/FailoverPhoenixContext.java | 49 ++++ .../org/apache/phoenix/jdbc/HAURLInfo.java | 104 +++++++++ .../phoenix/jdbc/HighAvailabilityGroup.java | 187 +++++++++++---- .../phoenix/jdbc/HighAvailabilityPolicy.java | 68 +++--- .../jdbc/ParallelPhoenixConnection.java | 18 +- .../phoenix/jdbc/ParallelPhoenixContext.java | 10 +- .../phoenix/jdbc/PhoenixEmbeddedDriver.java | 8 +- .../jdbc/FailoverPhoenixConnectionIT.java | 145 ++++++++++++ .../phoenix/jdbc/HighAvailabilityGroupIT.java | 215 ++++++++++++++++-- .../jdbc/HighAvailabilityGroupTestIT.java | 27 ++- .../jdbc/HighAvailabilityTestingUtility.java | 22 ++ .../jdbc/ParallelPhoenixConnectionIT.java | 13 +- .../jdbc/FailoverPhoenixConnectionTest.java | 22 +- .../ParallelPhoenixConnectionFailureTest.java | 2 +- .../jdbc/ParallelPhoenixConnectionTest.java | 14 +- .../jdbc/ParallelPhoenixContextTest.java | 13 +- ...llelPhoenixNullComparingResultSetTest.java | 3 +- .../ParallelPhoenixPreparedStatementTest.java | 3 +- .../jdbc/ParallelPhoenixResultSetTest.java | 15 +- .../phoenix/jdbc/ParallelPhoenixUtilTest.java | 5 +- 21 files changed, 833 insertions(+), 160 deletions(-) create mode 100644 phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixContext.java create mode 100644 phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAURLInfo.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java index 2ade5ef4de7..0ad691958fc 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java @@ -72,14 +72,11 @@ public class FailoverPhoenixConnection implements PhoenixMonitoredConnection { public static final String FAILOVER_TIMEOUT_MS_ATTR = "phoenix.ha.failover.timeout.ms"; public static final long FAILOVER_TIMEOUT_MS_DEFAULT = 10_000; private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnection.class); + /** - * Connection properties. - */ - private final Properties properties; - /** - * High availability group. + * Context for FailoverPhoenixConnection */ - private final HighAvailabilityGroup haGroup; + private final FailoverPhoenixContext context; /** * Failover policy, per connection. 
*/ @@ -103,13 +100,13 @@ public class FailoverPhoenixConnection implements PhoenixMonitoredConnection { */ private Map> previousReadMetrics = new HashMap<>(); - public FailoverPhoenixConnection(HighAvailabilityGroup haGroup, Properties properties) + public FailoverPhoenixConnection(FailoverPhoenixContext context) throws SQLException { - this.properties = properties; - this.haGroup = haGroup; - this.policy = FailoverPolicy.get(properties); + this.context = context; + this.policy = FailoverPolicy.get(context.getProperties()); this.isClosed = false; - this.connection = haGroup.connectActive(properties); + this.connection = context.getHAGroup().connectActive(context.getProperties(), + context.getHAURLInfo()); } /** @@ -171,9 +168,9 @@ private static Map> mergeMetricMaps( void failover(long timeoutMs) throws SQLException { checkConnection(); - if (haGroup.isActive(connection)) { + if (context.getHAGroup().isActive(connection)) { LOG.info("Connection {} is against ACTIVE cluster in HA group {}; skip failing over.", - connection.getURL(), haGroup.getGroupInfo().getName()); + connection.getURL(), context.getHAGroup().getGroupInfo().getName()); return; } @@ -183,7 +180,8 @@ void failover(long timeoutMs) throws SQLException { while (newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs) { try { - newConn = haGroup.connectActive(properties); + newConn = context.getHAGroup().connectActive(context.getProperties(), + context.getHAURLInfo()); } catch (SQLException e) { cause = e; LOG.info("Got exception when trying to connect to active cluster.", e); @@ -197,7 +195,7 @@ void failover(long timeoutMs) throws SQLException { } if (newConn == null) { throw new FailoverSQLException("Can not failover connection", - haGroup.getGroupInfo().toString(), cause); + context.getHAGroup().getGroupInfo().toString(), cause); } final PhoenixConnection oldConn = connection; @@ -217,7 +215,7 @@ void failover(long timeoutMs) throws SQLException { oldConn.close(new SQLExceptionInfo .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) .setMessage("Phoenix connection got closed due to failover") - .setHaGroupInfo(haGroup.getGroupInfo().toString()) + .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()) .build() .buildException()); } catch (SQLException e) { @@ -226,7 +224,8 @@ void failover(long timeoutMs) throws SQLException { } } } - LOG.info("Connection {} failed over to {}", haGroup.getGroupInfo(), connection.getURL()); + LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(), + connection.getURL()); } /** @@ -241,7 +240,7 @@ void failover(long timeoutMs) throws SQLException { private void checkConnection() throws SQLException { if (isClosed) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CONNECTION_CLOSED) - .setHaGroupInfo(haGroup.getGroupInfo().toString()) + .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()) .build() .buildException(); } @@ -249,7 +248,7 @@ private void checkConnection() throws SQLException { throw new SQLExceptionInfo .Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) .setMessage("Connection has not been established to ACTIVE HBase cluster") - .setHaGroupInfo(haGroup.getGroupInfo().toString()) + .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()) .build() .buildException(); } @@ -327,8 +326,9 @@ public void clearMetrics() { @VisibleForTesting T wrapActionDuringFailover(SupplierWithSQLException s) throws SQLException { checkConnection(); - final long timeoutMs = 
Long.parseLong(properties.getProperty(FAILOVER_TIMEOUT_MS_ATTR, - String.valueOf(FAILOVER_TIMEOUT_MS_DEFAULT))); + final long timeoutMs = Long.parseLong(context.getProperties(). + getProperty(FAILOVER_TIMEOUT_MS_ATTR, + String.valueOf(FAILOVER_TIMEOUT_MS_DEFAULT))); int failoverCount = 0; while (true) { try { @@ -642,4 +642,12 @@ interface SupplierWithSQLException { interface RunWithSQLException { void run() throws SQLException; } + + /** + * @return the context of a given FailoverPhoenixConnection + */ + @VisibleForTesting + public FailoverPhoenixContext getContext() { + return context; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixContext.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixContext.java new file mode 100644 index 00000000000..873712cdf31 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixContext.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import java.util.Properties; + +/** + * FailoverPhoenixContext holds the properties and HAGroup Info for a failover phoenix connection. + */ +public class FailoverPhoenixContext { + + private final Properties properties; + private final HighAvailabilityGroup haGroup; + private final HAURLInfo haurlInfo; + + FailoverPhoenixContext(Properties properties, HighAvailabilityGroup haGroup, + HAURLInfo haurlInfo) { + this.properties = properties; + this.haGroup = haGroup; + this.haurlInfo = haurlInfo; + } + + public Properties getProperties() { + return properties; + } + + public HighAvailabilityGroup getHAGroup() { + return haGroup; + } + + public HAURLInfo getHAURLInfo() { + return haurlInfo; + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAURLInfo.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAURLInfo.java new file mode 100644 index 00000000000..5c9c4f9b394 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAURLInfo.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * An HAURLInfo contains the information of an HA URL with respect to an HA group name.
+ *

+ * It is constructed based on client input, including the JDBC connection string and properties.
+ * Objects of this class are used to get the appropriate principal and additional JDBC parameters.
+ *

+ * This class is immutable. + */ + +@VisibleForTesting +public class HAURLInfo { + private final String name; + private final String principal; + private final String additionalJDBCParams; + + HAURLInfo(String name, String principal, String additionalJDBCParams) { + Preconditions.checkNotNull(name); + this.name = name; + this.principal = principal; + this.additionalJDBCParams = additionalJDBCParams; + } + + HAURLInfo(String name, String principal) { + this(name, principal, null); + } + + HAURLInfo(String name) { + this(name, null, null); + } + + public String getName() { + return name; + } + + public String getPrincipal() { + return principal; + } + + public String getAdditionalJDBCParams() { + return additionalJDBCParams; + } + + @Override + public String toString() { + if (principal != null) { + return String.format("%s[%s]", name, principal); + } + return name; + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other == this) { + return true; + } + if (other.getClass() != getClass()) { + return false; + } + HAURLInfo otherInfo = (HAURLInfo) other; + return new EqualsBuilder() + .append(name, otherInfo.name) + .append(principal, otherInfo.principal) + .isEquals(); + } + + @Override + public int hashCode() { + if (principal != null) { + return new HashCodeBuilder(7, 47) + .append(name) + .append(principal).hashCode(); + } + return new HashCodeBuilder(7, 47).append(name).hashCode(); + } + +} \ No newline at end of file diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java index fb2137d1066..c5a1c67be92 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java @@ -49,10 +49,14 @@ import java.sql.Driver; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -124,8 +128,23 @@ public class HighAvailabilityGroup { public static final long PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000; // 5 mins static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroup.class); + + /** + * Two maps to store client provided info mapping to HighAvailabilityGroup. 
* GROUPS maps HAGroupInfo (the name and the URLs of the clusters where the CRR resides)
+ * to HighAvailabilityGroup; this is the information required to get the roleRecord.
+ * URLS maps HAGroupInfo to HAURLInfo (name, principal) as a 1:n mapping; each HAURLInfo
+ * represents a group of clients trying to connect to a HighAvailabilityGroup.
+ * This info is required to fetch the CQSI(s) linked to a given HighAvailabilityGroup in
+ * case of a failover, or a change, where the CQSIs need to be closed and invalidated.
+ *
+ * The HAURLInfo for the current connection is stored in {@link ParallelPhoenixContext}
+ * and {@link FailoverPhoenixContext}.
+ *
+ */
@VisibleForTesting
static final Map<HAGroupInfo, HighAvailabilityGroup> GROUPS = new ConcurrentHashMap<>();
+ static final Map<HAGroupInfo, Set<HAURLInfo>> URLS = new ConcurrentHashMap<>();
@VisibleForTesting
static final Cache MISSING_CRR_GROUPS_CACHE = CacheBuilder.newBuilder()
.expireAfterWrite(PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT, TimeUnit.MILLISECONDS)
@@ -195,28 +214,81 @@ private HighAvailabilityGroup(HAGroupInfo info, Properties properties) {
this.state = state;
}
- public static HAGroupInfo getHAGroupInfo(String url, Properties properties)
- throws SQLException {
- if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
- url = url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
- }
- if (!(url.contains("[") && url.contains("|") && url.contains("]"))) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
- .setMessage(String.format("URL %s is not a valid HA connection string", url))
- .build()
- .buildException();
- }
+ /**
+ * Get an instance of HAURLInfo given the HA connection URL (with "|") and client properties.
+ * Here we parse the url and extract the principal and other additional params.
+ * @throws SQLException if the url is not a valid HA connection string
+ */
+ public static HAURLInfo getUrlInfo(String url, Properties properties) throws SQLException {
+ url = checkUrl(url);
+ String principal = null;
String additionalJDBCParams = null;
int idx = url.indexOf("]");
int extraIdx = url.indexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR, idx + 1);
if (extraIdx != -1) {
- // skip the JDBC_PROTOCOL_SEPARATOR
+ // right after the zk quorums there should be a separator
+ if (extraIdx != idx + 1) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+ .setMessage(String.format("URL %s is not a valid HA connection string",
+ url))
+ .build()
+ .buildException();
+ }
additionalJDBCParams = url.substring(extraIdx + 1);
+ // Get the principal
+ extraIdx = additionalJDBCParams.indexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ if (extraIdx != -1) {
+ if (extraIdx != 0) {
+ principal = additionalJDBCParams.substring(0, extraIdx);
+ }
+ // Storing the terminator as part of the additional params
+ additionalJDBCParams = additionalJDBCParams.substring(extraIdx + 1);
+ } else {
+ extraIdx = additionalJDBCParams.indexOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR);
+ if (extraIdx != -1) {
+ // Not storing the terminator in the principal, to keep it consistent.
+ principal = additionalJDBCParams.substring(0, extraIdx);
+ additionalJDBCParams = String.valueOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR);
+ } else {
+ principal = additionalJDBCParams;
+ additionalJDBCParams = null;
+ }
+ }
+ } else {
+ extraIdx = url.indexOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR, idx + 1);
+ if (extraIdx != -1) {
+ // There is something between the zk quorums and the terminator but no separator(s),
+ // so we cannot tell what it is
+ if (extraIdx != idx + 1) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+ .setMessage(String.format("URL %s is not a valid HA connection string",
+ url))
+ .build()
+ .buildException();
+ } else {
+ additionalJDBCParams = url.substring(extraIdx);
+ }
+ }
}
- url = url.substring(url.indexOf("[") + 1, url.indexOf("]"));
- String[] urls = url.split("\\|");
+ String name = properties.getProperty(PHOENIX_HA_GROUP_ATTR);
+ if (StringUtils.isEmpty(name)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.HA_INVALID_PROPERTIES)
+ .setMessage(String.format("HA group name can not be empty for HA URL %s", url))
+ .build()
+ .buildException();
+ }
+ HAURLInfo haurlInfo = new HAURLInfo(name, principal, additionalJDBCParams);
+ HAGroupInfo info = getHAGroupInfo(url, properties);
+ URLS.computeIfAbsent(info, haGroupInfo -> new HashSet<>()).add(haurlInfo);
+ return haurlInfo;
+ }
+ private static HAGroupInfo getHAGroupInfo(String url, Properties properties)
+ throws SQLException {
+ url = checkUrl(url);
+ url = url.substring(url.indexOf("[") + 1, url.indexOf("]"));
+ String[] urls = url.split("\\|");
String name = properties.getProperty(PHOENIX_HA_GROUP_ATTR);
if (StringUtils.isEmpty(name)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.HA_INVALID_PROPERTIES)
@@ -224,7 +296,26 @@ public static HAGroupInfo getHAGroupInfo(String url, Properties properties)
.build()
.buildException();
}
- return new HAGroupInfo(name, urls[0], urls[1], additionalJDBCParams);
+ return new HAGroupInfo(name, urls[0], urls[1]);
+ }
+
+ /**
+ * Checks that the given url is appropriate for an HA connection.
+ * @param url the connection url to validate
+ * @return the url without the JDBC protocol prefix
+ * @throws SQLException if the url is not a valid HA connection string
+ */
+ private static String checkUrl(String url) throws SQLException {
+ if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+ url = url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
+ }
+ if (!(url.contains("[") && url.contains("|") && url.contains("]"))) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+ .setMessage(String.format("URL %s is not a valid HA connection string", url))
+ .build()
+ .buildException();
+ }
+ return url;
}
/**
@@ -488,7 +579,7 @@ private void waitForInitialization(Properties properties) throws IOException {
* @return a JDBC connection implementation
* @throws SQLException if fails to connect a JDBC connection
*/
- public Connection connect(Properties properties) throws SQLException {
+ public Connection connect(Properties properties, HAURLInfo haurlInfo) throws SQLException {
if (state != State.READY) {
throw new SQLExceptionInfo
.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
@@ -497,7 +588,7 @@ public Connection connect(Properties properties) throws SQLException {
.build()
.buildException();
}
- return roleRecord.getPolicy().provide(this, properties);
+ return roleRecord.getPolicy().provide(this, properties, haurlInfo);
}
/**
@@ -509,11 +600,12 @@ public Connection connect(Properties properties) throws SQLException {
* @return a Phoenix connection to current active HBase cluster
* @throws
SQLException if fails to get a connection */ - PhoenixConnection connectActive(final Properties properties) throws SQLException { + PhoenixConnection connectActive(final Properties properties, final HAURLInfo haurlInfo) + throws SQLException { try { Optional url = roleRecord.getActiveUrl(); if (state == State.READY && url.isPresent()) { - PhoenixConnection conn = connectToOneCluster(url.get(), properties); + PhoenixConnection conn = connectToOneCluster(url.get(), properties, haurlInfo); // After connection is created, double check if the cluster is still ACTIVE // This is to make sure the newly created connection will not be returned to client // if the target cluster is not active any more. This can happen during failover. @@ -575,9 +667,10 @@ boolean isActive(PhoenixConnection connection) { *

* The URL should belong to one of the two ZK clusters in this HA group. It returns the Phoenix * connection to the given cluster without checking the context of the cluster's role. Please - * use {@link #connectActive(Properties)} to connect to the ACTIVE cluster. + * use {@link #connectActive(Properties, HAURLInfo)} to connect to the ACTIVE cluster. */ - PhoenixConnection connectToOneCluster(String url, Properties properties) throws SQLException { + PhoenixConnection connectToOneCluster(String url, Properties properties, HAURLInfo haurlInfo) + throws SQLException { Preconditions.checkNotNull(url); if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) { Preconditions.checkArgument(url.length() > PhoenixRuntime.JDBC_PROTOCOL.length(), @@ -587,7 +680,7 @@ PhoenixConnection connectToOneCluster(String url, Properties properties) throws Preconditions.checkArgument(url.equals(info.getUrl1()) || url.equals(info.getUrl2()), "The URL '" + url + "' does not belong to this HA group " + info); - String jdbcString = info.getJDBCUrl(url); + String jdbcString = info.getJDBCUrl(url, haurlInfo); ClusterRole role = roleRecord.getRole(url); if (!role.canConnect()) { @@ -742,7 +835,8 @@ enum State {UNINITIALIZED, READY, IN_TRANSITION, CLOSED} * An HAGroupInfo contains information of an HA group. *

* It is constructed based on client input, including the JDBC connection string and properties.
- * Objects of this class are used as the keys of HA group cache {@link #GROUPS}.
+ * Objects of this class are used as the keys of the HA group cache {@link #GROUPS} and the
+ * HA URL info cache {@link #URLS}.
+ *

* This class is immutable. */ @@ -750,9 +844,8 @@ enum State {UNINITIALIZED, READY, IN_TRANSITION, CLOSED} static final class HAGroupInfo { private final String name; private final PairOfSameType urls; - private final String additionalJDBCParams; - HAGroupInfo(String name, String url1, String url2, String additionalJDBCParams) { + HAGroupInfo(String name, String url1, String url2) { Preconditions.checkNotNull(name); Preconditions.checkNotNull(url1); Preconditions.checkNotNull(url2); @@ -766,11 +859,6 @@ static final class HAGroupInfo { } else { this.urls = new PairOfSameType<>(url1, url2); } - this.additionalJDBCParams = additionalJDBCParams; - } - - HAGroupInfo(String name, String url1, String url2) { - this(name, url1, url2, null); } public String getName() { @@ -785,26 +873,47 @@ public String getUrl2() { return urls.getSecond(); } - public String getJDBCUrl(String zkUrl) { - Preconditions.checkArgument(zkUrl.equals(getUrl1()) || zkUrl.equals(getUrl2()), + + public String getJDBCUrl(String zkUrl, HAURLInfo haURLInfo) { + Preconditions.checkArgument(zkUrl.equals(getUrl1()) || zkUrl.equals(getUrl2()) + || zkUrl.equals("[" + getUrl1() + "|" + getUrl2() + "]") + || zkUrl.equals("[" + getUrl2() + "|" + getUrl1() + "]"), "The URL '" + zkUrl + "' does not belong to this HA group " + this); StringBuilder sb = new StringBuilder(); sb.append(PhoenixRuntime.JDBC_PROTOCOL_ZK); sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR); sb.append(zkUrl); - if (!Strings.isNullOrEmpty(additionalJDBCParams)) { - sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR); - sb.append(additionalJDBCParams); + if (haURLInfo != null) { + if (!Strings.isNullOrEmpty(haURLInfo.getPrincipal()) + && !Strings.isNullOrEmpty(haURLInfo.getAdditionalJDBCParams())) { + sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR); + sb.append(haURLInfo.getPrincipal()); + if (!haURLInfo.getAdditionalJDBCParams(). + equals(String.valueOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR))) { + sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR); + } + sb.append(haURLInfo.getAdditionalJDBCParams()); + } else if (!Strings.isNullOrEmpty(haURLInfo.getPrincipal())) { + sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR); + sb.append(haURLInfo.getPrincipal()); + } else if (!Strings.isNullOrEmpty(haURLInfo.getAdditionalJDBCParams())) { + if (!haURLInfo.getAdditionalJDBCParams(). 
+ equals(String.valueOf(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR))) {
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ }
+ sb.append(haURLInfo.getAdditionalJDBCParams());
+ }
}
return sb.toString();
}
- public String getJDBCUrl1() {
- return getJDBCUrl(getUrl1());
+ public String getJDBCUrl1(HAURLInfo haURLInfo) {
+ return getJDBCUrl(getUrl1(), haURLInfo);
}
- public String getJDBCUrl2() {
- return getJDBCUrl(getUrl2());
+ public String getJDBCUrl2(HAURLInfo haURLInfo) {
+ return getJDBCUrl(getUrl2(), haURLInfo);
}
/**
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
index 93d26618644..7a82d47a936 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
@@ -39,9 +39,10 @@ enum HighAvailabilityPolicy {
FAILOVER {
@Override
- public Connection provide(HighAvailabilityGroup haGroup, Properties info)
- throws SQLException {
- return new FailoverPhoenixConnection(haGroup, info);
+ public Connection provide(HighAvailabilityGroup haGroup, Properties info,
+ HAURLInfo haURLInfo) throws SQLException {
+ FailoverPhoenixContext context = new FailoverPhoenixContext(info, haGroup, haURLInfo);
+ return new FailoverPhoenixConnection(context);
}
@Override
void transitClusterRole(HighAvailabilityGroup haGroup, ClusterRoleRecord oldRecord,
@@ -65,50 +66,61 @@ private void transitStandby(HighAvailabilityGroup haGroup, String zkUrl)
LOG.info("Cluster {} becomes STANDBY in HA group {}, now close all its connections",
zkUrl, haGroup.getGroupInfo());
ConnectionQueryServices cqs = null;
- try {
- cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(
- haGroup.getGroupInfo().getJDBCUrl(zkUrl), haGroup.getProperties());
- cqs.closeAllConnections(new SQLExceptionInfo
- .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
- .setMessage("Phoenix connection got closed due to failover")
- .setHaGroupInfo(haGroup.getGroupInfo().toString()));
- LOG.info("Closed all connections to cluster {} for HA group {}", zkUrl,
- haGroup.getGroupInfo());
- } finally {
- if (cqs != null) {
- // CQS is closed but it is not invalidated from global cache in PhoenixDriver
- // so that any new connection will get error instead of creating a new CQS
- LOG.info("Closing CQS after cluster '{}' becomes STANDBY", zkUrl);
- cqs.close();
- LOG.info("Successfully closed CQS after cluster '{}' becomes STANDBY", zkUrl);
+
+ // Close connections for every HAURLInfo (each distinct principal) of the given HAGroup
+ for (HAURLInfo haurlInfo : HighAvailabilityGroup.URLS.get(haGroup.getGroupInfo())) {
+ try {
+ String jdbcZKUrl = haGroup.getGroupInfo().getJDBCUrl(zkUrl, haurlInfo);
+ cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(
+ jdbcZKUrl, haGroup.getProperties());
+ cqs.closeAllConnections(new SQLExceptionInfo
+ .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
+ .setMessage("Phoenix connection got closed due to failover")
+ .setHaGroupInfo(haGroup.getGroupInfo().toString()));
+ LOG.info("Closed all connections to cluster {} for HA group {}",
+ jdbcZKUrl, haGroup.getGroupInfo());
+ } finally {
+ if (cqs != null) {
+ // CQS is closed but it is not invalidated from the global cache in PhoenixDriver
+ // so that any new connection will get an error instead of creating a new CQS
+ LOG.info("Closing CQS after cluster '{}' 
becomes STANDBY", zkUrl); + cqs.close(); + LOG.info("Successfully closed CQS after cluster '{}' becomes STANDBY", + zkUrl); + } } } } + private void transitActive(HighAvailabilityGroup haGroup, String zkUrl) throws SQLException { // Invalidate CQS cache if any that has been closed but has not been cleared - LOG.info("invalidating cqs cache for zkUrl: " + zkUrl); - PhoenixDriver.INSTANCE.invalidateCache(haGroup.getGroupInfo().getJDBCUrl(zkUrl), - haGroup.getProperties()); + for (HAURLInfo haurlInfo : HighAvailabilityGroup.URLS.get(haGroup.getGroupInfo())) { + String jdbcZKUrl = haGroup.getGroupInfo().getJDBCUrl(zkUrl, haurlInfo); + LOG.info("invalidating cqs cache for zkUrl: " + jdbcZKUrl); + PhoenixDriver.INSTANCE.invalidateCache(jdbcZKUrl, + haGroup.getProperties()); + } } }, PARALLEL { @Override - public Connection provide(HighAvailabilityGroup haGroup, Properties info) - throws SQLException { + public Connection provide(HighAvailabilityGroup haGroup, Properties info, + HAURLInfo haURLInfo) throws SQLException { List executorCapacities = PhoenixHAExecutorServiceProvider.hasCapacity(info); if (executorCapacities.contains(Boolean.TRUE)) { ParallelPhoenixContext context = new ParallelPhoenixContext(info, haGroup, - PhoenixHAExecutorServiceProvider.get(info), executorCapacities); + PhoenixHAExecutorServiceProvider.get(info), + executorCapacities, haURLInfo); return new ParallelPhoenixConnection(context); } else { // TODO: Once we have operation/primary wait timeout use the same // Give regular connection or a failover connection? LOG.warn("Falling back to single phoenix connection due to resource constraints"); GlobalClientMetrics.GLOBAL_HA_PARALLEL_CONNECTION_FALLBACK_COUNTER.increment(); - return haGroup.connectActive(info); + return haGroup.connectActive(info, haURLInfo); } } @Override @@ -125,10 +137,12 @@ void transitClusterRole(HighAvailabilityGroup haGroup, ClusterRoleRecord oldReco * * @param haGroup The high availability (HA) group * @param info Connection properties + * @param haurlInfo additional info of client provided url * @return a JDBC connection * @throws SQLException if fails to provide a connection */ - abstract Connection provide(HighAvailabilityGroup haGroup, Properties info) throws SQLException; + abstract Connection provide(HighAvailabilityGroup haGroup, Properties info, HAURLInfo haurlInfo) + throws SQLException; /** * Call-back function when a cluster role transition is detected in the high availability group. diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java index 3184af7adf2..c0670274934 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java @@ -69,14 +69,15 @@ public class ParallelPhoenixConnection implements PhoenixMonitoredConnection { CompletableFuture futureConnection2; public ParallelPhoenixConnection(ParallelPhoenixContext context) throws SQLException { this.context = context; - LOG.trace("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getJDBCUrl1(), - context.getHaGroup().getGroupInfo().getJDBCUrl2()); + LOG.trace("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo(). + getJDBCUrl1(context.getHaurlInfo()), context.getHaGroup().getGroupInfo(). 
+ getJDBCUrl2(context.getHaurlInfo())); futureConnection1 = context.chainOnConn1(() -> getConnection(context.getHaGroup(), - context.getHaGroup().getGroupInfo().getJDBCUrl1(), - context.getProperties())); + context.getHaGroup().getGroupInfo().getJDBCUrl1(context.getHaurlInfo()), + context.getProperties(), context.getHaurlInfo())); futureConnection2 = context.chainOnConn2(() -> getConnection(context.getHaGroup(), - context.getHaGroup().getGroupInfo().getJDBCUrl2(), - context.getProperties())); + context.getHaGroup().getGroupInfo().getJDBCUrl2(context.getHaurlInfo()), + context.getProperties(), context.getHaurlInfo())); // Ensure one connection is successful before returning ParallelPhoenixUtil.INSTANCE.runFutures(Arrays.asList(futureConnection1, futureConnection2), context, false); @@ -91,9 +92,10 @@ public ParallelPhoenixConnection(ParallelPhoenixContext context) throws SQLExcep ParallelPhoenixUtil.INSTANCE.runFutures(Arrays.asList(futureConnection1, futureConnection2), context, false); } - private static PhoenixConnection getConnection(HighAvailabilityGroup haGroup, String url, Properties properties) { + private static PhoenixConnection getConnection(HighAvailabilityGroup haGroup, String url, + Properties properties, HAURLInfo haurlInfo) { try { - return haGroup.connectToOneCluster(url, properties); + return haGroup.connectToOneCluster(url, properties, haurlInfo); } catch (SQLException exception) { if (LOG.isTraceEnabled()) { LOG.trace(String.format("Failed to get a connection for haGroup %s to %s", haGroup.toString(), url), exception); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java index 567abad2dd3..7d496dd4c63 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java @@ -57,6 +57,7 @@ public class ParallelPhoenixContext { private final Properties properties; private final HighAvailabilityGroup haGroup; + private final HAURLInfo haurlInfo; private final long operationTimeoutMs; private volatile boolean isClosed = false; @@ -72,12 +73,15 @@ public class ParallelPhoenixContext { * @param executorCapacities Ordered list of executorCapacities corresponding to executors. 
Null is interpreted as * executors having capacity */ - ParallelPhoenixContext(Properties properties, HighAvailabilityGroup haGroup, List executors, List executorCapacities) { + ParallelPhoenixContext(Properties properties, HighAvailabilityGroup haGroup, + List executors, + List executorCapacities, HAURLInfo haurlInfo) { Preconditions.checkNotNull(executors); Preconditions.checkArgument(executors.size() >= 2, "Expected 2 executor pairs, one for each connection with a normal/close executor"); GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER.increment(); this.properties = properties; this.haGroup = haGroup; + this.haurlInfo = haurlInfo; this.parallelPhoenixMetrics = new ParallelPhoenixMetrics(); this.operationTimeoutMs = getOperationTimeoutMs(properties); @@ -124,6 +128,10 @@ public HighAvailabilityGroup getHaGroup() { return haGroup; } + public HAURLInfo getHaurlInfo() { + return haurlInfo; + } + public boolean isAutoCommit() { return Boolean.valueOf((String) properties.getOrDefault(AUTO_COMMIT_ATTRIB, "false")); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java index f4de1ae7793..2ea63fb6943 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java @@ -135,16 +135,18 @@ protected final Connection createConnection(String url, Properties info) throws Properties augmentedInfo = PropertiesUtil.deepCopy(info); augmentedInfo.putAll(getDefaultProps().asMap()); if (url.contains("|")) { + // Get HAURLInfo to pass it to connection creation + HAURLInfo haurlInfo = HighAvailabilityGroup.getUrlInfo(url, augmentedInfo); // High availability connection using two clusters Optional haGroup = HighAvailabilityGroup.get(url, augmentedInfo); if (haGroup.isPresent()) { - return haGroup.get().connect(augmentedInfo); + return haGroup.get().connect(augmentedInfo, haurlInfo); } else { // If empty HA group is returned, fall back to single cluster. 
url = HighAvailabilityGroup.getFallbackCluster(url, info).orElseThrow(
- () -> new SQLException(
- "HA group can not be initialized, fallback to single cluster"));
+ () -> new SQLException(
+ "HA group can not be initialized, fallback to single cluster"));
}
}
ConnectionQueryServices cqs = getConnectionQueryServices(url, augmentedInfo);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
index 890529c68ea..29c5f2f1b0f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
@@ -19,6 +19,7 @@
import static org.apache.hadoop.test.GenericTestUtils.waitFor;
import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.URLS;
import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
@@ -46,7 +47,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.FailoverSQLException;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
@@ -56,6 +60,7 @@
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -592,6 +597,138 @@ public void testFailoverMetrics() throws Exception {
assertTrue(PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).isEmpty());
}
+ /**
+ * Test that transiting the cluster role record affects the connections of all principals
+ * in a given HAGroup.
+ */
+ @Test(timeout = 300000)
+ public void testAllConnectionsOfHAIsAffected() throws Exception {
+ Connection conn = createFailoverConnection();
+ PhoenixConnection wrappedConn = ((FailoverPhoenixConnection) conn).getWrappedConnection();
+
+ // Create another connection with the same params except a different principal.
+ // It should use the same haGroup as the default one, so transiting that haGroup should affect this conn as well.
String principal = RandomStringUtils.randomAlphabetic(5);
+ Connection conn2 = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal), clientProperties);
+ PhoenixConnection wrappedConn2 = ((FailoverPhoenixConnection) conn2).getWrappedConnection();
+
+ // Next we create a new HA group and a connection against this HA group with the default PRINCIPAL
+ String haGroupName2 = haGroup.getGroupInfo().getName() + "2";
+ CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER);
+ Properties clientProperties2 = new Properties(clientProperties);
+ clientProperties2.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+ Connection conn3 = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties2);
+ PhoenixConnection wrappedConn3 = ((FailoverPhoenixConnection) conn3).getWrappedConnection();
+
+ // Create another connection against haGroup2 with the same principal as conn2 uses for haGroup;
+ // it should not be affected by transiting haGroup
+ Connection conn4 = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal), clientProperties2);
+ PhoenixConnection wrappedConn4 = ((FailoverPhoenixConnection) conn4).getWrappedConnection();
+
+ assertFalse(wrappedConn.isClosed());
+ assertFalse(wrappedConn2.isClosed());
+ assertFalse(wrappedConn3.isClosed());
+ assertFalse(wrappedConn4.isClosed());
+
+ CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE);
+
+ assertTrue(wrappedConn.isClosed());
+ assertTrue(wrappedConn2.isClosed());
+ assertFalse(wrappedConn3.isClosed()); // only connections of haGroup are closed, irrespective of principal
+ assertFalse(wrappedConn4.isClosed());
+
+ }
+
+ @Test(timeout = 300000)
+ public void testUserPrincipal() throws Exception {
+ Connection conn = createFailoverConnection(); // PRINCIPAL, haGroupName
+ FailoverPhoenixConnection fconn = (FailoverPhoenixConnection) conn;
+ ConnectionQueryServices cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(), clientProperties);
+
+ String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3);
+ CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER);
+ clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+ Connection conn2 = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties); // PRINCIPAL, haGroupName2
+ FailoverPhoenixConnection fconn2 = (FailoverPhoenixConnection) conn2;
+ ConnectionQueryServices cqsi2 = PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(), clientProperties);
+
+ Connection conn3 = DriverManager.getConnection(CLUSTERS.getJdbcHAUrlWithoutPrincipal(), clientProperties); // null, haGroupName2
+ FailoverPhoenixConnection fconn3 = (FailoverPhoenixConnection) conn3;
+ ConnectionQueryServices cqsi3 = PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.
+ getJdbcUrlWithoutPrincipal(CLUSTERS.getUrl1()), clientProperties);
+
+ clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+ String principal4 = RandomStringUtils.randomAlphabetic(5);
+ Connection conn4 = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal4), clientProperties); // principal4, haGroupName
+ FailoverPhoenixConnection fconn4 = (FailoverPhoenixConnection) conn4;
+ ConnectionQueryServices cqsi4 = PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(principal4), clientProperties);
+
+ // Check the wrapped connection urls
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(), fconn.getWrappedConnection().getURL());
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(), fconn2.getWrappedConnection().getURL());
+ Assert.assertEquals(CLUSTERS.getJdbcUrlWithoutPrincipal(CLUSTERS.getUrl1()), fconn3.getWrappedConnection().getURL());
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(principal4), fconn4.getWrappedConnection().getURL());
+
+ // Check that the cqsi objects are the same as what we get from the connections
+ Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName());
+ Assert.assertSame(cqsi, fconn.getWrappedConnection().getQueryServices());
+
+ Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi2.getUserName());
+ Assert.assertSame(cqsi2, fconn2.getWrappedConnection().getQueryServices());
+
+ Assert.assertNull(cqsi3.getUserName());
+ Assert.assertSame(cqsi3, fconn3.getWrappedConnection().getQueryServices());
+
+ Assert.assertEquals(principal4, cqsi4.getUserName());
+ Assert.assertSame(cqsi4, fconn4.getWrappedConnection().getQueryServices());
+
+ }
+
+ @Test(timeout = 300000)
+ public void testHAGroupMappingsWithDifferentPrincipalsOnDifferentThreads() throws Exception {
+ int numThreads = RandomUtils.nextInt(3, 5);
+ List<Thread> connectionThreads = new ArrayList<>(numThreads);
+ AtomicBoolean isPrincipalNull = new AtomicBoolean(false);
+ // Creating a random number of connections, one connection per thread, each with a different
+ // principal, including one connection with a null principal; all of them use the given
+ // haGroupName which is specific to this test
+ for (int i = 0; i < numThreads; i++) {
+ isPrincipalNull.set((i + 1) % 3 == 0);
+ connectionThreads.add(new Thread(() -> {
+ try {
+ createConnectionWithRandomPrincipal(isPrincipalNull.get());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }));
+ }
+
+ // Create multiple connections with the same given principal
+ String principal = RandomStringUtils.randomAlphabetic(3);
+ int numConnectionsWithSamePrincipal = 3;
+ for (int i = 0; i < numConnectionsWithSamePrincipal; i++) {
+ connectionThreads.add(new Thread(() -> {
+ try {
+ DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal), clientProperties);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }));
+ }
+
+ for (Thread connectionThread : connectionThreads) {
+ connectionThread.start();
+ }
+
+ for (Thread connectionThread : connectionThreads) {
+ connectionThread.join();
+ }
+
+ // For the given HA group of the current test, the URLS set for the current haGroupInfo
+ // should have numThreads + 1 entries, as all the connections created with the same
+ // principal share one entry in the map.
+ Assert.assertEquals(numThreads + 1, URLS.get(haGroup.getGroupInfo()).size());
+ }
+
 /**
 * Helper method to verify that the failover connection has expected mutation metrics.
* @@ -636,4 +773,12 @@ private static void doTestActionShouldFailBecauseOfFailover(Action action) throw LOG.info("Got expected failover exception after connection is closed.", e); } // all other type of exception will fail this test. } + + private Connection createConnectionWithRandomPrincipal(boolean isPrincipalNull) throws SQLException { + String principal = RandomStringUtils.randomAlphabetic(5); + if (isPrincipalNull) { + return DriverManager.getConnection(CLUSTERS.getJdbcHAUrlWithoutPrincipal(), clientProperties); + } + return DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(principal), clientProperties); + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java index f7a99ba92a8..5f1b86fdaad 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; @@ -36,6 +37,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -76,6 +78,7 @@ public class HighAvailabilityGroupIT { private String jdbcUrl; /** Failover HA group for to test. */ private HighAvailabilityGroup haGroup; + private HAURLInfo haURLInfo; /** HA Group name for this test. */ private String haGroupName; @@ -106,6 +109,7 @@ public void setup() throws Exception { // Make first cluster ACTIVE CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); jdbcUrl = CLUSTERS.getJdbcHAUrl(); + haURLInfo = HighAvailabilityGroup.getUrlInfo(jdbcUrl, clientProperties); haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties); } @@ -164,6 +168,185 @@ public void testGet() throws Exception { } } + /** + * Test HAGroup.get() method to get same HAGroups if we have different principals only and different HAGroups + * if anything else in the key changes + * @throws Exception + */ + @Test + public void testGetWithDifferentPrincipals() throws Exception { + //Client will get same HAGroup if we have difference in principal only but JDBCURLs should have different + //principals + assertEquals(CLUSTERS.getJdbcUrl1(), haGroup.getGroupInfo().getJDBCUrl(CLUSTERS.getUrl1(), haURLInfo)); + assertEquals(CLUSTERS.getJdbcUrl2(), haGroup.getGroupInfo().getJDBCUrl(CLUSTERS.getUrl2(), haURLInfo)); + assertEquals(CLUSTERS.getJdbcHAUrl(), haGroup.getGroupInfo(). 
+ getJDBCUrl(String.format("[%s|%s]", CLUSTERS.getUrl1(), CLUSTERS.getUrl2()), haURLInfo)); + + //Try creating new HAGroup with same params except Principal + Optional haGroup2 = Optional.empty(); + try { + String principal = RandomStringUtils.randomAlphabetic(5); + String haUrl2 = CLUSTERS.getJdbcHAUrl(principal); + HAURLInfo haURLInfo2 = HighAvailabilityGroup.getUrlInfo(haUrl2, clientProperties); + haGroup2 = HighAvailabilityGroup.get(haUrl2, clientProperties); + assertTrue(haGroup2.isPresent()); + //We should get same HAGroup as we have mapping of -> HAGroup + assertSame(haGroup, haGroup2.get()); + + //URLs we are getting for haGroup2 should have newer principal i.e. Current HAURLInfo should have new + //principal instead default PRINCIPAL + assertEquals(CLUSTERS.getJdbcUrl(CLUSTERS.getUrl1(), principal), + haGroup2.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl1(), haURLInfo2)); + assertEquals(CLUSTERS.getJdbcUrl(CLUSTERS.getUrl2(), principal), + haGroup2.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl2(), haURLInfo2)); + assertEquals(CLUSTERS.getJdbcHAUrl(principal), haGroup2.get().getGroupInfo(). + getJDBCUrl(String.format("[%s|%s]", CLUSTERS.getUrl1(), CLUSTERS.getUrl2()), haURLInfo2)); + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get a different HighAvailabilityGroup when group name is different and with same principal as + // default HAGroup + String haGroupName3 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + CLUSTERS.initClusterRole(haGroupName3, HighAvailabilityPolicy.FAILOVER); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3); + Optional haGroup3 = Optional.empty(); + Optional haGroup4 = Optional.empty(); + try { + HAURLInfo haurlInfo3 = HighAvailabilityGroup.getUrlInfo(jdbcUrl, clientProperties); + haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup3.isPresent()); + assertNotSame(haGroup, haGroup3.get()); + + assertNotSame(haGroup.getGroupInfo(), haGroup3.get().getGroupInfo()); + + //URLs we are getting for haGroup3 should have same principal as default PRINCIPAL. + assertEquals(CLUSTERS.getJdbcUrl1(), + haGroup3.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl1(), haurlInfo3)); + assertEquals(CLUSTERS.getJdbcUrl2(), + haGroup3.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl2(), haurlInfo3)); + assertEquals(CLUSTERS.getJdbcHAUrl(), haGroup3.get().getGroupInfo(). 
+ getJDBCUrl(String.format("[%s|%s]", CLUSTERS.getUrl1(), CLUSTERS.getUrl2()), haurlInfo3)); + + // should get same ha Group without principal + String haUrl4 = CLUSTERS.getJdbcHAUrlWithoutPrincipal(); + HAURLInfo haURLInfo4 = HighAvailabilityGroup.getUrlInfo(haUrl4, clientProperties); + haGroup4 = HighAvailabilityGroup.get(haUrl4, clientProperties); + assertTrue(haGroup4.isPresent()); + assertNotSame(haGroup, haGroup4.get()); + assertSame(haGroup3.get(), haGroup4.get()); + + assertNotSame(haGroup.getGroupInfo(), haGroup4.get().getGroupInfo()); + assertSame(haGroup3.get().getGroupInfo(), haGroup4.get().getGroupInfo()); + assertNotEquals(haurlInfo3, haURLInfo4); + + } finally { + haGroup3.ifPresent(HighAvailabilityGroup::close); + haGroup4.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get the same HighAvailabilityGroup using the same information as key again + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroup.getGroupInfo().getName()); + Optional haGroup5 = Optional.empty(); + try { + //Again using a random principal which should be used now for generating jdbcUrls + String principal = RandomStringUtils.randomAlphabetic(5); + String haUrl5 = CLUSTERS.getJdbcHAUrl(principal); + HAURLInfo haURLInfo5 = HighAvailabilityGroup.getUrlInfo(haUrl5, clientProperties); + haGroup5 = HighAvailabilityGroup.get(haUrl5, clientProperties); + assertTrue(haGroup5.isPresent()); + assertSame(haGroup, haGroup5.get()); + + //URLs we are getting for haGroup4 should have newer principal i.e. Current HAURLInfo should have new + //principal instead default PRINCIPAL + assertEquals(CLUSTERS.getJdbcUrl(CLUSTERS.getUrl1(), principal), + haGroup4.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl1(), haURLInfo5)); + assertEquals(CLUSTERS.getJdbcUrl(CLUSTERS.getUrl2(), principal), + haGroup4.get().getGroupInfo().getJDBCUrl(CLUSTERS.getUrl2(), haURLInfo5)); + assertEquals(CLUSTERS.getJdbcHAUrl(principal), haGroup4.get().getGroupInfo(). + getJDBCUrl(String.format("[%s|%s]", CLUSTERS.getUrl1(), CLUSTERS.getUrl2()), haURLInfo5)); + + } finally { + haGroup5.ifPresent(HighAvailabilityGroup::close); + } + + } + + @Test + public void testHAGroupMappings() throws Exception { + + //Try creating new HAGroup with same params except Principal + Optional haGroup2 = Optional.empty(); + try { + String principal = RandomStringUtils.randomAlphabetic(5); + String haUrl2 = CLUSTERS.getJdbcHAUrl(principal); + HAURLInfo haURLInfo2 = HighAvailabilityGroup.getUrlInfo(haUrl2, clientProperties); + haGroup2 = HighAvailabilityGroup.get(haUrl2, clientProperties); + assertTrue(haGroup2.isPresent()); + //We should get same HAGroup as we have mapping of -> HAGroup + assertSame(haGroup, haGroup2.get()); + //We should have 2 values on URLS mapping for the given haGroup/haGroup2. 
+ assertEquals(2, URLS.get(haGroup.getGroupInfo()).size()); + assertTrue(URLS.get(haGroup.getGroupInfo()).contains(haURLInfo2)); + + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + + //Create 2 more different urls connecting to a different HAGroup + String haGroupName3 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + CLUSTERS.initClusterRole(haGroupName3, HighAvailabilityPolicy.FAILOVER); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3); + Optional haGroup3 = Optional.empty(); + Optional haGroup4 = Optional.empty(); + try { + HAURLInfo haurlInfo3 = HighAvailabilityGroup.getUrlInfo(jdbcUrl, clientProperties); + haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup3.isPresent()); + assertNotSame(haGroup, haGroup3.get()); + assertNotSame(haGroup.getGroupInfo(), haGroup3.get().getGroupInfo()); + assertEquals(1, URLS.get(haGroup3.get().getGroupInfo()).size()); + + + // should get same ha Group without principal + String haUrl4 = CLUSTERS.getJdbcHAUrlWithoutPrincipal(); + HAURLInfo haURLInfo4 = HighAvailabilityGroup.getUrlInfo(haUrl4, clientProperties); + haGroup4 = HighAvailabilityGroup.get(haUrl4, clientProperties); + assertTrue(haGroup4.isPresent()); + assertNotSame(haGroup, haGroup4.get()); + assertSame(haGroup3.get(), haGroup4.get()); + assertEquals(2, URLS.get(haGroup4.get().getGroupInfo()).size()); + + assertNotEquals(haurlInfo3, haURLInfo4); + + } finally { + haGroup3.ifPresent(HighAvailabilityGroup::close); + haGroup4.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get the same HighAvailabilityGroup using the same information as key again + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroup.getGroupInfo().getName()); + Optional haGroup5 = Optional.empty(); + try { + String haUrl5 = CLUSTERS.getJdbcHAUrl(); + HAURLInfo haURLInfo5 = HighAvailabilityGroup.getUrlInfo(haUrl5, clientProperties); + haGroup5 = HighAvailabilityGroup.get(haUrl5, clientProperties); + assertTrue(haGroup5.isPresent()); + assertSame(haGroup, haGroup5.get()); + + //haURLInfo5 should be same as global one and URLS mapping should not change so + //set mapping for global HAGroupInfo should have 2 values + assertEquals(2, URLS.get(haGroup.getGroupInfo()).size()); + assertTrue(URLS.get(haGroup.getGroupInfo()).contains(haURLInfo5)); + assertEquals(haURLInfo5, haURLInfo); + + + } finally { + haGroup5.ifPresent(HighAvailabilityGroup::close); + } + + } + /** * Test that HA group should see latest version of cluster role record. 
*/ @@ -317,7 +500,7 @@ public void testGetShouldFailWithNonHAJdbcString() { */ @Test public void testConnect() throws SQLException { - Connection connection = haGroup.connect(clientProperties); + Connection connection = haGroup.connect(clientProperties, haURLInfo); assertNotNull(connection); assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); } @@ -328,11 +511,11 @@ public void testConnect() throws SQLException { @Test public void testConnectToOneCluster() throws SQLException { final String url = CLUSTERS.getJdbcUrl1(); - PhoenixConnection connection = haGroup.connectToOneCluster(url, clientProperties); + PhoenixConnection connection = haGroup.connectToOneCluster(url, clientProperties, haURLInfo); assertEquals(url, connection.getURL()); try { - haGroup.connectToOneCluster(null, clientProperties); + haGroup.connectToOneCluster(null, clientProperties, haURLInfo); fail("Should have failed since null is not in any HA group"); } catch (Exception e) { LOG.info("Got expected exception with invalid null host url", e); @@ -341,7 +524,7 @@ public void testConnectToOneCluster() throws SQLException { final String randomHostUrl = String.format("%s:%d", RandomStringUtils.randomAlphabetic(4), RandomUtils.nextInt(0,65536)); try { - haGroup.connectToOneCluster(randomHostUrl, clientProperties); + haGroup.connectToOneCluster(randomHostUrl, clientProperties, haURLInfo); fail("Should have failed since '" + randomHostUrl + "' is not in HA group " + haGroup); } catch (IllegalArgumentException e) { LOG.info("Got expected exception with invalid host url '{}'", randomHostUrl, e); @@ -351,7 +534,7 @@ public void testConnectToOneCluster() throws SQLException { /** * Test that it can connect to a given cluster in this HA group after ZK service restarts. * - * Method {@link HighAvailabilityGroup#connectToOneCluster(String, Properties)} is used by + * Method {@link HighAvailabilityGroup#connectToOneCluster(String, Properties, HAURLInfo)} is used by * Phoenix HA framework to connect to one specific HBase cluster in this HA group. The cluster * may not necessarily be in ACTIVE role. For example, parallel HA connection needs to connect * to both clusters. This tests that it can connect to a specific ZK cluster after ZK restarts. 
@@ -376,7 +559,7 @@ public void testConnectToOneClusterAfterZKRestart() throws Exception { doTestBasicOperationsWithConnection(conn, tableName, null); } // test with HA group to get connection to one cluster - try (Connection conn = haGroup.connectToOneCluster(jdbcUrlToCluster1, clientProperties)) { + try (Connection conn = haGroup.connectToOneCluster(jdbcUrlToCluster1, clientProperties, haURLInfo)) { doTestBasicOperationsWithConnection(conn, tableName, haGroupName); } } @@ -386,9 +569,9 @@ public void testConnectToOneClusterAfterZKRestart() throws Exception { */ @Test public void testIsConnectionActive() throws Exception { - PhoenixConnection conn1 = haGroup.connectToOneCluster(CLUSTERS.getUrl1(), clientProperties); + PhoenixConnection conn1 = haGroup.connectToOneCluster(CLUSTERS.getUrl1(), clientProperties, haURLInfo); assertTrue(haGroup.isActive(conn1)); - PhoenixConnection conn2 = haGroup.connectToOneCluster(CLUSTERS.getUrl2(), clientProperties); + PhoenixConnection conn2 = haGroup.connectToOneCluster(CLUSTERS.getUrl2(), clientProperties, haURLInfo); assertFalse(haGroup.isActive(conn2)); CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); @@ -418,7 +601,7 @@ public void testNodeChange() throws Exception { public void testCanConnectWhenStandbyHBaseClusterDown() throws Exception { doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster2(), () -> { // HA group is already initialized - Connection connection = haGroup.connect(clientProperties); + Connection connection = haGroup.connect(clientProperties, haURLInfo); assertNotNull(connection); assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); }); @@ -437,7 +620,7 @@ public void testCanConnectWhenStandbyZKClusterDown() throws Exception { HighAvailabilityGroup.CURATOR_CACHE.invalidateAll(); // HA group is already initialized - Connection connection = haGroup.connect(clientProperties); + Connection connection = haGroup.connect(clientProperties, haURLInfo); assertNotNull(connection); assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); }); @@ -459,11 +642,12 @@ public void testCanConnectNewGroupWhenStandbyHBaseClusterDown() throws Exception CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); Optional haGroup2 = Optional.empty(); try { + HAURLInfo haURLInfo = HighAvailabilityGroup.getUrlInfo(jdbcUrl, clientProperties); haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); assertTrue(haGroup2.isPresent()); assertNotSame(haGroup2.get(), haGroup); // get a new connection in this new HA group; should be pointing to ACTIVE cluster1 - try (Connection connection = haGroup2.get().connect(clientProperties)) { + try (Connection connection = haGroup2.get().connect(clientProperties, haURLInfo)) { assertNotNull(connection); assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); } @@ -489,11 +673,12 @@ public void testCanConnectNewGroupWhenStandbyZKClusterDown() throws Exception { doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { Optional haGroup2 = Optional.empty(); try { + HAURLInfo haURLInfo = HighAvailabilityGroup.getUrlInfo(jdbcUrl, clientProperties); haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); assertTrue(haGroup2.isPresent()); assertNotSame(haGroup2.get(), haGroup); // get a new connection in this new HA group; should be pointing to ACTIVE cluster1 - Connection connection = haGroup2.get().connect(clientProperties); + Connection connection = haGroup2.get().connect(clientProperties, haURLInfo); assertNotNull(connection); 
                assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
            } finally {
@@ -509,7 +694,7 @@ public void testCanConnectNewGroupWhenStandbyZKClusterDown() {
     public void testCanNotEstablishConnectionWhenActiveHBaseClusterDown() throws Exception {
         doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster1(), () -> {
             try {
-                haGroup.connectActive(clientProperties);
+                haGroup.connectActive(clientProperties, haURLInfo);
                 fail("Should have failed because ACTIVE HBase cluster is down.");
             } catch (SQLException e) {
                 LOG.info("Got expected exception when ACTIVE HBase cluster is down", e);
@@ -529,7 +714,7 @@ public void testCanNotEstablishConnectionWhenActiveHBaseClusterDown() throws Exc
     public void testConnectActiveWhenActiveZKClusterRestarts() throws Exception {
         doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
             try {
-                haGroup.connectActive(clientProperties);
+                haGroup.connectActive(clientProperties, haURLInfo);
                 fail("Should have failed because the ACTIVE ZK cluster is down.");
             } catch (SQLException e) {
                 LOG.info("Got expected exception when ACTIVE ZK cluster is down", e);
@@ -537,7 +722,7 @@ public void testConnectActiveWhenActiveZKClusterRestarts() throws Exception {
             }
         });

-        try (Connection conn = haGroup.connectActive(clientProperties)) {
+        try (Connection conn = haGroup.connectActive(clientProperties, haURLInfo)) {
            assertNotNull(conn);
            LOG.info("Successfully connected to HA group {} after restarting ACTIVE ZK", haGroup);
        } // all other exceptions will fail the test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
index 304b048e498..b8bd5c61faa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
@@ -90,6 +90,7 @@ public class HighAvailabilityGroupTestIT {
     private final ClusterRoleRecord record = mock(ClusterRoleRecord.class);
     /** The HA group to test. This is spied but not mocked.
      */
     private HighAvailabilityGroup haGroup;
+    private HAURLInfo haURLInfo;

     @Rule
     public final TestName testName = new TestName();
@@ -129,6 +130,7 @@ public void init() {
         HAGroupInfo haGroupInfo = new HAGroupInfo(haGroupName, ZK1, ZK2);
         haGroup = spy(new HighAvailabilityGroup(haGroupInfo, clientProperties, record, READY));
+        haURLInfo = spy(new HAURLInfo(haGroupName));
     }

     /**
@@ -138,18 +140,19 @@ public void init() {
      */
     @Test
     public void testConnect() throws SQLException {
-        final Connection conn = haGroup.connect(clientProperties);
+        final Connection conn = haGroup.connect(clientProperties, haURLInfo);
         assertTrue(conn instanceof FailoverPhoenixConnection);
         FailoverPhoenixConnection failoverConnection = conn.unwrap(FailoverPhoenixConnection.class);
         assertNotNull(failoverConnection);
         // Verify that the failover connection connected to the ACTIVE cluster once
-        verify(haGroup, times(1)).connectActive(any(Properties.class));
-        verify(haGroup, times(1)).connectToOneCluster(anyString(), eq(clientProperties));
+        verify(haGroup, times(1)).connectActive(any(Properties.class), any(HAURLInfo.class));
+        verify(haGroup, times(1)).connectToOneCluster(anyString(),
+                eq(clientProperties), any(HAURLInfo.class));
         verify(DRIVER, atLeastOnce()).getConnectionQueryServices(anyString(), eq(clientProperties));

         when(record.getPolicy()).thenReturn(HighAvailabilityPolicy.PARALLEL);
         // get a new connection from this HA group
-        final Connection conn2 = haGroup.connect(clientProperties);
+        final Connection conn2 = haGroup.connect(clientProperties, haURLInfo);
         assertTrue(conn2 instanceof ParallelPhoenixConnection);
     }
@@ -161,7 +164,7 @@ public void testConnectShouldFailWhenNotReady() throws SQLException {
         final HAGroupInfo info = haGroup.getGroupInfo();
         haGroup = spy(new HighAvailabilityGroup(info, clientProperties, record, UNINITIALIZED));
         try {
-            haGroup.connect(clientProperties);
+            haGroup.connect(clientProperties, haURLInfo);
             fail("Should have failed since HA group is not READY!");
         } catch (SQLException e) {
             LOG.info("Got expected exception", e);
@@ -178,11 +181,11 @@ public void testConnectShouldFailWhenNotReady() throws SQLException {
     public void testConnectToOneCluster() throws SQLException {
         // test with JDBC string
         final String jdbcString = String.format("jdbc:phoenix:%s", ZK1);
-        haGroup.connectToOneCluster(jdbcString, clientProperties);
+        haGroup.connectToOneCluster(jdbcString, clientProperties, haURLInfo);
         verify(DRIVER, times(1)).getConnectionQueryServices(anyString(), eq(clientProperties));

         // test with only ZK string
-        haGroup.connectToOneCluster(ZK1, clientProperties);
+        haGroup.connectToOneCluster(ZK1, clientProperties, haURLInfo);
         verify(DRIVER, times(2)).getConnectionQueryServices(anyString(), eq(clientProperties));
     }
@@ -195,7 +198,7 @@ public void testConnectToOneClusterShouldFailIfNotConnectable() throws SQLExcept
         // test with JDBC string and UNKNOWN cluster role
         final String jdbcString = String.format("jdbc:phoenix:%s", ZK1);
         try {
-            haGroup.connectToOneCluster(jdbcString, clientProperties);
+            haGroup.connectToOneCluster(jdbcString, clientProperties, haURLInfo);
             fail("Should have failed because cluster is in UNKNOWN role");
         } catch (SQLException e) { // expected exception
             assertEquals(SQLExceptionCode.HA_CLUSTER_CAN_NOT_CONNECT.getErrorCode(),
@@ -206,7 +209,7 @@ public void testConnectToOneClusterShouldFailIfNotConnectable() throws SQLExcept
         // test with only ZK string and OFFLINE cluster role
         when(record.getRole(eq(ZK1))).thenReturn(ClusterRole.OFFLINE);
         try {
-
haGroup.connectToOneCluster(jdbcString, clientProperties); + haGroup.connectToOneCluster(jdbcString, clientProperties, haURLInfo); fail("Should have failed because cluster is in OFFLINE role"); } catch (SQLException e) { // expected exception assertEquals(SQLExceptionCode.HA_CLUSTER_CAN_NOT_CONNECT.getErrorCode(), @@ -221,7 +224,7 @@ public void testConnectToOneClusterShouldFailIfNotConnectable() throws SQLExcept @Test (expected = IllegalArgumentException.class) public void testConnectToOneClusterShouldFailWithNonHAJdbcString() throws SQLException { final String jdbcString = "jdbc:phoenix:dummyhost"; - haGroup.connectToOneCluster(jdbcString, clientProperties); + haGroup.connectToOneCluster(jdbcString, clientProperties, haURLInfo); verify(DRIVER, never()).getConnectionQueryServices(anyString(), eq(clientProperties)); } @@ -233,7 +236,7 @@ public void testConnectToOneClusterShouldNotFailWithDifferentHostOrderJdbcString // test with JDBC string final String hosts = "zk1-2,zk1-1:2181:/hbase"; final String jdbcString = String.format("jdbc:phoenix+zk:%s", hosts); - haGroup.connectToOneCluster(jdbcString, clientProperties); + haGroup.connectToOneCluster(jdbcString, clientProperties, haURLInfo); verify(DRIVER, times(1)).getConnectionQueryServices(eq(String.format("jdbc:phoenix+zk:%s",ZK1)), eq(clientProperties)); } @@ -260,7 +263,7 @@ public void testGetShouldFailWithoutHAGroupName() throws SQLException { @Test public void testIsConnectionActive() throws SQLException { assertFalse(haGroup.isActive(null)); - PhoenixConnection connection = haGroup.connectToOneCluster(ZK1, clientProperties); + PhoenixConnection connection = haGroup.connectToOneCluster(ZK1, clientProperties, haURLInfo); assertTrue(haGroup.isActive(connection)); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java index 1cb6c0ba799..a733e2eac86 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java @@ -346,9 +346,23 @@ public String getJdbcHAUrl() { return getJdbcUrl(String.format("[%s|%s]", url1, url2)); } + public String getJdbcHAUrl(String principal) { + return getJdbcUrl(String.format("[%s|%s]", url1, url2), principal); + } + + public String getJdbcHAUrlWithoutPrincipal() { + return getJdbcUrlWithoutPrincipal(String.format("[%s|%s]", url1, url2)); + } + + public String getJdbcUrl1() { return getJdbcUrl(url1); } + + public String getJdbcUrl1(String principal) { + return getJdbcUrl(url1, principal); + } + public String getJdbcUrl2() { return getJdbcUrl(url2); } @@ -357,6 +371,14 @@ public String getJdbcUrl(String zkUrl) { return String.format("jdbc:phoenix+zk:%s:%s", zkUrl, PRINCIPAL); } + public String getJdbcUrl(String zkUrl, String principal) { + return String.format("jdbc:phoenix+zk:%s:%s", zkUrl, principal); + } + + public String getJdbcUrlWithoutPrincipal(String zkUrl) { + return String.format("jdbc:phoenix+zk:%s", zkUrl); + } + public String getUrl1() { return url1; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java index 7ecacd2261b..5e4b7f89186 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java @@ -137,24 
+137,25 @@ public void testUserPrincipal() throws Exception { try (Connection conn = getParallelConnection()) { ParallelPhoenixConnection pr = conn.unwrap(ParallelPhoenixConnection.class); ParallelPhoenixContext context = pr.getContext(); + HAURLInfo haurlInfo = context.getHaurlInfo(); HighAvailabilityGroup.HAGroupInfo group = context.getHaGroup().getGroupInfo(); if (CLUSTERS.getUrl1().compareTo(CLUSTERS.getUrl2()) <= 0) { - Assert.assertEquals(CLUSTERS.getJdbcUrl1(), group.getJDBCUrl1()); - Assert.assertEquals(CLUSTERS.getJdbcUrl2(), group.getJDBCUrl2()); + Assert.assertEquals(CLUSTERS.getJdbcUrl1(), group.getJDBCUrl1(haurlInfo)); + Assert.assertEquals(CLUSTERS.getJdbcUrl2(), group.getJDBCUrl2(haurlInfo)); } else { - Assert.assertEquals(CLUSTERS.getJdbcUrl2(), group.getJDBCUrl1()); - Assert.assertEquals(CLUSTERS.getJdbcUrl1(), group.getJDBCUrl2()); + Assert.assertEquals(CLUSTERS.getJdbcUrl2(), group.getJDBCUrl1(haurlInfo)); + Assert.assertEquals(CLUSTERS.getJdbcUrl1(), group.getJDBCUrl2(haurlInfo)); } ConnectionQueryServices cqsi; // verify connection#1 - cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl1(), clientProperties); + cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl1(haurlInfo), clientProperties); Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName()); PhoenixConnection pConn = pr.getFutureConnection1().get(); ConnectionQueryServices cqsiFromConn = pConn.getQueryServices(); Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsiFromConn.getUserName()); Assert.assertTrue(cqsi == cqsiFromConn); // verify connection#2 - cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl2(), clientProperties); + cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl2(haurlInfo), clientProperties); Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName()); pConn = pr.getFutureConnection2().get(); cqsiFromConn = pConn.getQueryServices(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java index 8ee3d028bdd..dcf00005324 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java @@ -59,15 +59,18 @@ public class FailoverPhoenixConnectionTest { @Mock HighAvailabilityGroup haGroup; final HAGroupInfo haGroupInfo = new HAGroupInfo("fake", "zk1", "zk2"); + final HAURLInfo haURLInfo = new HAURLInfo("fake"); + FailoverPhoenixContext context; FailoverPhoenixConnection failoverConnection; // this connection itself is not mocked or spied. 
+
     @Before
     public void init() throws SQLException {
         MockitoAnnotations.initMocks(this);
         when(haGroup.getGroupInfo()).thenReturn(haGroupInfo);
-        when(haGroup.connectActive(any(Properties.class))).thenReturn(connection1);
-
-        failoverConnection = new FailoverPhoenixConnection(haGroup, new Properties());
+        when(haGroup.connectActive(any(Properties.class), any(HAURLInfo.class))).thenReturn(connection1);
+        context = new FailoverPhoenixContext(new Properties(), haGroup, haURLInfo);
+        failoverConnection = new FailoverPhoenixConnection(context);
     }

     /**
@@ -92,7 +95,7 @@ public void testWrapActionDuringFailover() throws SQLException {
     @Test
     public void testFailover() throws SQLException {
         // Make HAGroup return a different phoenix connection when it gets called next time
-        when(haGroup.connectActive(any(Properties.class))).thenReturn(connection2);
+        when(haGroup.connectActive(any(Properties.class), any(HAURLInfo.class))).thenReturn(connection2);

         // explicitly call failover
         failoverConnection.failover(1000L);
@@ -128,7 +131,7 @@ public void testFailoverStatic() throws SQLException {
     public void testActiveFailoverIsNoOp() throws SQLException {
         when(haGroup.isActive(connection1)).thenReturn(true);
         // Make HAGroup return a different phoenix connection when it gets called next time
-        when(haGroup.connectActive(any(Properties.class))).thenReturn(connection2);
+        when(haGroup.connectActive(any(Properties.class), any(HAURLInfo.class))).thenReturn(connection2);

         failoverConnection.failover(1000L);
@@ -145,11 +148,12 @@ public void testFailoverToActivePolicy() throws SQLException {
         Properties properties = new Properties();
         properties.setProperty(FailoverPolicy.PHOENIX_HA_FAILOVER_POLICY_ATTR,
                 FailoverPolicy.FailoverToActivePolicy.NAME);
-        failoverConnection = new FailoverPhoenixConnection(haGroup, properties);
+        FailoverPhoenixContext context = new FailoverPhoenixContext(properties, haGroup, haURLInfo);
+        failoverConnection = new FailoverPhoenixConnection(context);

         LOG.info("Close the wrapped phoenix connection due to failover...");
         // Make HAGroup return a different phoenix connection when it gets called next time
-        when(haGroup.connectActive(any(Properties.class))).thenReturn(connection2);
+        when(haGroup.connectActive(any(Properties.class), any(HAURLInfo.class))).thenReturn(connection2);
         // Mimic the wrapped phoenix connection getting closed by the HA group
         doThrow(new FailoverSQLException("", "", new Exception())).when(connection1).commit();
@@ -204,8 +208,8 @@ public void testCloseOnceMore() throws SQLException {
     @Test
     public void testCheckConnection() throws SQLException {
         // Make the wrapped phoenix connection null. This could happen if HAGroup is failing.
- when(haGroup.connectActive(any(Properties.class))).thenReturn(null); - failoverConnection = new FailoverPhoenixConnection(haGroup, new Properties()); + when(haGroup.connectActive(any(Properties.class), any(HAURLInfo.class))).thenReturn(null); + failoverConnection = new FailoverPhoenixConnection(context); assertNull(failoverConnection.getWrappedConnection()); try { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java index c64252d5304..064f587e207 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java @@ -72,7 +72,7 @@ public void testExecuteQueryChainFailure() throws SQLException { new ParallelPhoenixContext(new Properties(), Mockito.mock(HighAvailabilityGroup.class), HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), - null); + null, Mockito.mock(HAURLInfo.class)); ParallelPhoenixConnection parallelConn = new ParallelPhoenixConnection(context, CompletableFuture.completedFuture(connSpy1), CompletableFuture.completedFuture(connSpy2)); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionTest.java index cdc8c2ddb5e..4c1c4e85537 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionTest.java @@ -53,7 +53,7 @@ public class ParallelPhoenixConnectionTest { @Before public void init() throws SQLException { context = new ParallelPhoenixContext(new Properties(), Mockito.mock(HighAvailabilityGroup.class), - HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null, Mockito.mock(HAURLInfo.class)); parallelPhoenixConnection = new ParallelPhoenixConnection(context,CompletableFuture.completedFuture(connection1),CompletableFuture.completedFuture(connection2)); } @@ -185,7 +185,8 @@ public void testOpenConnection1Delay() throws Exception { "1000"); ParallelPhoenixContext context = new ParallelPhoenixContext(properties, Mockito.mock(HighAvailabilityGroup.class), - HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null, + Mockito.mock(HAURLInfo.class)); CountDownLatch cdl = new CountDownLatch(1); CompletableFuture futureConnection1 = CompletableFuture.supplyAsync(getDelayConnectionSupplier(cdl, connection1)); @@ -212,7 +213,8 @@ public void testOpenConnection2Delay() throws Exception { "1000"); ParallelPhoenixContext context = new ParallelPhoenixContext(properties, Mockito.mock(HighAvailabilityGroup.class), - HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null, + Mockito.mock(HAURLInfo.class)); CountDownLatch cdl = new CountDownLatch(1); CompletableFuture futureConnection1 = CompletableFuture.completedFuture(connection1); @@ -239,7 +241,8 @@ public void testOpenBothConnectionDelay() throws SQLException { "1000"); ParallelPhoenixContext context = new ParallelPhoenixContext(properties, Mockito.mock(HighAvailabilityGroup.class), - 
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null, + Mockito.mock(HAURLInfo.class)); CountDownLatch cdl1 = new CountDownLatch(1); CompletableFuture futureConnection1 = CompletableFuture.supplyAsync(getDelayConnectionSupplier(cdl1, connection1)); @@ -346,7 +349,8 @@ public void testConnectionCloseNoTimeout() throws Exception { "1000"); ParallelPhoenixContext context = new ParallelPhoenixContext(properties, Mockito.mock(HighAvailabilityGroup.class), - HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null, + Mockito.mock(HAURLInfo.class)); parallelPhoenixConnection = new ParallelPhoenixConnection(context, CompletableFuture.completedFuture(connection1), diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixContextTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixContextTest.java index 1d0bc2c898c..cd45b619bf9 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixContextTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixContextTest.java @@ -70,7 +70,9 @@ public void testContructionFailsWithLessThan2ThreadPools() { ParallelPhoenixContext context = new ParallelPhoenixContext(new Properties(), Mockito.mock(HighAvailabilityGroup.class), - Lists.newArrayList(Mockito.mock(PhoenixHAExecutorServiceProvider.PhoenixHAClusterExecutorServices.class)), null); + Lists.newArrayList(Mockito.mock(PhoenixHAExecutorServiceProvider. + PhoenixHAClusterExecutorServices.class)), null, + Mockito.mock(HAURLInfo.class)); fail("Should not construct with less than 2 ThreadPools"); } catch (IllegalArgumentException e) { } @@ -85,7 +87,8 @@ public void testPool1OutOfCapacity() throws Exception { Mockito.mock(Properties.class), Mockito.mock(ClusterRoleRecord.class), HighAvailabilityGroup.State.READY), - executorList, Lists.newArrayList(Boolean.FALSE, Boolean.TRUE)); + executorList, Lists.newArrayList(Boolean.FALSE, Boolean.TRUE), + Mockito.mock(HAURLInfo.class)); CompletableFuture future1 = context.chainOnConn1(() -> true); assertTrue(future1.isCompletedExceptionally()); assertEquals(0, ((TrackingThreadPoolExecutor) executorList.get(0).getExecutorService()).tasksExecuted.get()); @@ -105,7 +108,8 @@ public void testPool2OutOfCapacity() throws Exception { Mockito.mock(Properties.class), Mockito.mock(ClusterRoleRecord.class), HighAvailabilityGroup.State.READY), - executorList, Lists.newArrayList(Boolean.TRUE, Boolean.FALSE)); + executorList, Lists.newArrayList(Boolean.TRUE, Boolean.FALSE), + Mockito.mock(HAURLInfo.class)); CompletableFuture future1 = context.chainOnConn1(() -> true); assertTrue(future1.get()); assertEquals(1, ((TrackingThreadPoolExecutor) executorList.get(0).getExecutorService()).tasksExecuted.get()); @@ -121,7 +125,8 @@ public void testPoolsHaveCapacity() throws Exception { ParallelPhoenixContext context = new ParallelPhoenixContext(new Properties(), Mockito.mock(HighAvailabilityGroup.class), executorList, - Lists.newArrayList(Boolean.TRUE, Boolean.TRUE)); + Lists.newArrayList(Boolean.TRUE, Boolean.TRUE), + Mockito.mock(HAURLInfo.class)); CompletableFuture future1 = context.chainOnConn1(() -> true); assertTrue(future1.get()); assertEquals(1, ((TrackingThreadPoolExecutor) executorList.get(0).getExecutorService()).tasksExecuted.get()); diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetTest.java index dd6ccb7a1dc..dcea2ad8e62 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetTest.java @@ -55,7 +55,8 @@ public void init() { Mockito.mock(Properties.class), Mockito.mock(ClusterRoleRecord.class), HighAvailabilityGroup.State.READY), - HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null, + Mockito.mock(HAURLInfo.class)); rs1 = Mockito.mock(ResultSet.class); rs2 = Mockito.mock(ResultSet.class); completableRs1 = CompletableFuture.completedFuture(rs1); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java index 1c157d025bd..a5e1a4333ff 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java @@ -47,7 +47,8 @@ public class ParallelPhoenixPreparedStatementTest { @Before public void init() throws Exception { context = new ParallelPhoenixContext(new Properties(), Mockito.mock(HighAvailabilityGroup.class), - HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null, + Mockito.mock(HAURLInfo.class)); statement1 = Mockito.mock(PhoenixMonitoredPreparedStatement.class); statement2 = Mockito.mock(PhoenixMonitoredPreparedStatement.class); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java index 3e9a7e0f254..9e967e1ab67 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java @@ -53,7 +53,8 @@ public void init() { new ParallelPhoenixContext(new Properties(), null, HighAvailabilityTestingUtility .getListOfSingleThreadExecutorServices(), - null), + null, + Mockito.mock(HAURLInfo.class)), completableRs1, completableRs2); } @@ -109,7 +110,8 @@ public void testRS1WinsNext() throws Exception { new ParallelPhoenixContext(new Properties(), null, HighAvailabilityTestingUtility .getListOfSingleThreadExecutorServices(), - null), + null, + Mockito.mock(HAURLInfo.class)), completableRs1, completableRs2); resultSet.next(); @@ -149,7 +151,8 @@ public void testRS2WinsNext() throws Exception { new ParallelPhoenixContext(new Properties(), null, HighAvailabilityTestingUtility .getListOfSingleThreadExecutorServices(), - null), + null, + Mockito.mock(HAURLInfo.class)), completableRs1, completableRs2); resultSet.next(); @@ -189,7 +192,8 @@ public void testRS1FailsImmediatelyNext() throws Exception { new ParallelPhoenixContext(new Properties(), null, HighAvailabilityTestingUtility .getListOfSingleThreadExecutorServices(), - null), + null, + Mockito.mock(HAURLInfo.class)), completableRs1, completableRs2); resultSet.next(); @@ -250,7 +254,8 @@ public void testRS1SucceedsDuringNext() throws Exception { new 
ParallelPhoenixContext(new Properties(), null, HighAvailabilityTestingUtility .getListOfSingleThreadExecutorServices(), - null), + null, + Mockito.mock(HAURLInfo.class)), completableRs1, completableRs2); //run next in the background diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixUtilTest.java index 29d672a4e23..726fe049959 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixUtilTest.java @@ -46,7 +46,8 @@ public class ParallelPhoenixUtilTest { private static final ParallelPhoenixContext context = new ParallelPhoenixContext(new Properties(), null, - HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null, + Mockito.mock(HAURLInfo.class)); @Test public void getAnyOfNonExceptionallySingleFutureTest() throws Exception { @@ -114,7 +115,7 @@ public void getAnyOfNonExceptionallyTimeoutTest() throws Exception { ParallelPhoenixContext ctx = new ParallelPhoenixContext(props, null, HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), - null); + null, Mockito.mock(HAURLInfo.class)); long startTime = EnvironmentEdgeManager.currentTime(); try { util.getAnyOfNonExceptionally(futures, ctx);