Skip to content

Commit f6e29e9

Browse files
authored
HIVE-29254: Display TxnId associated with the query in show processlist command (#6141)
* HIVE-29254-Display TxnId associated with the query in show processlist command * HIVE-29254-Display TxnId associated with the query in show processlist command * HIVE-29254-Display TxnId associated with the query in show processlist command * HIVE-29254-Display TxnId associated with the query in show processlist command * HIVE-29254-Display TxnId associated with the query in show processlist command
1 parent 330b018 commit f6e29e9

File tree

4 files changed

+165
-3
lines changed

4 files changed

+165
-3
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hive.ql.processor;
20+
21+
import org.apache.hadoop.hive.conf.HiveConf;
22+
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
23+
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
24+
import org.apache.hive.jdbc.miniHS2.MiniHS2;
25+
import org.junit.AfterClass;
26+
import org.junit.Assert;
27+
import org.junit.BeforeClass;
28+
import org.junit.Test;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.io.File;
33+
import java.net.URI;
34+
35+
import java.sql.Connection;
36+
import java.sql.DriverManager;
37+
import java.sql.ResultSet;
38+
import java.sql.Statement;
39+
import java.util.Arrays;
40+
import java.util.HashMap;
41+
import java.util.HashSet;
42+
import java.util.Map;
43+
import java.util.Set;
44+
import java.util.Collections;
45+
46+
import java.util.concurrent.LinkedBlockingQueue;
47+
import java.util.concurrent.ThreadPoolExecutor;
48+
import java.util.concurrent.TimeUnit;
49+
50+
public class TestShowProcessList {
51+
protected static final Logger LOG = LoggerFactory.getLogger(TestShowProcessList.class);
52+
53+
private static MiniHS2 miniHS2 = null;
54+
private static HiveConf conf;
55+
private static String user;
56+
private static ThreadPoolExecutor executor;
57+
58+
static HiveConf defaultConf() throws Exception {
59+
String confDir = "../../data/conf/llap/";
60+
HiveConf.setHiveSiteLocation(new URI("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml").toURL());
61+
System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
62+
HiveConf defaultConf = new HiveConf();
63+
defaultConf.addResource(new URI("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml").toURL());
64+
return defaultConf;
65+
}
66+
67+
@BeforeClass
68+
public static void beforeTest() throws Exception {
69+
conf = defaultConf();
70+
user = System.getProperty("user.name");
71+
conf.setVar(HiveConf.ConfVars.USERS_IN_ADMIN_ROLE, user);
72+
conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
73+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
74+
String suffix = TestShowProcessList.class.getCanonicalName() + "-" + System.currentTimeMillis();
75+
String dir = new File(System.getProperty("java.io.tmpdir") + File.separator + suffix).getPath()
76+
.replaceAll("\\\\", "/") + "/warehouse";
77+
conf.set(MetastoreConf.ConfVars.WAREHOUSE.name(), dir);
78+
TestTxnDbUtil.setConfValues(conf);
79+
TestTxnDbUtil.prepDb(conf);
80+
MiniHS2.cleanupLocalDir();
81+
Class.forName(MiniHS2.getJdbcDriverName());
82+
miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.LLAP);
83+
Map<String, String> confOverlay = new HashMap<>();
84+
miniHS2.start(confOverlay);
85+
86+
executor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
87+
}
88+
89+
@AfterClass
90+
public static void afterTest() throws Exception {
91+
if (miniHS2 != null && miniHS2.isStarted()) {
92+
miniHS2.stop();
93+
}
94+
TestTxnDbUtil.cleanDb(conf);
95+
}
96+
97+
@Test
98+
public void testQueries() throws Exception {
99+
int connections = 10;
100+
//Initiate 10 parallel connections, each with a query that begins a transaction.
101+
for (int i = 0; i < connections; i++) {
102+
executor.submit(() -> {
103+
try (Connection con = DriverManager.getConnection(miniHS2.getJdbcURL(), user, "bar");
104+
Statement stmt = con.createStatement()) {
105+
stmt.execute("drop database if exists DB_" + Thread.currentThread().threadId() + " cascade");
106+
} catch (Exception exception) {
107+
LOG.error(exception.getMessage());
108+
LOG.error(Arrays.toString(exception.getStackTrace()));
109+
}
110+
});
111+
}
112+
Set<Integer> txnIds = new HashSet<>();
113+
try (Connection testCon = DriverManager.getConnection(miniHS2.getJdbcURL(), user, "bar");
114+
Statement s = testCon.createStatement()) {
115+
while (executor.getActiveCount() > 0) {
116+
// retrieve txnIds from show processlist output
117+
txnIds.addAll(getTxnIdsFromShowProcesslist(s));
118+
}
119+
}
120+
System.out.println(txnIds);
121+
// max txnId should be equal to the number of connections
122+
int maxTxnId = Collections.max(txnIds);
123+
Assert.assertEquals(maxTxnId, connections);
124+
}
125+
126+
private static Set<Integer> getTxnIdsFromShowProcesslist(Statement s) {
127+
Set<Integer> txnIds = new HashSet<>();
128+
try (ResultSet rs = s.executeQuery("show processlist")) {
129+
while (rs.next()) {
130+
int txnId = Integer.parseInt(rs.getString("Txn ID"));
131+
// TxnId can be 0 because the query has not yet opened txn when show processlist is run.
132+
if (txnId > 0) {
133+
txnIds.add(txnId);
134+
}
135+
}
136+
} catch (Exception exception) {
137+
LOG.error("Exception when checking hive state", exception);
138+
LOG.error(Arrays.toString(exception.getStackTrace()));
139+
140+
}
141+
return txnIds;
142+
}
143+
144+
}

ql/src/java/org/apache/hadoop/hive/ql/processors/ShowProcessListProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ private Schema getSchema() {
5959
sch.addToFieldSchemas(new FieldSchema("Session Idle Time (s)", STRING_TYPE_NAME, ""));
6060
sch.addToFieldSchemas(new FieldSchema("Query ID", STRING_TYPE_NAME, ""));
6161
sch.addToFieldSchemas(new FieldSchema("State", STRING_TYPE_NAME, ""));
62+
sch.addToFieldSchemas(new FieldSchema("Txn ID", STRING_TYPE_NAME, ""));
6263
sch.addToFieldSchemas(new FieldSchema("Opened Timestamp (s)", STRING_TYPE_NAME, ""));
6364
sch.addToFieldSchemas(new FieldSchema("Elapsed Time (s)", STRING_TYPE_NAME, ""));
6465
sch.addToFieldSchemas(new FieldSchema("Runtime (s)", STRING_TYPE_NAME, ""));
@@ -91,6 +92,7 @@ public CommandProcessorResponse run(String command) throws CommandProcessorExcep
9192
query.getSessionIdleTime(),
9293
query.getQueryId(),
9394
query.getState(),
95+
query.getTxnId(),
9496
query.getBeginTime(),
9597
query.getElapsedTime(),
9698
query.getRuntime()

ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ public class ProcessListInfo {
3232
private final String runtime; // tracks only running portion of the query.
3333
private final long elapsedTime;
3434
private final String state;
35+
private final long txnId;
3536

3637
private ProcessListInfo(String userName, String ipAddr, String sessionId, long sessionActiveTime,
3738
long sessionIdleTime, String queryId, String executionEngine, String beginTime,
38-
String runtime, long elapsedTime, String state) {
39+
String runtime, long elapsedTime, String state, long txnId) {
3940
this.userName = userName;
4041
this.ipAddr = ipAddr;
4142
this.sessionId = sessionId;
@@ -47,6 +48,7 @@ private ProcessListInfo(String userName, String ipAddr, String sessionId, long s
4748
this.runtime = runtime;
4849
this.elapsedTime = elapsedTime;
4950
this.state = state;
51+
this.txnId = txnId;
5052
}
5153

5254
public String getSessionId() {
@@ -93,6 +95,10 @@ public String getState() {
9395
return state;
9496
}
9597

98+
public Long getTxnId() {
99+
return txnId;
100+
}
101+
96102
public static class Builder {
97103
private String userName;
98104
private String ipAddr;
@@ -105,6 +111,7 @@ public static class Builder {
105111
private String runtime;
106112
private long elapsedTime;
107113
private String state;
114+
private long txnId;
108115

109116
public Builder setSessionId(String sessionId) {
110117
this.sessionId = sessionId;
@@ -161,10 +168,15 @@ public Builder setState(String state) {
161168
return this;
162169
}
163170

171+
public Builder setTxnId(long txnId) {
172+
this.txnId = txnId;
173+
return this;
174+
}
175+
164176
public ProcessListInfo build() {
165177
ProcessListInfo processListInfo = new ProcessListInfo(userName, ipAddr, sessionId, sessionActiveTime,
166178
sessionIdleTime, queryId, executionEngine, beginTime, runtime,
167-
elapsedTime, state);
179+
elapsedTime, state, txnId);
168180
return processListInfo;
169181
}
170182
}

service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ private List<ProcessListInfo> getLiveQueryInfos(HiveSession parentSession) {
7171
LocalDateTime beginTime = LocalDateTime.ofInstant(
7272
Instant.ofEpochMilli(query.getBeginTime()), ZoneId.systemDefault()
7373
);
74-
74+
long txnId = 0;
75+
if (op.queryState != null && op.queryState.getTxnManager() != null) {
76+
txnId = op.queryState.getTxnManager().getCurrentTxnId();
77+
}
7578
return new ProcessListInfo.Builder()
7679
.setUserName(session.getUserName())
7780
.setIpAddr(session.getIpAddress())
@@ -84,6 +87,7 @@ private List<ProcessListInfo> getLiveQueryInfos(HiveSession parentSession) {
8487
.setRuntime(query.getRuntime() == null ? "Not finished" : String.valueOf(query.getRuntime() / 1000))
8588
.setElapsedTime(query.getElapsedTime() / 1000)
8689
.setState(query.getState())
90+
.setTxnId(txnId)
8791
.build();
8892
})
8993
.collect(Collectors.toList());

0 commit comments

Comments
 (0)