PHOENIX-7528 Add metrics for overall wall clock query wait time in Phoenix client thread pool and total time spent in executing HBase scan tasks #2077

Open · wants to merge 2 commits into master
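For context, a minimal sketch of how an application could read the two new metrics once the query's ResultSet has been drained. This assumes request-level metrics are enabled and that PhoenixRuntime.getOverAllReadRequestMetricInfo is the overall-metrics accessor (true in recent Phoenix releases, but treat the property name, JDBC URL, and table name as placeholder assumptions):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.phoenix.monitoring.MetricType;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class QueryWaitMetricsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Request-level metrics must be on for OverAllQueryMetrics to be collected.
            props.setProperty("phoenix.query.request.metrics.enabled", "true");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE")) {
                while (rs.next()) {
                    // Drain the result set so all parallel scan tasks have run.
                }
                Map<MetricType, Long> overall = PhoenixRuntime.getOverAllReadRequestMetricInfo(rs);
                System.out.println("queue wait ms (qwt): "
                    + overall.get(MetricType.WALL_CLOCK_QUERY_WAIT_TIME));
                System.out.println("task end-to-end ms (qeet): "
                    + overall.get(MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME));
            }
        }
    }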
BaseResultIterators.java (file names in this diff are inferred from the code shown)
@@ -85,6 +85,7 @@
import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.job.JobManager;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.parse.FilterableStatement;
@@ -111,7 +112,6 @@
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.schema.types.PVarbinaryEncoded;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ClientUtil;
@@ -1439,6 +1439,8 @@ private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, Connecti
SQLException toThrow = null;
final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection());
int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
long maxTaskQueueWaitTime = 0;
long maxTaskEndToEndTime = 0;
try {
submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper, maxQueryEndTime);
boolean clearedCache = false;
@@ -1467,7 +1469,13 @@
&& Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_START_ROW_SUFFIX), previousScan.getScan().getAttribute(SCAN_START_ROW_SUFFIX))==0)) {
continue;
}
PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
Future<PeekingResultIterator> futureTask = scanPair.getSecond();
PeekingResultIterator iterator = futureTask.get(timeOutForScan,
TimeUnit.MILLISECONDS);
long taskQueueWaitTime = JobManager.getTaskQueueWaitTime(futureTask);
long taskEndToEndTime = JobManager.getTaskEndToEndTime(futureTask);
maxTaskQueueWaitTime = Math.max(maxTaskQueueWaitTime, taskQueueWaitTime);
maxTaskEndToEndTime = Math.max(maxTaskEndToEndTime, taskEndToEndTime);
concatIterators.add(iterator);
previousScan.setScan(scanPair.getFirst());
} catch (ExecutionException e) {
@@ -1571,9 +1579,11 @@ private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, Connecti
}
}
} finally {
OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics();
overAllQueryMetrics.updateQueryWaitTime(maxTaskQueueWaitTime);
overAllQueryMetrics.updateQueryTaskEndToEndTime(maxTaskEndToEndTime);
if (toThrow != null) {
GLOBAL_FAILED_QUERY_COUNTER.increment();
OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics();
overAllQueryMetrics.queryFailed();
if (context.getScanRanges().isPointLookup()) {
overAllQueryMetrics.queryPointLookupFailed();
RoundRobinResultIterator.java
@@ -19,20 +19,23 @@

import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
import static org.apache.phoenix.job.JobManager.JobCallable;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.phoenix.compile.ExplainPlanAttributes
.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.job.JobManager;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ClientUtil;
@@ -167,7 +170,7 @@ public void explain(List<String> planSteps,
}

@VisibleForTesting
int getNumberOfParallelFetches() {
public int getNumberOfParallelFetches() {
return numParallelFetches;
}

@@ -230,6 +233,8 @@ private List<RoundRobinIterator> fetchNextBatch() throws SQLException {
Collections.shuffle(openIterators);
boolean success = false;
SQLException toThrow = null;
long maxTaskQueueWaitTime = 0;
long maxTaskEndToEndTime = 0;
try {
StatementContext context = plan.getContext();
final ConnectionQueryServices services = context.getConnection().getQueryServices();
@@ -238,19 +243,40 @@
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Performing parallel fetch for " + openIterators.size() + " iterators. ");
}
String physicalTableName = plan.getTableRef().getTable().getPhysicalName().getString();
for (final RoundRobinIterator itr : openIterators) {
Future<Tuple> future = executor.submit(new Callable<Tuple>() {
ReadMetricQueue readMetricQueue = context.getReadMetricsQueue();
TaskExecutionMetricsHolder taskMetrics =
new TaskExecutionMetricsHolder(readMetricQueue, physicalTableName);
Future<Tuple> future = executor.submit(new JobCallable<Tuple>() {
@Override
public Tuple call() throws Exception {
// Read the next record to refill the scanner's cache.
return itr.next();
}

@Override
public Object getJobId() {
// Prior to using JobCallable, every callable refilling the scanner cache
// was treated as a separate producer in the JobManager queue, so we keep
// that behavior. Should this be changed to ResultIterators.this?
return this;
}

@Override
public TaskExecutionMetricsHolder getTaskExecutionMetric() {
return taskMetrics;
}
});
futures.add(future);
}
int i = 0;
for (Future<Tuple> future : futures) {
Tuple tuple = future.get();
long taskQueueWaitTime = JobManager.getTaskQueueWaitTime(future);
long taskEndToEndTime = JobManager.getTaskEndToEndTime(future);
maxTaskQueueWaitTime = Math.max(maxTaskQueueWaitTime, taskQueueWaitTime);
maxTaskEndToEndTime = Math.max(maxTaskEndToEndTime, taskEndToEndTime);
if (tuple != null) {
results.add(new RoundRobinIterator(openIterators.get(i).delegate, tuple));
} else {
@@ -279,9 +305,12 @@ public Tuple call() throws Exception {
}
}
} finally {
OverAllQueryMetrics overAllQueryMetrics =
plan.getContext().getOverallQueryMetrics();
overAllQueryMetrics.updateQueryWaitTime(maxTaskQueueWaitTime);
overAllQueryMetrics.updateQueryTaskEndToEndTime(maxTaskEndToEndTime);
if (toThrow != null) {
GLOBAL_FAILED_QUERY_COUNTER.increment();
OverAllQueryMetrics overAllQueryMetrics = plan.getContext().getOverallQueryMetrics();
overAllQueryMetrics.queryFailed();
if (plan.getContext().getScanRanges().isPointLookup()) {
overAllQueryMetrics.queryPointLookupFailed();
JobManager.java
@@ -26,6 +26,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
@@ -153,6 +154,10 @@ public JobFutureTask(Callable<T> c) {
public Object getJobId() {
return jobId;
}

public TaskExecutionMetricsHolder getTaskMetric() {
return taskMetric;
}
}

/**
@@ -300,5 +305,31 @@ private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) {
return ((JobFutureTask)task).taskMetric;
}
}

private static <V> TaskExecutionMetricsHolder getTaskMetric(
Future<V> futureTask) {
if (futureTask instanceof InstrumentedJobFutureTask) {
TaskExecutionMetricsHolder taskMetrics =
((InstrumentedJobFutureTask<V>) futureTask).getTaskMetric();
return taskMetrics;
}
return null;
}

public static <V> long getTaskQueueWaitTime(Future<V> futureTask) {
TaskExecutionMetricsHolder taskMetrics = getTaskMetric(futureTask);
if (taskMetrics != null) {
return taskMetrics.getTaskQueueWaitTime().getValue();
}
return 0;
}

public static <V> long getTaskEndToEndTime(Future<V> futureTask) {
TaskExecutionMetricsHolder taskMetrics = getTaskMetric(futureTask);
if (taskMetrics != null) {
return taskMetrics.getTaskEndToEndTime().getValue();
}
return 0;
}
}
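The two helpers above only read numbers that are recorded elsewhere: JobManager's instrumented task wrapper timestamps each task when it is queued and again around execution. As a minimal, generic illustration of that pattern, here is a hypothetical stand-in (TimedFutureTask is not Phoenix's actual InstrumentedJobFutureTask; the names are invented):

    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;

    // Capture the enqueue timestamp at construction, then measure queue wait
    // and end-to-end time relative to it when a pool thread runs the task.
    public class TimedFutureTask<V> extends FutureTask<V> {
        private final long enqueuedAtNanos = System.nanoTime();
        private volatile long queueWaitNanos;
        private volatile long endToEndNanos;

        public TimedFutureTask(Callable<V> c) {
            super(c);
        }

        @Override
        public void run() {
            queueWaitNanos = System.nanoTime() - enqueuedAtNanos;  // time spent queued
            super.run();
            endToEndNanos = System.nanoTime() - enqueuedAtNanos;   // queue wait + execution
        }

        public long getQueueWaitMillis() {
            return queueWaitNanos / 1_000_000;
        }

        public long getEndToEndMillis() {
            return endToEndNanos / 1_000_000;
        }
    }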

MetricType.java
@@ -141,6 +141,11 @@ public enum MetricType {
CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits",LogLevel.DEBUG, PLong.INSTANCE),
WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE),
RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE),
WALL_CLOCK_QUERY_WAIT_TIME("qwt", "Total wall clock time spent by a " +
"query waiting in the Phoenix client thread pool queue", LogLevel.OFF, PLong.INSTANCE),
WALL_CLOCK_QUERY_TASK_END_TO_END_TIME("qeet", "Total wall clock time " +
"spent by a query in task execution in the Phoenix client thread pool" +
"including the queue wait time", LogLevel.OFF, PLong.INSTANCE),
OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE),
OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE),
QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE),
OverAllQueryMetrics.java
@@ -26,6 +26,8 @@
import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_TIMEOUT_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME;
import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_QUERY_WAIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS;

import java.util.HashMap;
@@ -50,6 +52,8 @@ public class OverAllQueryMetrics {
private final CombinableMetric queryPointLookupFailed;
private final CombinableMetric queryScanFailed;
private final CombinableMetric cacheRefreshedDueToSplits;
private final CombinableMetric queryWaitTime;
private final CombinableMetric queryTaskEndToEndTime;

public OverAllQueryMetrics(boolean isRequestMetricsEnabled, LogLevel connectionLogLevel) {
queryWatch = MetricUtil.getMetricsStopWatch(isRequestMetricsEnabled, connectionLogLevel,
@@ -72,6 +76,11 @@ public OverAllQueryMetrics(boolean isRequestMetricsEnabled, LogLevel connectionL
queryScanFailed = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, QUERY_SCAN_FAILED_COUNTER);
cacheRefreshedDueToSplits = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,
connectionLogLevel, CACHE_REFRESH_SPLITS_COUNTER);
queryWaitTime = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,
connectionLogLevel, WALL_CLOCK_QUERY_WAIT_TIME);
queryTaskEndToEndTime = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,
connectionLogLevel, WALL_CLOCK_QUERY_TASK_END_TO_END_TIME);

}

public void updateNumParallelScans(long numParallelScans) {
@@ -132,6 +141,14 @@ public void stopResultSetWatch() {
}
}

public void updateQueryWaitTime(long queryWaitTime) {
this.queryWaitTime.change(queryWaitTime);
}

public void updateQueryTaskEndToEndTime(long queryTaskEndToEndTime) {
this.queryTaskEndToEndTime.change(queryTaskEndToEndTime);
}

@VisibleForTesting
long getWallClockTimeMs() {
return wallClockTimeMS.getValue();
@@ -154,6 +171,9 @@ public Map<MetricType, Long> publish() {
metricsForPublish.put(queryPointLookupFailed.getMetricType(), queryPointLookupFailed.getValue());
metricsForPublish.put(queryScanFailed.getMetricType(), queryScanFailed.getValue());
metricsForPublish.put(cacheRefreshedDueToSplits.getMetricType(), cacheRefreshedDueToSplits.getValue());
metricsForPublish.put(queryWaitTime.getMetricType(), queryWaitTime.getValue());
metricsForPublish.put(queryTaskEndToEndTime.getMetricType(),
queryTaskEndToEndTime.getValue());
return metricsForPublish;
}

@@ -170,6 +190,8 @@ public void reset() {
cacheRefreshedDueToSplits.reset();
queryWatch.stop();
resultSetWatch.stop();
queryWaitTime.reset();
queryTaskEndToEndTime.reset();
}

public OverAllQueryMetrics combine(OverAllQueryMetrics metric) {
@@ -183,6 +205,8 @@ public OverAllQueryMetrics combine(OverAllQueryMetrics metric) {
numParallelScans.combine(metric.numParallelScans);
wallClockTimeMS.combine(metric.wallClockTimeMS);
resultSetTimeMS.combine(metric.resultSetTimeMS);
queryWaitTime.combine(metric.queryWaitTime);
queryTaskEndToEndTime.combine(metric.queryTaskEndToEndTime);
return this;
}

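To make the aggregation semantics concrete, here is a small sketch driven only by the methods visible in this diff (the constructor, updateQueryWaitTime, updateQueryTaskEndToEndTime, and publish). The org.apache.phoenix.log.LogLevel import and the exact published values are assumptions, the latter resting on CombinableMetric.change() accumulating the supplied delta:

    import java.util.Map;

    import org.apache.phoenix.log.LogLevel;
    import org.apache.phoenix.monitoring.MetricType;
    import org.apache.phoenix.monitoring.OverAllQueryMetrics;

    public class OverAllQueryMetricsSketch {
        public static void main(String[] args) {
            // true enables request-level metrics so real (non-noop) metrics are created.
            OverAllQueryMetrics metrics = new OverAllQueryMetrics(true, LogLevel.OFF);

            // The iterators feed in the slowest task's timings per fetch batch.
            metrics.updateQueryWaitTime(120);
            metrics.updateQueryTaskEndToEndTime(450);

            Map<MetricType, Long> published = metrics.publish();
            System.out.println(published.get(MetricType.WALL_CLOCK_QUERY_WAIT_TIME));            // 120
            System.out.println(published.get(MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME)); // 450
        }
    }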