Skip to content

Commit 0f35489

Browse files
richardantalRichard Antal
authored and
Richard Antal
committed
PHOENIX-7474 Migrate IndexTool tables and make sure they are created
1 parent a75c4dc commit 0f35489

15 files changed

+762
-97
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.phoenix.mapreduce.index;
19+
20+
21+
22+
import java.io.IOException;
23+
import java.sql.Connection;
24+
25+
import java.sql.SQLException;
26+
import java.util.UUID;
27+
28+
29+
import org.apache.hadoop.hbase.NamespaceDescriptor;
30+
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
31+
import org.apache.hadoop.hbase.TableExistsException;
32+
import org.apache.hadoop.hbase.client.*;
33+
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
34+
import org.apache.phoenix.query.QueryConstants;
35+
36+
import org.apache.hadoop.conf.Configured;
37+
import org.apache.hadoop.hbase.TableName;
38+
39+
import org.apache.phoenix.jdbc.PhoenixConnection;
40+
41+
import org.apache.phoenix.query.ConnectionQueryServices;
42+
43+
import org.apache.phoenix.schema.PTableType;
44+
import org.apache.phoenix.util.ClientUtil;
45+
import org.apache.phoenix.util.SchemaUtil;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
48+
49+
import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
50+
51+
/**
52+
* Utility class to create index tables and/or migrate them.
53+
*
54+
*/
55+
public class IndexToolTableUtil extends Configured {
56+
private static final Logger LOGGER = LoggerFactory.getLogger(IndexToolTableUtil.class);
57+
58+
public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
59+
public static String SYSTEM_OUTPUT_TABLE_NAME = SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME,
60+
OUTPUT_TABLE_NAME);
61+
62+
public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
63+
public static String SYSTEM_RESULT_TABLE_NAME = SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME,
64+
RESULT_TABLE_NAME);
65+
66+
public static void setIndexToolTableName(Connection connection) throws Exception {
67+
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
68+
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, queryServices.getConfiguration())) {
69+
SYSTEM_OUTPUT_TABLE_NAME = SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, OUTPUT_TABLE_NAME).replace(
70+
QueryConstants.NAME_SEPARATOR,
71+
QueryConstants.NAMESPACE_SEPARATOR);
72+
SYSTEM_RESULT_TABLE_NAME = SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, RESULT_TABLE_NAME).replace(
73+
QueryConstants.NAME_SEPARATOR,
74+
QueryConstants.NAMESPACE_SEPARATOR);
75+
} else {
76+
SYSTEM_OUTPUT_TABLE_NAME = SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, OUTPUT_TABLE_NAME);
77+
SYSTEM_RESULT_TABLE_NAME = SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, RESULT_TABLE_NAME);
78+
}
79+
}
80+
81+
public static Table createResultTable(Connection connection) throws IOException, SQLException {
82+
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
83+
try (Admin admin = queryServices.getAdmin()) {
84+
TableName resultTableName = TableName.valueOf(SYSTEM_RESULT_TABLE_NAME);
85+
if (SYSTEM_RESULT_TABLE_NAME.contains(QueryConstants.NAMESPACE_SEPARATOR)) {
86+
createSystemNamespaceTable(connection);
87+
}
88+
return createTable(admin, resultTableName);
89+
}
90+
}
91+
92+
public static Table createOutputTable(Connection connection) throws IOException, SQLException {
93+
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
94+
try (Admin admin = queryServices.getAdmin()) {
95+
TableName outputTableName = TableName.valueOf(SYSTEM_OUTPUT_TABLE_NAME);
96+
if (SYSTEM_OUTPUT_TABLE_NAME.contains(QueryConstants.NAMESPACE_SEPARATOR)) {
97+
createSystemNamespaceTable(connection);
98+
}
99+
return createTable(admin, outputTableName);
100+
}
101+
}
102+
103+
public static void createSystemNamespaceTable(Connection connection) throws IOException, SQLException {
104+
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
105+
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, queryServices.getConfiguration())) {
106+
try (Admin admin = queryServices.getAdmin()) {
107+
if (!ClientUtil.isHBaseNamespaceAvailable(admin, SYSTEM_SCHEMA_NAME)) {
108+
NamespaceDescriptor namespaceDescriptor =
109+
NamespaceDescriptor.create(SYSTEM_SCHEMA_NAME).build();
110+
admin.createNamespace(namespaceDescriptor);
111+
}
112+
}
113+
}
114+
}
115+
116+
@VisibleForTesting
117+
private static Table createTable(Admin admin, TableName tableName) throws IOException {
118+
if (!admin.tableExists(tableName)) {
119+
ColumnFamilyDescriptor columnDescriptor =
120+
ColumnFamilyDescriptorBuilder
121+
.newBuilder(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)
122+
.setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
123+
.build();
124+
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
125+
.setColumnFamily(columnDescriptor).build();
126+
try {
127+
admin.createTable(tableDescriptor);
128+
} catch (TableExistsException e) {
129+
LOGGER.warn("Table exists, ignoring", e);
130+
}
131+
}
132+
return admin.getConnection().getTable(tableName);
133+
}
134+
135+
136+
public static void createNewIndexToolTables(Connection connection) throws Exception {
137+
setIndexToolTableName(connection);
138+
139+
migrateTable(connection, OUTPUT_TABLE_NAME);
140+
migrateTable(connection, RESULT_TABLE_NAME);
141+
}
142+
143+
private static void migrateTable(Connection connection, String tableName) throws Exception {
144+
if (!tableName.equals(OUTPUT_TABLE_NAME) && !tableName.equals(RESULT_TABLE_NAME)) {
145+
LOGGER.info("Only migrating PHOENIX_INDEX_TOOL tables!");
146+
} else {
147+
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
148+
try (Admin admin = queryServices.getAdmin()) {
149+
TableName oldTableName = TableName.valueOf(tableName);
150+
String newTableNameString = tableName.equals(OUTPUT_TABLE_NAME) ?
151+
SYSTEM_OUTPUT_TABLE_NAME : SYSTEM_RESULT_TABLE_NAME;
152+
153+
TableName newTableName = TableName.valueOf(newTableNameString);
154+
155+
if (admin.tableExists(oldTableName)) {
156+
String snapshotName = tableName + "_" + UUID.randomUUID();
157+
admin.disableTable(oldTableName);
158+
admin.snapshot(snapshotName, oldTableName);
159+
admin.cloneSnapshot(snapshotName, newTableName);
160+
admin.deleteSnapshot(snapshotName);
161+
admin.deleteTable(oldTableName);
162+
} else {
163+
createTable(admin, newTableName);
164+
}
165+
}
166+
}
167+
}
168+
}

phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java

+15
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@
323323
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
324324
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
325325

326+
import org.apache.phoenix.mapreduce.index.IndexToolTableUtil;
327+
326328
public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
327329
private static final Logger LOGGER =
328330
LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
@@ -4025,6 +4027,12 @@ private void createOtherSystemTables(PhoenixConnection metaConnection) throws SQ
40254027
try {
40264028
metaConnection.createStatement().executeUpdate(getCDCStreamDDL());
40274029
} catch (TableAlreadyExistsException ignore) {}
4030+
try {
4031+
// check if we have old PHOENIX_INDEX_TOOL tables
4032+
// move data to the new tables under System, or simply create the new tables
4033+
IndexToolTableUtil.createNewIndexToolTables(metaConnection);
4034+
4035+
} catch (Exception ignore) {}
40284036
}
40294037

40304038
/**
@@ -4588,6 +4596,13 @@ public void upgradeSystemTables(final String url, final Properties props) throws
45884596
// with SYSTEM Namespace
45894597
createSchemaIfNotExistsSystemNSMappingEnabled(metaConnection);
45904598

4599+
try {
4600+
// check if we have old PHOENIX_INDEX_TOOL tables
4601+
// move data to the new tables under System, or simply create the new tables
4602+
IndexToolTableUtil.createNewIndexToolTables(metaConnection);
4603+
4604+
} catch (Exception ignore) {}
4605+
45914606
clearUpgradeRequired();
45924607
success = true;
45934608
} catch (UpgradeInProgressException | UpgradeNotRequiredException e) {

phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java

+14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.Map;
3939
import java.util.UUID;
4040

41+
import org.apache.phoenix.query.QueryConstants;
42+
import org.apache.phoenix.schema.PTableType;
4143
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
4244
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
4345
import org.apache.phoenix.hbase.index.AbstractValueGetter;
@@ -985,10 +987,22 @@ private void setupIndexAndDataTable(Connection connection) throws SQLException,
985987
indexTable, qDataTable));
986988
}
987989
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
990+
988991
pIndexTable = connection.unwrap(PhoenixConnection.class).getTable(
989992
SchemaUtil.getQualifiedTableName(schemaName, indexTable));
993+
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, getConf())) {
994+
pIndexTable = connection.unwrap(PhoenixConnection.class).getTable(
995+
SchemaUtil.getQualifiedTableName(schemaName, indexTable).replace(
996+
QueryConstants.NAME_SEPARATOR,
997+
QueryConstants.NAMESPACE_SEPARATOR));
998+
}
999+
9901000
indexType = pIndexTable.getIndexType();
9911001
qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
1002+
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, getConf())) {
1003+
qIndexTable = qIndexTable.replace(QueryConstants.NAME_SEPARATOR, QueryConstants.NAMESPACE_SEPARATOR);
1004+
}
1005+
9921006
if (IndexType.LOCAL.equals(indexType)) {
9931007
isLocalIndexBuild = true;
9941008
if (useSnapshot) {

phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java

+3-30
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,12 @@
1919

2020
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
2121

22-
import org.apache.hadoop.hbase.TableExistsException;
23-
import org.apache.hadoop.hbase.TableName;
24-
import org.apache.hadoop.hbase.client.Admin;
25-
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
26-
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
2722
import org.apache.hadoop.hbase.client.Put;
2823
import org.apache.hadoop.hbase.client.Result;
2924
import org.apache.hadoop.hbase.client.ResultScanner;
3025
import org.apache.hadoop.hbase.client.Scan;
3126
import org.apache.hadoop.hbase.client.Table;
32-
import org.apache.hadoop.hbase.client.TableDescriptor;
33-
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
3427
import org.apache.hadoop.hbase.util.Bytes;
35-
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
3628
import org.apache.phoenix.hbase.index.table.HTableFactory;
3729
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
3830
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -60,8 +52,8 @@ public class IndexVerificationOutputRepository implements AutoCloseable {
6052
IndexTool.IndexDisableLoggingType.NONE;
6153
private boolean shouldLogBeyondMaxLookback = true;
6254

63-
public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
64-
public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
55+
public final static String OUTPUT_TABLE_NAME = IndexToolTableUtil.SYSTEM_OUTPUT_TABLE_NAME;
56+
public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(IndexToolTableUtil.SYSTEM_OUTPUT_TABLE_NAME);
6557
public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
6658

6759
public final static String DATA_TABLE_NAME = "DTName";
@@ -177,26 +169,7 @@ private static byte[] generatePartialOutputTableRowKey(long ts, byte[] indexTabl
177169
}
178170

179171
public void createOutputTable(Connection connection) throws IOException, SQLException {
180-
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
181-
try (Admin admin = queryServices.getAdmin()) {
182-
TableName outputTableName = TableName.valueOf(OUTPUT_TABLE_NAME);
183-
if (!admin.tableExists(outputTableName)) {
184-
ColumnFamilyDescriptor columnDescriptor =
185-
ColumnFamilyDescriptorBuilder
186-
.newBuilder(OUTPUT_TABLE_COLUMN_FAMILY)
187-
.setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
188-
.build();
189-
TableDescriptor tableDescriptor = TableDescriptorBuilder
190-
.newBuilder(TableName.valueOf(OUTPUT_TABLE_NAME))
191-
.setColumnFamily(columnDescriptor).build();
192-
try {
193-
admin.createTable(tableDescriptor);
194-
} catch (TableExistsException e) {
195-
LOGGER.warn("Table exists, ignoring", e);
196-
}
197-
outputTable = admin.getConnection().getTable(outputTableName);
198-
}
199-
}
172+
outputTable = IndexToolTableUtil.createOutputTable(connection);
200173
}
201174

202175
@VisibleForTesting

phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java

+6-34
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,18 @@
1919

2020
import org.apache.hadoop.hbase.Cell;
2121

22-
import org.apache.hadoop.hbase.TableExistsException;
23-
import org.apache.hadoop.hbase.TableName;
24-
import org.apache.hadoop.hbase.client.Admin;
25-
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
26-
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
2722
import org.apache.hadoop.hbase.client.Get;
2823
import org.apache.hadoop.hbase.client.Put;
2924
import org.apache.hadoop.hbase.client.Result;
3025
import org.apache.hadoop.hbase.client.ResultScanner;
3126
import org.apache.hadoop.hbase.client.Scan;
3227
import org.apache.hadoop.hbase.client.Table;
33-
import org.apache.hadoop.hbase.client.TableDescriptor;
34-
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
3528
import org.apache.hadoop.hbase.regionserver.Region;
3629
import org.apache.hadoop.hbase.util.Bytes;
3730
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
38-
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
3931
import org.apache.phoenix.hbase.index.table.HTableFactory;
4032
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
4133
import org.apache.phoenix.jdbc.PhoenixConnection;
42-
import org.apache.phoenix.query.ConnectionQueryServices;
4334
import org.apache.phoenix.query.QueryConstants;
4435
import org.apache.phoenix.util.ByteUtil;
4536
import org.slf4j.Logger;
@@ -58,8 +49,8 @@ public class IndexVerificationResultRepository implements AutoCloseable {
5849
private Table indexTable;
5950
public static final String ROW_KEY_SEPARATOR = "|";
6051
public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes(ROW_KEY_SEPARATOR);
61-
public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
62-
public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
52+
public static String RESULT_TABLE_NAME = IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME;
53+
public static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME);
6354
public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
6455
public final static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
6556
public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT);
@@ -152,37 +143,18 @@ public IndexVerificationResultRepository(){
152143
}
153144

154145
public IndexVerificationResultRepository(Connection conn, byte[] indexNameBytes) throws SQLException {
155-
resultTable = getTable(conn, RESULT_TABLE_NAME_BYTES);
146+
resultTable = getTable(conn, Bytes.toBytes(IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME));
156147
indexTable = getTable(conn, indexNameBytes);
157148
}
158149

159150
public IndexVerificationResultRepository(byte[] indexName,
160151
HTableFactory hTableFactory) throws IOException {
161-
resultTable = hTableFactory.getTable(new ImmutableBytesPtr(RESULT_TABLE_NAME_BYTES));
152+
resultTable = hTableFactory.getTable(new ImmutableBytesPtr(Bytes.toBytes(IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME)));
162153
indexTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName));
163154
}
164155

165156
public void createResultTable(Connection connection) throws IOException, SQLException {
166-
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
167-
try (Admin admin = queryServices.getAdmin()) {
168-
TableName resultTableName = TableName.valueOf(RESULT_TABLE_NAME);
169-
if (!admin.tableExists(resultTableName)) {
170-
ColumnFamilyDescriptor columnDescriptor =
171-
ColumnFamilyDescriptorBuilder
172-
.newBuilder(RESULT_TABLE_COLUMN_FAMILY)
173-
.setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
174-
.build();
175-
TableDescriptor tableDescriptor =
176-
TableDescriptorBuilder.newBuilder(resultTableName)
177-
.setColumnFamily(columnDescriptor).build();
178-
try {
179-
admin.createTable(tableDescriptor);
180-
} catch (TableExistsException e) {
181-
LOGGER.warn("Table exists, ignoring", e);
182-
}
183-
resultTable = admin.getConnection().getTable(resultTableName);
184-
}
185-
}
157+
resultTable = IndexToolTableUtil.createResultTable(connection);
186158
}
187159

188160
private static byte[] generatePartialResultTableRowKey(long ts, byte[] indexTableName) {
@@ -356,7 +328,7 @@ private IndexToolVerificationResult aggregateVerificationResult(
356328

357329
public IndexToolVerificationResult getVerificationResult(Connection conn,
358330
long ts, byte[] indexTableName) throws IOException, SQLException {
359-
try (Table hTable = getTable(conn, RESULT_TABLE_NAME_BYTES)) {
331+
try (Table hTable = getTable(conn, Bytes.toBytes(IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME))) {
360332
byte[] startRowKey = generatePartialResultTableRowKey(ts,
361333
indexTableName);
362334
byte[] stopRowKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(

phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import org.apache.phoenix.jdbc.PhoenixConnection;
3636
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
3737
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
38+
import org.apache.phoenix.query.QueryServices;
3839
import org.apache.phoenix.schema.PIndexState;
3940
import org.apache.phoenix.schema.PTable;
41+
import org.apache.phoenix.schema.PTableType;
4042
import org.apache.phoenix.schema.task.Task;
4143
import org.apache.phoenix.schema.transform.Transform;
4244
import org.apache.phoenix.util.SchemaUtil;

0 commit comments

Comments
 (0)