diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 7ce7a023df6..cb380fd993d 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -418,6 +418,13 @@
*
Cache |
* scan server tablet cache metrics |
*
+ *
+ * | N/A |
+ * N/A |
+ * {@value #METRICS_SCAN_EXCEPTIONS} |
+ * Counter |
+ * Count the number of exceptions that occur within scan executors |
+ *
*
*
* | scan |
@@ -725,6 +732,7 @@ public interface MetricsProducer {
String METRICS_SCAN_SCANNED_ENTRIES = METRICS_SCAN_PREFIX + "query.scanned.entries";
String METRICS_SCAN_ZOMBIE_THREADS = METRICS_SCAN_PREFIX + "zombie.threads";
String METRICS_SCAN_TABLET_METADATA_CACHE = METRICS_SCAN_PREFIX + "tablet.metadata.cache";
+ String METRICS_SCAN_EXCEPTIONS = METRICS_SCAN_PREFIX + "exceptions";
String METRICS_TSERVER_PREFIX = "accumulo.tserver.";
String METRICS_TSERVER_ENTRIES = METRICS_TSERVER_PREFIX + "entries";
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
index 36fbd42de11..c45480afe9d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.tserver.metrics;
import java.time.Duration;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntSupplier;
@@ -28,6 +29,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
@@ -47,6 +49,9 @@ public class TabletServerScanMetrics implements MetricsProducer {
private final LongAdder queryResultCount = new LongAdder();
private final LongAdder queryResultBytes = new LongAdder();
private final LongAdder scannedCount = new LongAdder();
+ private final ConcurrentHashMap executorExceptionCounts =
+ new ConcurrentHashMap<>();
+ private volatile MeterRegistry registry = null;
public void incrementLookupCount(long amount) {
this.lookupCount.add(amount);
@@ -124,8 +129,28 @@ public TabletServerScanMetrics(IntSupplier openFileSupplier) {
openFiles = openFileSupplier;
}
+ public void incrementExecutorExceptions(String executorName) {
+ executorExceptionCounts.computeIfAbsent(executorName, k -> {
+ AtomicLong counter = new AtomicLong(0);
+ // Register the counter if the registry is already available
+ if (registry != null) {
+ registerExecutorExceptionCounter(executorName, counter);
+ }
+ return counter;
+ }).incrementAndGet();
+ }
+
+ private void registerExecutorExceptionCounter(String executorName, AtomicLong counter) {
+ FunctionCounter.builder(METRICS_SCAN_EXCEPTIONS, counter, AtomicLong::get)
+ .tags("executor", executorName)
+ .description(
+ "Number of exceptions thrown from the iterator stack during scan execution, tagged by executor name")
+ .register(registry);
+ }
+
@Override
public void registerMetrics(MeterRegistry registry) {
+ this.registry = registry;
Gauge.builder(METRICS_SCAN_OPEN_FILES, openFiles::getAsInt)
.description("Number of files open for scans").register(registry);
scans = Timer.builder(METRICS_SCAN_TIMES).description("Scans").register(registry);
@@ -156,6 +181,7 @@ public void registerMetrics(MeterRegistry registry) {
Gauge.builder(METRICS_SCAN_ZOMBIE_THREADS, this, TabletServerScanMetrics::getZombieThreadsCount)
.description("Number of scan threads that have no associated client session")
.register(registry);
+ executorExceptionCounts.forEach(this::registerExecutorExceptionCounter);
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index 8eb15ea6bb4..750661182fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -59,6 +59,17 @@ public LookupTask(TabletHostingServer server, long scanID) {
this.scanID = scanID;
}
+ private void recordException(MultiScanSession scanSession) {
+ if (scanSession != null && server.getScanMetrics() != null) {
+ String executorName = getExecutorName(scanSession);
+ server.getScanMetrics().incrementExecutorExceptions(executorName);
+ }
+ }
+
+ private String getExecutorName(MultiScanSession scanSession) {
+ return scanSession.scanParams.getScanDispatch().getExecutorName();
+ }
+
@Override
public void run() {
MultiScanSession session = (MultiScanSession) server.getSession(scanID);
@@ -179,9 +190,11 @@ public void run() {
addResult(iie);
}
} catch (SampleNotPresentException e) {
+ recordException(session);
addResult(e);
} catch (Exception e) {
log.warn("exception while doing multi-scan ", e);
+ recordException(session);
addResult(e);
} finally {
transitionFromRunning();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index 96251bd59b0..5707d714da0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -48,6 +48,17 @@ public NextBatchTask(TabletHostingServer server, long scanID, AtomicBoolean inte
}
}
+ private void recordException(SingleScanSession scanSession) {
+ if (scanSession != null && server.getScanMetrics() != null) {
+ String executorName = getExecutorName(scanSession);
+ server.getScanMetrics().incrementExecutorExceptions(executorName);
+ }
+ }
+
+ private String getExecutorName(SingleScanSession scanSession) {
+ return scanSession.scanParams.getScanDispatch().getExecutorName();
+ }
+
@Override
public void run() {
@@ -93,10 +104,12 @@ public void run() {
addResult(iie);
}
} catch (TooManyFilesException | SampleNotPresentException e) {
+ recordException(scanSession);
addResult(e);
} catch (IOException | RuntimeException e) {
log.warn("exception while scanning tablet {} for {}", scanSession.extent, scanSession.client,
e);
+ recordException(scanSession);
addResult(e);
} finally {
transitionFromRunning();
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index 026f392a010..b82640d998d 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -120,7 +120,8 @@ public void confirmMetricsPublished() throws Exception {
METRICS_SCAN_RESERVATION_TOTAL_TIMER,
METRICS_SCAN_RESERVATION_WRITEOUT_TIMER,
METRICS_SCAN_TABLET_METADATA_CACHE,
- METRICS_SERVER_IDLE);
+ METRICS_SERVER_IDLE,
+ METRICS_SCAN_EXCEPTIONS);
// @formatter:on
Map expectedMetricNames = this.getMetricFields();
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java
new file mode 100644
index 00000000000..7915b851fe1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.metrics;
+
+import static org.apache.accumulo.core.metrics.MetricsProducer.METRICS_SCAN_EXCEPTIONS;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.functional.ErrorThrowingIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScanExecutorExceptionsIT extends ConfigurableMacBase {
+
+ private static final Logger log = LoggerFactory.getLogger(ScanExecutorExceptionsIT.class);
+ private static TestStatsDSink sink;
+
+ @Override
+ protected Duration defaultTimeout() {
+ return Duration.ofMinutes(5);
+ }
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ sink = new TestStatsDSink();
+ }
+
+ @AfterAll
+ public static void teardown() throws Exception {
+ if (sink != null) {
+ sink.close();
+ }
+ }
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+ cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true");
+ cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+ cfg.setProperty("general.custom.metrics.opts.logging.step", "5s");
+ String clazzList = LoggingMeterRegistryFactory.class.getName() + ","
+ + TestStatsDRegistryFactory.class.getName();
+ cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList);
+ Map sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+ TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort()));
+ cfg.setSystemProperties(sysProps);
+ }
+
+ @Test
+ public void testScanExecutorExceptions() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+
+ String tableName = getUniqueNames(1)[0];
+ log.info("Creating table: {}", tableName);
+
+ final int numEntries = 10;
+ final int totalExceptions = 6;
+
+ NewTableConfiguration ntc = new NewTableConfiguration();
+ IteratorSetting errorIterSetting =
+ new IteratorSetting(50, "error", ErrorThrowingIterator.class);
+ errorIterSetting.addOption(ErrorThrowingIterator.TIMES, String.valueOf(totalExceptions));
+ ntc.attachIterator(errorIterSetting, EnumSet.of(IteratorUtil.IteratorScope.scan));
+ client.tableOperations().create(tableName, ntc);
+
+ try (BatchWriter writer = client.createBatchWriter(tableName)) {
+ for (int i = 0; i < numEntries; i++) {
+ Mutation m = new Mutation(new Text("row" + i));
+ m.put("cf", "cq", "value" + i);
+ writer.addMutation(m);
+ }
+ }
+
+ client.tableOperations().flush(tableName, null, null, true);
+
+ log.info("Performing regular scan");
+ int scanCount = performScanCountingEntries(client, tableName);
+ log.info("Regular scan returned {} entries", scanCount);
+
+ log.info("Performing batch scan");
+ int batchScanCount = performBatchScanCountingEntries(client, tableName);
+ log.info("Batch scan returned {} entries", batchScanCount);
+
+ List statsDMetrics;
+ boolean foundMetric = false;
+ long highestExceptionCount = 0;
+ long startTime = System.currentTimeMillis();
+ long timeout = 30_000;
+
+ while (!foundMetric && (System.currentTimeMillis() - startTime) < timeout) {
+ statsDMetrics = sink.getLines();
+
+ if (!statsDMetrics.isEmpty()) {
+ for (String line : statsDMetrics) {
+ if (line.startsWith(METRICS_SCAN_EXCEPTIONS)) {
+ foundMetric = true;
+ TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(line);
+ String executor = metric.getTags().get("executor");
+ if (executor != null) {
+ long val = Long.parseLong(metric.getValue());
+ highestExceptionCount = Math.max(highestExceptionCount, val);
+ log.info("Found scan exception metric for executor '{}': {}", executor, val);
+ }
+ }
+ }
+ }
+
+ if (!foundMetric) {
+ Thread.sleep(1_000);
+ }
+ }
+
+ log.info("Final exception count from metrics: {}", highestExceptionCount);
+
+ assertTrue(foundMetric, "Should have found scan exception metric");
+ assertTrue(highestExceptionCount > 0,
+ "Scan exception metric should have a count > 0, but was: " + highestExceptionCount);
+ }
+ }
+
+ private int performScanCountingEntries(AccumuloClient client, String table) {
+ int count = 0;
+ try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
+ scanner.setTimeout(10, java.util.concurrent.TimeUnit.SECONDS);
+ for (var entry : scanner) {
+ count++;
+ log.debug("Scan entry {}: {}", count, entry.getKey());
+ }
+ log.info("Scan completed successfully with {} entries", count);
+ } catch (Exception e) {
+ log.info("Exception during regular scan after {} entries: {}", count, e.getMessage());
+ }
+ return count;
+ }
+
+ private int performBatchScanCountingEntries(AccumuloClient client, String table) {
+ int count = 0;
+ try (BatchScanner batchScanner = client.createBatchScanner(table, Authorizations.EMPTY, 2)) {
+ batchScanner.setTimeout(10, java.util.concurrent.TimeUnit.SECONDS);
+ batchScanner.setRanges(List.of(new Range()));
+ for (var entry : batchScanner) {
+ count++;
+ log.debug("Batch scan entry {}: {}", count, entry.getKey());
+ }
+ log.info("Batch scan completed successfully with {} entries", count);
+ } catch (Exception e) {
+ log.info("Exception during batch scan after {} entries: {}", count, e.getMessage());
+ }
+ return count;
+ }
+}