PHOENIX-7478 HBase 3 compatibility changes: Replace ClusterConnection… #2036

Open · wants to merge 1 commit into base: master
@@ -154,7 +154,6 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
- import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
@@ -771,7 +770,13 @@ public ConnectionQueryServices getChildQueryServices(ImmutableBytesWritable tena

@Override
public void clearTableRegionCache(TableName tableName) throws SQLException {
- ((ClusterConnection)connection).clearRegionCache(tableName);
+ try {
+     connection.getRegionLocator(tableName).clearRegionLocationCache();
+ } catch (IOException e) {
+     LOGGER.info("Exception while clearing table region cache", e);
+     //TODO allow passing cause to TableNotFoundException
+     throw new TableNotFoundException(tableName.toString());
+ }
}

public byte[] getNextRegionStartKey(HRegionLocation regionLocation, byte[] currentKey,
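Note: the hunk above drops the private ClusterConnection.clearRegionCache(TableName) in favor of the public RegionLocator API. A minimal sketch of the pattern, assuming an open HBase Connection; the helper class and method names are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    public final class RegionCacheUtil {
        // Hypothetical helper: clears the cached region locations for one table.
        // RegionLocator.clearRegionLocationCache() is the public replacement for
        // the removed ClusterConnection.clearRegionCache(TableName).
        static void clearTableRegionCache(Connection conn, TableName table) throws IOException {
            try (RegionLocator locator = conn.getRegionLocator(table)) {
                locator.clearRegionLocationCache();
            }
        }
    }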
@@ -875,8 +880,7 @@ public List<HRegionLocation> getTableRegions(final byte[] tableName, final byte[
currentKey = startRowKey;
do {
HRegionLocation regionLocation =
- ((ClusterConnection) connection).getRegionLocation(table,
-     currentKey, false);
+ connection.getRegionLocator(table).getRegionLocation(currentKey, false);
currentKey =
getNextRegionStartKey(regionLocation, currentKey, prevRegionLocation);
locations.add(regionLocation);
@@ -2179,8 +2183,9 @@ private MetaDataMutationResult metaDataCoprocessorExec(String tableName, byte[]
long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (true) {
if (retried) {
- ((ClusterConnection) connection).relocateRegion(
-     SchemaUtil.getPhysicalName(systemTableName, this.getProps()), tableKey);
+ connection.getRegionLocator(SchemaUtil.getPhysicalName(
+         systemTableName, this.getProps()))
+     .getRegionLocation(tableKey, true);
}

Table ht = this.getTable(SchemaUtil.getPhysicalName(systemTableName, this.getProps()).getName());
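Note: both hunks above lean on the reload flag of RegionLocator.getRegionLocation(row, reload): false may answer from the locator's cache (the getTableRegions path), while true forces a fresh hbase:meta lookup, which is what the removed ClusterConnection.relocateRegion call achieved. A minimal sketch, assuming an open Connection; the table and row key are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    final class RegionLookupSketch {
        static void locate(Connection conn, TableName table, byte[] rowKey) throws IOException {
            try (RegionLocator locator = conn.getRegionLocator(table)) {
                // reload=false: may return a cached location (fast path).
                HRegionLocation cached = locator.getRegionLocation(rowKey, false);
                // reload=true: bypasses the cache and re-reads hbase:meta,
                // the stand-in for the removed relocateRegion().
                HRegionLocation fresh = locator.getRegionLocation(rowKey, true);
                System.out.println(cached.getServerName() + " -> " + fresh.getServerName());
            }
        }
    }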
@@ -186,7 +186,6 @@
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
@@ -6573,7 +6572,7 @@ public MutationState changePermissions(ChangePermsStatement changePermsStatement
LOGGER.info(changePermsStatement.toString());

try(Admin admin = connection.getQueryServices().getAdmin()) {
- ClusterConnection clusterConnection = (ClusterConnection) admin.getConnection();
+ org.apache.hadoop.hbase.client.Connection hConnection = admin.getConnection();

if (changePermsStatement.getSchemaName() != null) {
// SYSTEM.CATALOG doesn't have any entry for "default" HBase namespace, hence we will bypass the check
@@ -6583,7 +6582,7 @@ public MutationState changePermissions(ChangePermsStatement changePermsStatement
connection);
}

- changePermsOnSchema(clusterConnection, changePermsStatement);
+ changePermsOnSchema(hConnection, changePermsStatement);
} else if (changePermsStatement.getTableName() != null) {
PTable inputTable = connection.getTable(SchemaUtil.
normalizeFullTableName(changePermsStatement.getTableName().toString()));
@@ -6593,11 +6592,11 @@ public MutationState changePermissions(ChangePermsStatement changePermsStatement

// Changing perms on base table and update the perms for global and view indexes
// Views and local indexes are not physical tables and hence update perms is not needed
- changePermsOnTables(clusterConnection, admin, changePermsStatement, inputTable);
+ changePermsOnTables(hConnection, admin, changePermsStatement, inputTable);
} else {

// User can be given perms at the global level
- changePermsOnUser(clusterConnection, changePermsStatement);
+ changePermsOnUser(hConnection, changePermsStatement);
}

} catch (SQLException e) {
@@ -6612,20 +6611,25 @@ public MutationState changePermissions(ChangePermsStatement changePermsStatement
return new MutationState(0, 0, connection);
}

- private void changePermsOnSchema(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement) throws Throwable {
+ private void changePermsOnSchema(org.apache.hadoop.hbase.client.Connection hConnection,
+         ChangePermsStatement changePermsStatement) throws Throwable {
if (changePermsStatement.isGrantStatement()) {
- AccessControlClient.grant(clusterConnection, changePermsStatement.getSchemaName(), changePermsStatement.getName(), changePermsStatement.getPermsList());
+ AccessControlClient.grant(hConnection, changePermsStatement.getSchemaName(),
+     changePermsStatement.getName(), changePermsStatement.getPermsList());
} else {
- AccessControlClient.revoke(clusterConnection, changePermsStatement.getSchemaName(), changePermsStatement.getName(), Permission.Action.values());
+ AccessControlClient.revoke(hConnection, changePermsStatement.getSchemaName(),
+     changePermsStatement.getName(), Permission.Action.values());
}
}

- private void changePermsOnTables(ClusterConnection clusterConnection, Admin admin, ChangePermsStatement changePermsStatement, PTable inputTable) throws Throwable {
+ private void changePermsOnTables(org.apache.hadoop.hbase.client.Connection hConnection,
+         Admin admin, ChangePermsStatement changePermsStatement,
+         PTable inputTable) throws Throwable {

org.apache.hadoop.hbase.TableName tableName = SchemaUtil.getPhysicalTableName
(inputTable.getPhysicalName().getBytes(), inputTable.isNamespaceMapped());

- changePermsOnTable(clusterConnection, changePermsStatement, tableName);
+ changePermsOnTable(hConnection, changePermsStatement, tableName);

boolean schemaInconsistency = false;
List<PTable> inconsistentTables = null;
@@ -6646,7 +6650,7 @@ private void changePermsOnTables(ClusterConnection clusterConnection, Admin admi
LOGGER.info("Updating permissions for Index Table: " +
indexTable.getName() + " Base Table: " + inputTable.getName());
tableName = SchemaUtil.getPhysicalTableName(indexTable.getPhysicalName().getBytes(), indexTable.isNamespaceMapped());
- changePermsOnTable(clusterConnection, changePermsStatement, tableName);
+ changePermsOnTable(hConnection, changePermsStatement, tableName);
}

if (schemaInconsistency) {
@@ -6664,7 +6668,7 @@ private void changePermsOnTables(ClusterConnection clusterConnection, Admin admi
if (viewIndexTableExists) {
LOGGER.info("Updating permissions for View Index Table: " +
Bytes.toString(viewIndexTableBytes) + " Base Table: " + inputTable.getName());
- changePermsOnTable(clusterConnection, changePermsStatement, tableName);
+ changePermsOnTable(hConnection, changePermsStatement, tableName);
} else {
if (inputTable.isMultiTenant()) {
LOGGER.error("View Index Table not found for MultiTenant Table: " + inputTable.getName());
@@ -6675,23 +6679,28 @@ private void changePermsOnTables(ClusterConnection clusterConnection, Admin admi
}
}

- private void changePermsOnTable(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement, org.apache.hadoop.hbase.TableName tableName)
+ private void changePermsOnTable(org.apache.hadoop.hbase.client.Connection hConnection,
+         ChangePermsStatement changePermsStatement,
+         org.apache.hadoop.hbase.TableName tableName)
throws Throwable {
if (changePermsStatement.isGrantStatement()) {
- AccessControlClient.grant(clusterConnection, tableName, changePermsStatement.getName(),
+ AccessControlClient.grant(hConnection, tableName, changePermsStatement.getName(),
null, null, changePermsStatement.getPermsList());
} else {
- AccessControlClient.revoke(clusterConnection, tableName, changePermsStatement.getName(),
+ AccessControlClient.revoke(hConnection, tableName, changePermsStatement.getName(),
null, null, Permission.Action.values());
}
}

- private void changePermsOnUser(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement)
+ private void changePermsOnUser(org.apache.hadoop.hbase.client.Connection hConnection,
+         ChangePermsStatement changePermsStatement)
throws Throwable {
if (changePermsStatement.isGrantStatement()) {
- AccessControlClient.grant(clusterConnection, changePermsStatement.getName(), changePermsStatement.getPermsList());
+ AccessControlClient.grant(hConnection, changePermsStatement.getName(),
+     changePermsStatement.getPermsList());
} else {
- AccessControlClient.revoke(clusterConnection, changePermsStatement.getName(), Permission.Action.values());
+ AccessControlClient.revoke(hConnection, changePermsStatement.getName(),
+     Permission.Action.values());
}
}
}
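Note: the AccessControlClient entry points used above all accept the public Connection interface, so no ClusterConnection cast is needed for grants or revokes. A minimal sketch of the three call shapes (global, namespace, table), assuming a cluster with the AccessController coprocessor enabled; the user, namespace, and table names are illustrative:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.security.access.AccessControlClient;
    import org.apache.hadoop.hbase.security.access.Permission;

    final class GrantSketch {
        static void grantExamples(Connection conn) throws Throwable {
            // Global grant for a user.
            AccessControlClient.grant(conn, "alice", Permission.Action.READ);
            // Namespace-level grant.
            AccessControlClient.grant(conn, "my_ns", "alice", Permission.Action.READ);
            // Table-level grant; null family/qualifier covers the whole table.
            AccessControlClient.grant(conn, TableName.valueOf("my_ns:t1"), "alice",
                null, null, Permission.Action.READ, Permission.Action.WRITE);
        }
    }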
@@ -42,10 +42,11 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.HRegionLocation;
- import org.apache.hadoop.hbase.ServerName;
- import org.apache.hadoop.hbase.client.ClusterConnection;
+ import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.client.Admin;
+ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -56,12 +57,9 @@
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.security.User;
- import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Pair;
- import org.apache.hadoop.ipc.RemoteException;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.PostDDLCompiler;
@@ -677,22 +675,26 @@ public static PhoenixConnection getRebuildIndexConnection(Configuration config)
}

public static boolean tableRegionsOnline(Configuration conf, PTable table) {
- try (ClusterConnection hcon =
-         (ClusterConnection) ConnectionFactory.createConnection(conf)) {
-     List<HRegionLocation> locations = hcon.locateRegions(
-         org.apache.hadoop.hbase.TableName.valueOf(table.getPhysicalName().getBytes()));
-
-     for (HRegionLocation loc : locations) {
+ try (Connection hcon = ConnectionFactory.createConnection(conf)) {
+     Admin admin = hcon.getAdmin();
+     List<RegionInfo> regionInfos = admin.getRegions(TableName.valueOf(
+         table.getPhysicalName().getBytes()));
+     // This makes Number of Regions RPC calls sequentially.
+     // For large tables this can be slow.
+     for (RegionInfo regionInfo : regionInfos) {
try {
-     ServerName sn = loc.getServerName();
-     if (sn == null) continue;
-
-     AdminProtos.AdminService.BlockingInterface admin = hcon.getAdmin(sn);
-     HBaseRpcController controller = hcon.getRpcControllerFactory().newController();
-     org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.getRegionInfo(controller,
-         admin, loc.getRegion().getRegionName());
- } catch (RemoteException e) {
-     LOGGER.debug("Cannot get region " + loc.getRegion().getEncodedName() + " info due to error:" + e);
+     // We don't actually care about the compaction state, we are only calling this
+     // because this will trigger a call to the RS (from master), and we want to make
+     // sure that all RSs are available
+     // There are only a few methods in HBase 3.0 that are directly calling the RS,
+     // this is one of them.
+     admin.getCompactionStateForRegion(regionInfo.getRegionName());
+     // This used to make a direct RPC call to the region, but HBase 3 makes that
+     // very hard (needs reflection, or a bridge class in the same package),
+     // and it's not necessary for checking the RS liveness
+ } catch (IOException e) {
+     LOGGER.debug("Cannot get region " + regionInfo.getEncodedName()
+         + " info due to error:" + e);
return false;
}
}
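Note: as the new comments explain, the compaction-state request serves purely as a liveness probe: the call reaches the RegionServer hosting the region, so an IOException implies that server is unreachable. A minimal standalone sketch of the same probe, assuming an HBase Configuration is at hand; the class and method names are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionInfo;

    final class RegionsOnlineSketch {
        static boolean allRegionsReachable(Configuration conf, TableName table) {
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin()) {
                // One RPC per region; the returned state is discarded, only
                // the success or failure of the call matters.
                for (RegionInfo region : admin.getRegions(table)) {
                    admin.getCompactionStateForRegion(region.getRegionName());
                }
                return true;
            } catch (IOException e) {
                return false;
            }
        }
    }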
@@ -41,7 +41,6 @@
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -259,7 +258,7 @@ public void testRecoveryRegionPostOpen() throws Exception {
scan = new Scan();
primaryTable.close();
primaryTable = hbaseConn.getTable(TableName.valueOf(DATA_TABLE_NAME));
- ((ClusterConnection)hbaseConn).clearRegionLocationCache();
+ hbaseConn.clearRegionLocationCache();
resultScanner = primaryTable.getScanner(scan);
count = 0;
for (Result result : resultScanner) {
@@ -38,7 +38,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
@@ -108,8 +107,8 @@ public void testSplitWithCachedMeta() throws Exception {
admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
Configuration configuration = conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(configuration);
- ((ClusterConnection)hbaseConn).clearRegionCache(TableName.valueOf(tableName));
RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+ regionLocator.clearRegionLocationCache();
int nRegions = regionLocator.getAllRegionLocations().size();
admin.split(tn, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A3")));
int retryCount = 0;
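Note: the updated test clears the per-table locator cache and then counts regions through the same RegionLocator. A minimal sketch of that split-and-wait pattern, assuming the split point and retry budget are test-specific; all names are illustrative:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    final class SplitWaitSketch {
        static void splitAndWait(Connection conn, Admin admin, TableName table,
                byte[] splitPoint) throws Exception {
            try (RegionLocator locator = conn.getRegionLocator(table)) {
                locator.clearRegionLocationCache();
                int before = locator.getAllRegionLocations().size();
                admin.split(table, splitPoint);
                // Splits complete asynchronously; poll until a new region appears.
                for (int retry = 0; retry < 10; retry++) {
                    if (locator.getAllRegionLocations().size() > before) {
                        return;
                    }
                    Thread.sleep(1000L);
                }
                throw new AssertionError("split not visible after 10 retries");
            }
        }
    }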
@@ -20,7 +20,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
@@ -135,8 +134,8 @@ public void testSplitWithCachedMeta() throws Exception {
conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
org.apache.hadoop.hbase.client.Connection hbaseConn =
ConnectionFactory.createConnection(configuration);
- ((ClusterConnection) hbaseConn).clearRegionCache(TableName.valueOf(tableName));
RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+ regionLocator.clearRegionLocationCache();
int nRegions = regionLocator.getAllRegionLocations().size();
admin.split(tn, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A3")));
int retryCount = 0;