diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 30bc118da91..66baf4a7e8c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -85,6 +85,7 @@ import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.job.JobManager; import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.OverAllQueryMetrics; import org.apache.phoenix.parse.FilterableStatement; @@ -111,7 +112,6 @@ import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.stats.StatisticsUtil; -import org.apache.phoenix.schema.types.PVarbinaryEncoded; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ClientUtil; @@ -1439,6 +1439,8 @@ private List getIterators(List> scan, Connecti SQLException toThrow = null; final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection()); int queryTimeOut = context.getStatement().getQueryTimeoutInMillis(); + long maxTaskQueueWaitTime = 0; + long maxTaskEndToEndTime = 0; try { submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper, maxQueryEndTime); boolean clearedCache = false; @@ -1467,7 +1469,13 @@ private List getIterators(List> scan, Connecti && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_START_ROW_SUFFIX), previousScan.getScan().getAttribute(SCAN_START_ROW_SUFFIX))==0)) { continue; } - PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS); + Future futureTask = scanPair.getSecond(); + PeekingResultIterator iterator = futureTask.get(timeOutForScan, + TimeUnit.MILLISECONDS); + long taskQueueWaitTime = JobManager.getTaskQueueWaitTime(futureTask); + long taskEndToEndTime = JobManager.getTaskEndToEndTime(futureTask); + maxTaskQueueWaitTime = Math.max(maxTaskQueueWaitTime, taskQueueWaitTime); + maxTaskEndToEndTime = Math.max(maxTaskEndToEndTime, taskEndToEndTime); concatIterators.add(iterator); previousScan.setScan(scanPair.getFirst()); } catch (ExecutionException e) { @@ -1571,9 +1579,11 @@ private List getIterators(List> scan, Connecti } } } finally { + OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics(); + overAllQueryMetrics.updateQueryWaitTime(maxTaskQueueWaitTime); + overAllQueryMetrics.updateQueryTaskEndToEndTime(maxTaskEndToEndTime); if (toThrow != null) { GLOBAL_FAILED_QUERY_COUNTER.increment(); - OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics(); overAllQueryMetrics.queryFailed(); if (context.getScanRanges().isPointLookup()) { overAllQueryMetrics.queryPointLookupFailed(); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java index 2daee7983dd..e2dee154816 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java @@ -19,12 +19,12 @@ import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; +import static org.apache.phoenix.job.JobManager.JobCallable; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -32,7 +32,10 @@ .ExplainPlanAttributesBuilder; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.job.JobManager; import org.apache.phoenix.monitoring.OverAllQueryMetrics; +import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ClientUtil; @@ -167,7 +170,7 @@ public void explain(List planSteps, } @VisibleForTesting - int getNumberOfParallelFetches() { + public int getNumberOfParallelFetches() { return numParallelFetches; } @@ -230,6 +233,8 @@ private List fetchNextBatch() throws SQLException { Collections.shuffle(openIterators); boolean success = false; SQLException toThrow = null; + long maxTaskQueueWaitTime = 0; + long maxTaskEndToEndTime = 0; try { StatementContext context = plan.getContext(); final ConnectionQueryServices services = context.getConnection().getQueryServices(); @@ -238,19 +243,40 @@ private List fetchNextBatch() throws SQLException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Performing parallel fetch for " + openIterators.size() + " iterators. "); } + String physicalTableName = plan.getTableRef().getTable().getPhysicalName().getString(); for (final RoundRobinIterator itr : openIterators) { - Future future = executor.submit(new Callable() { + ReadMetricQueue readMetricQueue = context.getReadMetricsQueue(); + TaskExecutionMetricsHolder taskMetrics = + new TaskExecutionMetricsHolder(readMetricQueue, physicalTableName); + Future future = executor.submit(new JobCallable() { @Override public Tuple call() throws Exception { // Read the next record to refill the scanner's cache. return itr.next(); } + + @Override + public Object getJobId() { + // Prior to using JobCallable, every callable refilling the scanner cache + // was treated as a separate producer in JobManager queue so, keeping + // that same. Should this be changed to ResultIterators.this ? + return this; + } + + @Override + public TaskExecutionMetricsHolder getTaskExecutionMetric() { + return taskMetrics; + } }); futures.add(future); } int i = 0; for (Future future : futures) { Tuple tuple = future.get(); + long taskQueueWaitTime = JobManager.getTaskQueueWaitTime(future); + long taskEndToEndTime = JobManager.getTaskEndToEndTime(future); + maxTaskQueueWaitTime = Math.max(maxTaskQueueWaitTime, taskQueueWaitTime); + maxTaskEndToEndTime = Math.max(maxTaskEndToEndTime, taskEndToEndTime); if (tuple != null) { results.add(new RoundRobinIterator(openIterators.get(i).delegate, tuple)); } else { @@ -279,9 +305,12 @@ public Tuple call() throws Exception { } } } finally { + OverAllQueryMetrics overAllQueryMetrics = + plan.getContext().getOverallQueryMetrics(); + overAllQueryMetrics.updateQueryWaitTime(maxTaskQueueWaitTime); + overAllQueryMetrics.updateQueryTaskEndToEndTime(maxTaskEndToEndTime); if (toThrow != null) { GLOBAL_FAILED_QUERY_COUNTER.increment(); - OverAllQueryMetrics overAllQueryMetrics = plan.getContext().getOverallQueryMetrics(); overAllQueryMetrics.queryFailed(); if (plan.getContext().getScanRanges().isPointLookup()) { overAllQueryMetrics.queryPointLookupFailed(); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java index 8801f0f66b6..02f10acd718 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; @@ -153,6 +154,10 @@ public JobFutureTask(Callable c) { public Object getJobId() { return jobId; } + + public TaskExecutionMetricsHolder getTaskMetric() { + return taskMetric; + } } /** @@ -300,5 +305,31 @@ private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) { return ((JobFutureTask)task).taskMetric; } } + + private static TaskExecutionMetricsHolder getTaskMetric( + Future futureTask) { + if (futureTask instanceof InstrumentedJobFutureTask) { + TaskExecutionMetricsHolder taskMetrics = + ((InstrumentedJobFutureTask) futureTask).getTaskMetric(); + return taskMetrics; + } + return null; + } + + public static long getTaskQueueWaitTime(Future futureTask) { + TaskExecutionMetricsHolder taskMetrics = getTaskMetric(futureTask); + if (taskMetrics != null) { + return taskMetrics.getTaskQueueWaitTime().getValue(); + } + return 0; + } + + public static long getTaskEndToEndTime(Future futureTask) { + TaskExecutionMetricsHolder taskMetrics = getTaskMetric(futureTask); + if (taskMetrics != null) { + return taskMetrics.getTaskEndToEndTime().getValue(); + } + return 0; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java index 40b7932a28c..d0494385a57 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -141,6 +141,11 @@ public enum MetricType { CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits",LogLevel.DEBUG, PLong.INSTANCE), WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE), RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE), + WALL_CLOCK_QUERY_WAIT_TIME("qwt", "Total wall clock time spent by a " + + "query waiting in the Phoenix client thread pool queue", LogLevel.OFF, PLong.INSTANCE), + WALL_CLOCK_QUERY_TASK_END_TO_END_TIME("qeet", "Total wall clock time " + + "spent by a query in task execution in the Phoenix client thread pool" + + "including the queue wait time", LogLevel.OFF, PLong.INSTANCE), OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE), OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE), QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE), diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java index 5038cb32387..d11af8053bb 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java @@ -26,6 +26,8 @@ import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_TIMEOUT_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER; import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS; +import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME; +import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_QUERY_WAIT_TIME; import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS; import java.util.HashMap; @@ -50,6 +52,8 @@ public class OverAllQueryMetrics { private final CombinableMetric queryPointLookupFailed; private final CombinableMetric queryScanFailed; private final CombinableMetric cacheRefreshedDueToSplits; + private final CombinableMetric queryWaitTime; + private final CombinableMetric queryTaskEndToEndTime; public OverAllQueryMetrics(boolean isRequestMetricsEnabled, LogLevel connectionLogLevel) { queryWatch = MetricUtil.getMetricsStopWatch(isRequestMetricsEnabled, connectionLogLevel, @@ -72,6 +76,11 @@ public OverAllQueryMetrics(boolean isRequestMetricsEnabled, LogLevel connectionL queryScanFailed = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, QUERY_SCAN_FAILED_COUNTER); cacheRefreshedDueToSplits = MetricUtil.getCombinableMetric(isRequestMetricsEnabled, connectionLogLevel, CACHE_REFRESH_SPLITS_COUNTER); + queryWaitTime = MetricUtil.getCombinableMetric(isRequestMetricsEnabled, + connectionLogLevel, WALL_CLOCK_QUERY_WAIT_TIME); + queryTaskEndToEndTime = MetricUtil.getCombinableMetric(isRequestMetricsEnabled, + connectionLogLevel, WALL_CLOCK_QUERY_TASK_END_TO_END_TIME); + } public void updateNumParallelScans(long numParallelScans) { @@ -132,6 +141,14 @@ public void stopResultSetWatch() { } } + public void updateQueryWaitTime(long queryWaitTime) { + this.queryWaitTime.change(queryWaitTime); + } + + public void updateQueryTaskEndToEndTime(long queryTaskEndToEndTime) { + this.queryTaskEndToEndTime.change(queryTaskEndToEndTime); + } + @VisibleForTesting long getWallClockTimeMs() { return wallClockTimeMS.getValue(); @@ -154,6 +171,9 @@ public Map publish() { metricsForPublish.put(queryPointLookupFailed.getMetricType(), queryPointLookupFailed.getValue()); metricsForPublish.put(queryScanFailed.getMetricType(), queryScanFailed.getValue()); metricsForPublish.put(cacheRefreshedDueToSplits.getMetricType(), cacheRefreshedDueToSplits.getValue()); + metricsForPublish.put(queryWaitTime.getMetricType(), queryWaitTime.getValue()); + metricsForPublish.put(queryTaskEndToEndTime.getMetricType(), + queryTaskEndToEndTime.getValue()); return metricsForPublish; } @@ -170,6 +190,8 @@ public void reset() { cacheRefreshedDueToSplits.reset(); queryWatch.stop(); resultSetWatch.stop(); + queryWaitTime.reset(); + queryTaskEndToEndTime.reset(); } public OverAllQueryMetrics combine(OverAllQueryMetrics metric) { @@ -183,6 +205,8 @@ public OverAllQueryMetrics combine(OverAllQueryMetrics metric) { numParallelScans.combine(metric.numParallelScans); wallClockTimeMS.combine(metric.wallClockTimeMS); resultSetTimeMS.combine(metric.resultSetTimeMS); + queryWaitTime.combine(metric.queryWaitTime); + queryTaskEndToEndTime.combine(metric.queryTaskEndToEndTime); return this; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index f3495fef914..798fc9a8dc5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -75,6 +75,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.metrics2.AbstractMetric; @@ -84,6 +85,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.iterate.RoundRobinResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixResultSet; @@ -486,7 +488,9 @@ public void testReadMetricsForSelect() throws Exception { Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(query); PhoenixResultSet resultSetBeingTested = rs.unwrap(PhoenixResultSet.class); - changeInternalStateForTesting(resultSetBeingTested); + // TODO use a spy ? + ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.OFF,true); + changeInternalStateForTesting(resultSetBeingTested, testMetricsQueue); while (resultSetBeingTested.next()) {} resultSetBeingTested.close(); Set expectedTableNames = Sets.newHashSet(tableName); @@ -955,12 +959,11 @@ private void assertMetricsHaveSameValues(Map metricNameValueMa } } - private void changeInternalStateForTesting(PhoenixResultSet rs) throws NoSuchFieldException, - SecurityException, IllegalArgumentException, IllegalAccessException { + private void changeInternalStateForTesting(PhoenixResultSet rs, + ReadMetricQueue testMetricsQueue) throws + NoSuchFieldException, SecurityException, IllegalArgumentException, + IllegalAccessException { // get and set the internal state for testing purposes. - // TODO use a spy ? - ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.OFF,true); - Field rsQueueField = PhoenixResultSet.class.getDeclaredField("readMetricsQueue"); rsQueueField.setAccessible(true); rsQueueField.set(rs, testMetricsQueue); @@ -1298,5 +1301,225 @@ public Connection call() throws Exception { } } + @Test + public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, + NoSuchFieldException, + IllegalAccessException { + int saltBucketNum = 8; + String tableName = generateUniqueName(); + String getRows = "SELECT COL1, COL2, PK4, PK2, PK3 FROM " + tableName + + " WHERE PK1=? AND PK2=? AND PK3=? AND PK4 IN (?"; + // Send at least 4 tasks in Phoenix client thread pool and we will assert also on this + // later while actually querying data. + int taskCount = 4; + for (int i = 1; i < taskCount; i++) { + getRows += ", ?"; + } + getRows += ")"; + final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; + String creatTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + + " PK1 CHAR(15) NOT NULL,\n" + + " PK2 CHAR(15) NOT NULL,\n" + + " PK3 DECIMAL NOT NULL,\n" + + " PK4 CHAR(32) NOT NULL,\n" + + " COL1 VARCHAR NOT NULL,\n" + + " COL2 VARCHAR NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " PK1,\n" + + " PK2,\n" + + " PK3,\n" + + " PK4\n" + + " )\n" + + ") MULTI_TENANT=true, IMMUTABLE_ROWS=TRUE, SALT_BUCKETS=" + saltBucketNum; + long vpk3 = 1000; + try(Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(upsertRows); + stmt.execute(creatTableDdl); + for (int i = 1; i <= saltBucketNum; i++) { + stmt.setString(1, "VPK1"); + stmt.setString(2, "VPK2"); + stmt.setLong(3, vpk3); + stmt.setString(4, "VPK4_" + i); + stmt.setString(5, "{i:" + i + "}"); + stmt.setString(6, "{o:" + i + "}"); + stmt.executeUpdate(); + } + conn.commit(); + } + try(Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(getRows); + stmt.setString(1, "VPK1"); + stmt.setString(2, "VPK2"); + stmt.setLong(3, vpk3); + for (int i = 4; i < taskCount + 4; i++) { + stmt.setString(i, "VPK4_" + i); + } + ResultSet rs = stmt.executeQuery(); + PhoenixResultSet phoenixResultSet = rs.unwrap(PhoenixResultSet.class); + ReadMetricQueue readMetricQueue = new TestTaskReadMetricsQueue(LogLevel.OFF, + true); + changeInternalStateForTesting(phoenixResultSet, readMetricQueue); + while(rs.next()) {} + Map overallQueryMetrics = + PhoenixRuntime.getOverAllReadRequestMetricInfo(rs); + Map readMetrics = + PhoenixRuntime.getRequestReadMetricInfo(rs).get(tableName); + // Sent 4 tasks in Phoenix client thread pool. So, that we do see the max task + // queue time and max task end to end time. + assertEquals(4, (long) readMetrics.get(TASK_EXECUTED_COUNTER)); + assertEquals(20, (long) overallQueryMetrics.get( + MetricType.WALL_CLOCK_QUERY_WAIT_TIME)); + assertEquals(41, (long) overallQueryMetrics.get( + MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME)); + } + } + + /** + * This test aims to test that when scanner cache of first batch of scans is exhausted + * in RoundRobinResultItr and next batch of scans is submitted then overall query wait + * time and query end to end task time is updated correctly. + * + * @throws SQLException + * @throws NoSuchFieldException + * @throws IllegalAccessException + */ + @Test + public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill() + throws Exception { + int fetchSize = 3; + int saltBucketNum = 8; + String tableName = generateUniqueName(); + String getRows = "SELECT COL1, COL2, PK4, PK2, PK3 FROM " + tableName + + " WHERE PK1=? AND PK2=? AND PK3=? AND PK4 IN (?"; + // Send at least 2 batches of scans. The salt buckets are 8 so, even if first batch of + // scans cover all the salt buckets still only 16 rows will be cached with fetch size of 2. + // So, setting rows to read per query greater than 16. + int taskCount = saltBucketNum * (fetchSize - 1) + saltBucketNum; + for (int i = 1; i < taskCount; i++) { + getRows += ", ?"; + } + getRows += ")"; + final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; + String creatTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + + " PK1 CHAR(15) NOT NULL,\n" + + " PK2 CHAR(15) NOT NULL,\n" + + " PK3 DECIMAL NOT NULL,\n" + + " PK4 CHAR(32) NOT NULL,\n" + + " COL1 VARCHAR NOT NULL,\n" + + " COL2 VARCHAR NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " PK1,\n" + + " PK2,\n" + + " PK3,\n" + + " PK4\n" + + " )\n" + + ") MULTI_TENANT=true, IMMUTABLE_ROWS=TRUE, SALT_BUCKETS=" + saltBucketNum; + long vpk3 = 1000; + try(Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(upsertRows); + stmt.execute(creatTableDdl); + for (int i = 1; i <= 2 * taskCount; i++) { + stmt.setString(1, "VPK1"); + stmt.setString(2, "VPK2"); + stmt.setLong(3, vpk3); + stmt.setString(4, "VPK4_" + i); + stmt.setString(5, "{i:" + i + "}"); + stmt.setString(6, "{o:" + i + "}"); + stmt.executeUpdate(); + } + conn.commit(); + } + String url = getUrl("TEST"); + Properties props = new Properties(); + props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, String.valueOf(false)); + try(Connection conn = DriverManager.getConnection(url, props)) { + PreparedStatement stmt = conn.prepareStatement(getRows); + stmt.setFetchSize(fetchSize); + stmt.setString(1, "VPK1"); + stmt.setString(2, "VPK2"); + stmt.setLong(3, vpk3); + for (int i = 4; i < taskCount + 4; i++) { + stmt.setString(i, "VPK4_" + i); + } + ResultSet rs = stmt.executeQuery(); + PhoenixResultSet phoenixResultSet = rs.unwrap(PhoenixResultSet.class); + // Make sure we are using RoundRobinResultItr. as it refills scanner cache + assertTrue(phoenixResultSet.getUnderlyingIterator() + instanceof RoundRobinResultIterator); + RoundRobinResultIterator itr = + (RoundRobinResultIterator) phoenixResultSet.getUnderlyingIterator(); + ReadMetricQueue readMetricQueue = new TestTaskReadMetricsQueue(LogLevel.OFF, + true); + changeInternalStateForTesting(phoenixResultSet, readMetricQueue); + while(rs.next()) {} + Map overallQueryMetrics = + PhoenixRuntime.getOverAllReadRequestMetricInfo(rs); + Map readMetrics = + PhoenixRuntime.getRequestReadMetricInfo(rs).get(tableName); + // Make sure scanner cache was refilled once + assertEquals(1, itr.getNumberOfParallelFetches()); + assertEquals(40, + (long) overallQueryMetrics.get(MetricType.WALL_CLOCK_QUERY_WAIT_TIME)); + assertEquals(82, (long) overallQueryMetrics.get( + MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME)); + long totalTasksExecuted = readMetrics.get(TASK_EXECUTED_COUNTER); + assertEquals(totalTasksExecuted * TASK_EXECUTION_TIME_DELTA, + (long) readMetrics.get(TASK_EXECUTION_TIME)); + } + } + + private class TestTaskReadMetricsQueue extends ReadMetricQueue { + + int taskQueueWaitTimeIndex = 0; + int taskEndToEndTimeIndex = 0; + int[] taskQueueWaitTime = {10, 5, 20, 7}; + int[] taskEndToEndTime = {20, 15, 41, 16}; + // To make test predictable + final Object lock = new Object(); + + public TestTaskReadMetricsQueue(LogLevel connectionLogLevel, + boolean isRequestMetricsEnabled) { + super(isRequestMetricsEnabled, connectionLogLevel); + } + + @Override + public CombinableMetric getMetric(MetricType type) { + switch (type) { + case TASK_QUEUE_WAIT_TIME: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + synchronized (lock) { + super.change(taskQueueWaitTime[taskQueueWaitTimeIndex]); + taskQueueWaitTimeIndex++; + taskQueueWaitTimeIndex %= taskQueueWaitTime.length; + } + } + }; + case TASK_END_TO_END_TIME: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + synchronized (lock) { + super.change(taskEndToEndTime[taskEndToEndTimeIndex]); + taskEndToEndTimeIndex++; + taskEndToEndTimeIndex %= taskEndToEndTime.length; + } + } + }; + case TASK_EXECUTION_TIME: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + super.change(TASK_EXECUTION_TIME_DELTA); + } + }; + } + return super.getMetric(type); + } + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 1405ad6355c..7d5ebbb849d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -441,6 +441,13 @@ protected static String getUrl() { } return url; } + + protected static String getUrl(String principal) throws Exception { + if (!clusterInitialized) { + throw new IllegalStateException("Cluster must be initialized before attempting to get the URL"); + } + return getLocalClusterUrl(utility, principal); + } protected static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception { if (!clusterInitialized) { @@ -569,6 +576,13 @@ protected static String getLocalClusterUrl(HBaseTestingUtility util) throws Exce String url = QueryUtil.getConnectionUrl(new Properties(), util.getConfiguration()); return url + PHOENIX_TEST_DRIVER_URL_PARAM; } + + protected static String getLocalClusterUrl(HBaseTestingUtility util, String principal) + throws Exception { + String url = QueryUtil.getConnectionUrl(new Properties(), util.getConfiguration(), + principal); + return url + PHOENIX_TEST_DRIVER_URL_PARAM; + } /** * Initialize the cluster in distributed mode