diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 3db1565c7057e..dfd093e79c6ea 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -438,6 +438,10 @@ public class AbfsConfiguration{ FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION) private boolean isChecksumValidationEnabled; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION) + private boolean isFullBlobChecksumValidationEnabled; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE) private boolean isPaginatedDeleteEnabled; @@ -474,6 +478,57 @@ public class AbfsConfiguration{ FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES) private int maxApacheHttpClientIoExceptionsRetries; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, + DefaultValue = DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT) + private boolean dynamicWriteThreadPoolEnablement; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME, + DefaultValue = DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME) + private int writeThreadPoolKeepAliveTime; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_CPU_MONITORING_INTERVAL, + MinValue = MIN_WRITE_CPU_MONITORING_INTERVAL, + MaxValue = MAX_WRITE_CPU_MONITORING_INTERVAL, + DefaultValue = DEFAULT_WRITE_CPU_MONITORING_INTERVAL) + private int 
writeCpuMonitoringInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_THREADPOOL_CORE_POOL_SIZE, + DefaultValue = DEFAULT_WRITE_THREADPOOL_CORE_POOL_SIZE) + private int writeCorePoolSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_HIGH_CPU_THRESHOLD, + MinValue = MIN_WRITE_HIGH_CPU_THRESHOLD, + MaxValue = MAX_WRITE_HIGH_CPU_THRESHOLD, + DefaultValue = DEFAULT_WRITE_HIGH_CPU_THRESHOLD) + private int writeHighCpuThreshold; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD, + MinValue = MIN_WRITE_MEDIUM_CPU_THRESHOLD, + MaxValue = MAX_WRITE_MEDIUM_CPU_THRESHOLD, + DefaultValue = DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD) + private int writeMediumCpuThreshold; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LOW_CPU_THRESHOLD, + MinValue = MIN_WRITE_LOW_CPU_THRESHOLD, + MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD, + DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD) + private int writeLowCpuThreshold; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER, + MinValue = MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER, + DefaultValue = DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER) + private int lowTierMemoryMultiplier; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER, + MinValue = MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER, + DefaultValue = DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER) + private int mediumTierMemoryMultiplier; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER, + MinValue = MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER, + DefaultValue = DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER) + private int highTierMemoryMultiplier; + /** * Max idle TTL configuration for connection given in * {@value 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys#FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL} @@ -1560,6 +1615,46 @@ public int getWriteMaxConcurrentRequestCount() { return this.writeMaxConcurrentRequestCount; } + public int getWriteThreadPoolKeepAliveTime() { + return writeThreadPoolKeepAliveTime; + } + + public int getWriteCpuMonitoringInterval() { + return writeCpuMonitoringInterval; + } + + public boolean isDynamicWriteThreadPoolEnablement() { + return dynamicWriteThreadPoolEnablement; + } + + public int getWriteLowCpuThreshold() { + return writeLowCpuThreshold; + } + + public int getWriteMediumCpuThreshold() { + return writeMediumCpuThreshold; + } + + public int getWriteHighCpuThreshold() { + return writeHighCpuThreshold; + } + + public int getLowTierMemoryMultiplier() { + return lowTierMemoryMultiplier; + } + + public int getMediumTierMemoryMultiplier() { + return mediumTierMemoryMultiplier; + } + + public int getHighTierMemoryMultiplier() { + return highTierMemoryMultiplier; + } + + public int getWriteCorePoolSize() { + return writeCorePoolSize; + } + public int getMaxWriteRequestsToQueue() { if (this.maxWriteRequestsToQueue < 1) { return 2 * getWriteMaxConcurrentRequestCount(); @@ -1705,6 +1800,10 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) this.isChecksumValidationEnabled = isChecksumValidationEnabled; } + public boolean isFullBlobChecksumValidationEnabled() { + return isFullBlobChecksumValidationEnabled; + } + public long getBlobCopyProgressPollWaitMillis() { return blobCopyProgressPollWaitMillis; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 2732c0ed8fb31..30d946f27b3e0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.WeakHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -134,7 +135,6 @@ import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.SemaphoredDelegatingExecutor; -import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.http.client.utils.URIBuilder; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES; @@ -203,6 +203,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { private int blockOutputActiveBlocks; /** Bounded ThreadPool for this instance. */ private ExecutorService boundedThreadPool; + private WriteThreadPoolSizeManager poolSizeManager; /** ABFS instance reference to be held by the store to avoid GC close. 
*/ private BackReference fsBackRef; @@ -277,11 +278,19 @@ public AzureBlobFileSystemStore( } this.blockFactory = abfsStoreBuilder.blockFactory; this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks; - this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( - abfsConfiguration.getWriteMaxConcurrentRequestCount(), - abfsConfiguration.getMaxWriteRequestsToQueue(), - 10L, TimeUnit.SECONDS, - "abfs-bounded"); + if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) { + this.poolSizeManager = WriteThreadPoolSizeManager.getInstance( + getClient().getFileSystem() + "-" + UUID.randomUUID(), + abfsConfiguration); + poolSizeManager.startCPUMonitoring(); + this.boundedThreadPool = poolSizeManager.getExecutorService(); + } else { + this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( + abfsConfiguration.getWriteMaxConcurrentRequestCount(), + abfsConfiguration.getMaxWriteRequestsToQueue(), + 10L, TimeUnit.SECONDS, + "abfs-bounded"); + } } /** @@ -321,16 +330,13 @@ public void close() throws IOException { try { Futures.allAsList(futures).get(); // shutdown the threadPool and set it to null. 
- HadoopExecutors.shutdown(boundedThreadPool, LOG, - 30, TimeUnit.SECONDS); - boundedThreadPool = null; } catch (InterruptedException e) { LOG.error("Interrupted freeing leases", e); Thread.currentThread().interrupt(); } catch (ExecutionException e) { LOG.error("Error freeing leases", e); } finally { - IOUtils.cleanupWithLogger(LOG, getClient()); + IOUtils.cleanupWithLogger(LOG, poolSizeManager, getClient()); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java new file mode 100644 index 0000000000000..876a04ca6aa82 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR; +import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS; + +/** + * Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization. + */ +public final class WriteThreadPoolSizeManager implements Closeable { + + /* Maximum allowed size for the thread pool. */ + private final int maxThreadPoolSize; + /* Executor for periodically monitoring CPU usage. */ + private final ScheduledExecutorService cpuMonitorExecutor; + /* Thread pool whose size is dynamically managed. */ + private volatile ExecutorService boundedThreadPool; + /* Lock to ensure thread-safe updates to the thread pool. */ + private final Lock lock = new ReentrantLock(); + /* New computed max size for the thread pool after adjustment. */ + private volatile int newMaxPoolSize; + /* Logger instance for logging events from WriteThreadPoolSizeManager. */ + private static final Logger LOG = LoggerFactory.getLogger( + WriteThreadPoolSizeManager.class); + /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */ + private static final ConcurrentHashMap + POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>(); + /* Name of the filesystem associated with this manager. */ + private final String filesystemName; + /* Initial size for the thread pool when created. */ + private final int initialPoolSize; + /* Initially available heap memory. */ + private final long initialAvailableHeapMemory; + /* The configuration instance. */ + private final AbfsConfiguration abfsConfiguration; + + /** + * Private constructor to initialize the write thread pool and CPU monitor executor + * based on system resources and ABFS configuration. + * + * @param filesystemName Name of the ABFS filesystem. + * @param abfsConfiguration Configuration containing pool size parameters. 
+ */ + private WriteThreadPoolSizeManager(String filesystemName, + AbfsConfiguration abfsConfiguration) { + this.filesystemName = filesystemName; + this.abfsConfiguration = abfsConfiguration; + int availableProcessors = Runtime.getRuntime().availableProcessors(); + /* Snapshot the available heap (whole GB, rounded up) at construction; later resize decisions compare against it */ + this.initialAvailableHeapMemory = getAvailableHeapMemory(); + /* Upper bound candidate: processor count times the multiplier of the current heap tier */ + int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, initialAvailableHeapMemory); + + /* Start from the configured write concurrency, but never with fewer than one thread */ + this.initialPoolSize = Math.max(1, + abfsConfiguration.getWriteMaxConcurrentRequestCount()); + + /* The ceiling is never below the starting size, even when the computed bound is smaller */ + this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize); + + /* Fixed-size pool to begin with; it is resized later through the ThreadPoolExecutor setters. NOTE(review): Executors.newFixedThreadPool queues work on an unbounded queue - confirm that is acceptable for write requests */ + this.boundedThreadPool = Executors.newFixedThreadPool(initialPoolSize); + + ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool; + executor.setKeepAliveTime( + abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); /* idle core threads may exit after the keep-alive too */ + + /* Scheduler that runs the periodic CPU check; its thread count comes from the write core-pool-size setting */ + this.cpuMonitorExecutor = Executors.newScheduledThreadPool( + abfsConfiguration.getWriteCorePoolSize()); + } + + /** Returns the configuration this manager was constructed with. */ public AbfsConfiguration getAbfsConfiguration() { + return abfsConfiguration; + } + + /** + * Calculates the max thread pool size as the processor count times a multiplier + * chosen from the available-heap tier (low, medium or high), not from memory per core. + * + * @param availableProcessors Number of CPU cores; the second argument, initialAvailableHeapMemory, is the available heap in GB used to pick the tier. + * @return Computed max thread pool size. 
+ */ + private int getComputedMaxPoolSize(final int availableProcessors, long initialAvailableHeapMemory) { + LOG.debug("The available heap space in GB {} ", initialAvailableHeapMemory); + LOG.debug("The number of available processors is {} ", availableProcessors); + int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory, availableProcessors); + LOG.debug("The max thread pool size is {} ", maxpoolSize); + return maxpoolSize; + } + + /** + * Calculates the available heap memory in gigabytes. + * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap memory + * allowed for the JVM and subtracts the currently used memory (total - free) + * to determine how much heap memory is still available. + * The result is rounded up to the next whole gigabyte (ceiling division), so any non-zero remainder counts as a full GB. + * + * @return the available heap memory in gigabytes + */ + private long getAvailableHeapMemory() { + Runtime runtime = Runtime.getRuntime(); + long maxMemory = runtime.maxMemory(); /* NOTE(review): Runtime.maxMemory() is documented to return Long.MAX_VALUE when there is no inherent limit; the addition in the return statement could then overflow - confirm */ + long usedMemory = runtime.totalMemory() - runtime.freeMemory(); + long availableHeapBytes = maxMemory - usedMemory; + return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE; + } + + /** + * Returns the thread ceiling: CPU cores × the configured multiplier of the heap tier. Available heap up to LOW_HEAP_SPACE_FACTOR GB selects the low-tier multiplier, up to MEDIUM_HEAP_SPACE_FACTOR GB the medium-tier one, and anything above that the high-tier one. + */ + private int getMemoryTierMaxThreads(long availableHeapGB, int availableProcessors) { + int multiplier; + if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) { + multiplier = abfsConfiguration.getLowTierMemoryMultiplier(); + } else if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) { + multiplier = abfsConfiguration.getMediumTierMemoryMultiplier(); + } else { + multiplier = abfsConfiguration.getHighTierMemoryMultiplier(); + } + return availableProcessors * multiplier; + } + + /** + * Returns the WriteThreadPoolSizeManager registered under the given filesystem name, creating and registering a new one when none is registered or the registered one has lost or shut down its pool. + * + * @param filesystemName the name of the filesystem, used as the registry key. + * @param abfsConfiguration the configuration for the ABFS, used only when a new instance is created. + * + * @return the reused or newly created instance. 
+ */ + public static synchronized WriteThreadPoolSizeManager getInstance( + String filesystemName, AbfsConfiguration abfsConfiguration) { + /* Look up the manager currently registered for this filesystem name, if any */ + WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get( + filesystemName); + + /* Reuse it only while its pool reference is still set and the pool has not been shut down */ + if (existingInstance != null && existingInstance.boundedThreadPool != null + && !existingInstance.boundedThreadPool.isShutdown()) { + return existingInstance; + } + + /* Otherwise create a replacement and register it, overwriting any stale entry for the same name */ + LOG.debug( + "Creating new WriteThreadPoolSizeManager instance for filesystem: {}", + filesystemName); + WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager( + filesystemName, abfsConfiguration); + POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance); + return newInstance; + } + + /** + * Sets both the core size and the maximum size of the managed pool to the given value. The two setters are ordered so that the core size never exceeds the maximum size in between the calls. + * + * @param newMaxPoolSize the new core and maximum pool size. + */ + private void adjustThreadPoolSize(int newMaxPoolSize) { + synchronized (this) { + ThreadPoolExecutor threadPoolExecutor + = ((ThreadPoolExecutor) boundedThreadPool); + int currentCorePoolSize = threadPoolExecutor.getCorePoolSize(); + + if (newMaxPoolSize >= currentCorePoolSize) { /* growing (or equal): raise the maximum first, then the core size */ + threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize); + threadPoolExecutor.setCorePoolSize(newMaxPoolSize); + } else { /* shrinking: lower the core size first, then the maximum */ + threadPoolExecutor.setCorePoolSize(newMaxPoolSize); + threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize); + } + + LOG.debug("The thread pool size is: {} ", newMaxPoolSize); + LOG.debug("The pool size is: {} ", threadPoolExecutor.getPoolSize()); + LOG.debug("The active thread count is: {}", threadPoolExecutor.getActiveCount()); + } + } + + /** + * Schedules a repeating task on the CPU monitor executor that samples the CPU utilization and adjusts the thread pool size accordingly; the first run is immediate and later runs follow the configured monitoring interval in seconds. 
+ */ + synchronized void startCPUMonitoring() { + cpuMonitorExecutor.scheduleAtFixedRate(() -> { + double cpuUtilization = getCpuUtilization(); + LOG.debug("Current CPU Utilization is this: {}", cpuUtilization); + try { + adjustThreadPoolSizeBasedOnCPU(cpuUtilization); + } catch (InterruptedException e) { /* NOTE(review): per the ScheduledExecutorService contract an exception thrown by a fixed-rate task suppresses its later runs, so this rethrow would end monitoring - confirm that is intended */ + throw new RuntimeException(String.format( + "Thread pool size adjustment interrupted for filesystem %s", + filesystemName), e); + } + }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(), TimeUnit.SECONDS); /* initial delay 0, then one run per configured interval */ + } + + /** + * Reads the system-wide CPU load through the platform OperatingSystemMXBean. + * + * @return the CPU load as a fraction between 0.0 and 1.0 (not a percentage), or 0.0 when the com.sun.management bean is not available or it reports a negative load. + */ + private double getCpuUtilization() { + OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean(); + if (osBean instanceof com.sun.management.OperatingSystemMXBean) { + com.sun.management.OperatingSystemMXBean sunOsBean + = (com.sun.management.OperatingSystemMXBean) osBean; + double cpuLoad = sunOsBean.getSystemCpuLoad(); + if (cpuLoad >= 0) { /* a negative value means the load is unavailable; fall through to 0.0 */ + return cpuLoad; + } + } + return 0.0; + } + + /** + * Dynamically adjusts the thread pool size based on current CPU utilization + * and available heap memory relative to the initially available heap. 
+ * + * @param cpuUtilization Current system CPU utilization (0.0 to 1.0) + * @throws InterruptedException if thread locking is interrupted + */ + public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws InterruptedException { /* NOTE(review): nothing visible in this body throws InterruptedException (lock.lock() is not interruptible) - confirm whether the throws clause is still needed */ + lock.lock(); + try { + ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool; /* NOTE(review): close() sets boundedThreadPool to null; a run racing with close() would fail on the next line - confirm */ + int currentPoolSize = executor.getMaximumPoolSize(); + long currentHeap = getAvailableHeapMemory(); + long initialHeap = initialAvailableHeapMemory; + LOG.debug("Available heap memory: {} GB, Initial heap memory: {} GB", currentHeap, initialHeap); + LOG.debug("Current CPU Utilization: {}", cpuUtilization); + + if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold()/HUNDRED_D)) { /* thresholds are configured as whole percentages; dividing by 100.0 puts them on the 0.0-1.0 scale */ + newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, currentHeap, initialHeap); + } else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold()/HUNDRED_D)) { + newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, currentHeap, initialHeap); + } else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold()/HUNDRED_D)) { + newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, currentHeap, initialHeap); + } else { /* neither above the medium threshold nor below the low one: keep the current size */ + newMaxPoolSize = currentPoolSize; + LOG.debug("CPU load normal ({}). No change: current={}", cpuUtilization, currentPoolSize); + } + + if (newMaxPoolSize != currentPoolSize) { /* resize only on an actual change */ + LOG.debug("Resizing thread pool from {} to {}", currentPoolSize, newMaxPoolSize); + adjustThreadPoolSize(newMaxPoolSize); + } + } finally { + lock.unlock(); + } + } + + /** + * Calculates the reduced pool size under high CPU utilization: when the available heap is at most initialHeap / HIGH_MEDIUM_HEAP_FACTOR the size is divided by HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR, otherwise currentPoolSize / HIGH_CPU_REDUCTION_FACTOR is subtracted from it. The result never drops below initialPoolSize. + */ + private int calculateReducedPoolSizeHighCPU(int currentPoolSize, long currentHeap, long initialHeap) { + if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) { + LOG.debug("High CPU & low heap. 
Aggressively reducing: current={}, new={}", + currentPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR); + return Math.max(initialPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR); + } + int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / HIGH_CPU_REDUCTION_FACTOR); + LOG.debug("High CPU ({}). Reducing pool size moderately: current={}, new={}", + abfsConfiguration.getWriteHighCpuThreshold(), currentPoolSize, reduced); + return reduced; + } + + /** + * Calculates reduced pool size under medium CPU utilization. + */ + private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, long currentHeap, long initialHeap) { + if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) { + int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR); + LOG.debug("Medium CPU & low heap. Reducing: current={}, new={}", currentPoolSize, reduced); + return reduced; + } + int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_REDUCTION_FACTOR); + LOG.debug("Medium CPU ({}). Moderate reduction: current={}, new={}", + abfsConfiguration.getWriteMediumCpuThreshold(), currentPoolSize, reduced); + return reduced; + } + + /** + * Calculates increased pool size under low CPU utilization. + */ + private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, long currentHeap, long initialHeap) { + if (currentHeap >= initialHeap * LOW_CPU_HEAP_FACTOR) { + int increased = Math.min(maxThreadPoolSize, (int) (currentPoolSize * LOW_CPU_POOL_SIZE_INCREASE_FACTOR)); + LOG.debug("Low CPU & healthy heap. Increasing: current={}, new={}", currentPoolSize, increased); + return increased; + } else { + // Decrease by 10% + int decreased = Math.max(1, (int) (currentPoolSize * LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR)); + LOG.debug("Low CPU but insufficient heap ({} GB). 
Decreasing: current={}, new={}", currentHeap, currentPoolSize, decreased); + return decreased; + } + } + + + /** + * Returns the executor service for the thread pool. + * + * @return the executor service. + */ + public ExecutorService getExecutorService() { + return boundedThreadPool; + } + + public ScheduledExecutorService getCpuMonitorExecutor() { + return cpuMonitorExecutor; + } + + @Override + public void close() throws IOException { + synchronized (this) { + try { + // Shutdown executors + cpuMonitorExecutor.shutdown(); + HadoopExecutors.shutdown(boundedThreadPool, LOG, THIRTY_SECONDS, TimeUnit.SECONDS); + boundedThreadPool = null; + + // Remove from the map + POOL_SIZE_MANAGER_MAP.remove(filesystemName); + LOG.debug("Closed and removed instance for filesystem: {}", + filesystemName); + } catch (Exception e) { + LOG.warn("Failed to properly close instance for filesystem: {}", + filesystemName, e); + } + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index fe4991c9582d5..a751101cf5741 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -173,6 +173,8 @@ public final class AbfsHttpConstants { public static final char CHAR_EQUALS = '='; public static final char CHAR_STAR = '*'; public static final char CHAR_PLUS = '+'; + public static final int LOW_HEAP_SPACE_FACTOR = 4; + public static final double MEDIUM_HEAP_SPACE_FACTOR = 8; public static final int SPLIT_NO_LIMIT = -1; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 
50a88ab4e4587..62abce4455b97 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -356,6 +356,9 @@ public final class ConfigurationKeys { /** Add extra layer of verification of the integrity of the request content during transport: {@value}. */ public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation"; + /** Add extra layer of verification of the integrity of the full blob request content during transport: {@value}. */ + public static final String FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION = "fs.azure.enable.full.blob.checksum.validation"; + public static String accountProperty(String property, String account) { return property + DOT + account; } @@ -425,8 +428,41 @@ public static String containerProperty(String property, String fsName, String ac public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread"; /**Maximum number of thread per blob-delete orchestration: {@value}*/ public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread"; + /** + * Configuration key for the keep-alive time for the write thread pool. + * This value specifies the amount of time that threads in the write thread pool + * will remain idle before being terminated. + * Value: {@value}. 
+ */ + public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME = "fs.azure.write.threadpool.keep.alive.time"; + + public static final String FS_AZURE_WRITE_CPU_MONITORING_INTERVAL = "fs.azure.write.cpu.monitoring.interval"; + + public static final String FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT = "fs.azure.write.dynamic.threadpool.enablement"; + + public static final String FS_AZURE_WRITE_HIGH_CPU_THRESHOLD = "fs.azure.write.high.cpu.threshold"; + + public static final String FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD = "fs.azure.write.medium.cpu.threshold"; + + public static final String FS_AZURE_WRITE_LOW_CPU_THRESHOLD = "fs.azure.write.low.cpu.threshold"; + + public static final String FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER = "fs.azure.write.low.tier.memory.multiplier"; + + public static final String FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = "fs.azure.write.medium.tier.memory.multiplier"; + + public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = "fs.azure.write.high.tier.memory.multiplier"; + + + /**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/ public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id"; + /** + * Configuration key for the core pool size of the write thread pool. + * This value specifies the number of threads to keep in the write thread pool, + * even if they are idle. + * Value: {@value}. 
+ */ + public static final String FS_AZURE_WRITE_THREADPOOL_CORE_POOL_SIZE = "fs.azure.write.threadpool.core.pool.size"; private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 8bcd55aee8e35..6da838b270bd0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -36,7 +36,25 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true; public static final String USER_HOME_DIRECTORY_PREFIX = "/user"; - private static final int SIXTY_SECONDS = 60_000; + public static final int SIXTY_SECONDS = 60; + public static final int THIRTY_SECONDS = 30; + /** + * Number of bytes in a gigabyte. + */ + public static final long BYTES_PER_GIGABYTE = 1024L * 1024 * 1024; + /** + * Factor by which the pool size is increased when CPU utilization is low. + */ + public static final double LOW_CPU_POOL_SIZE_INCREASE_FACTOR = 1.5; + public static final double LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR = 0.9; + public static final int HIGH_CPU_REDUCTION_FACTOR = 3; + public static final int HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR = 2; + public static final int MEDIUM_CPU_REDUCTION_FACTOR = 5; + public static final int MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR = 3; + public static final int HIGH_MEDIUM_HEAP_FACTOR = 2; + public static final double LOW_CPU_HEAP_FACTOR = 0.8; + + // Retry parameter defaults. 
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 500; // 500ms @@ -147,6 +165,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true; public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false; public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false; + public static final boolean DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION = false; /** * Limit of queued block upload operations before writes @@ -210,6 +229,7 @@ public final class FileSystemConfigurations { public static final int ZERO = 0; public static final int HUNDRED = 100; + public static final double HUNDRED_D = 100.0; public static final long THOUSAND = 1000L; public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY @@ -237,6 +257,48 @@ public final class FileSystemConfigurations { public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS; + public static final boolean DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT = true; + + public static final int DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME = 30; + + public static final int MIN_WRITE_CPU_MONITORING_INTERVAL = 10; + + public static final int MAX_WRITE_CPU_MONITORING_INTERVAL = 60; + + public static final int DEFAULT_WRITE_CPU_MONITORING_INTERVAL = 15; + + public static final int MIN_WRITE_HIGH_CPU_THRESHOLD = 65; + + public static final int MAX_WRITE_HIGH_CPU_THRESHOLD = 90; + + public static final int DEFAULT_WRITE_HIGH_CPU_THRESHOLD = 80; + + public static final int MIN_WRITE_MEDIUM_CPU_THRESHOLD = 45; + + public static final int MAX_WRITE_MEDIUM_CPU_THRESHOLD = 65; + + public static final int DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD = 60; + + public static final int MIN_WRITE_LOW_CPU_THRESHOLD = 10; + + public static final int MAX_WRITE_LOW_CPU_THRESHOLD = 40; + + public static final int DEFAULT_WRITE_LOW_CPU_THRESHOLD = 30; + + public static final int MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 3; + + public 
static final int DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 4; + + public static final int MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 6; + + public static final int DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 8; + + public static final int MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 12; + + public static final int DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 16; + + public static final int DEFAULT_WRITE_THREADPOOL_CORE_POOL_SIZE = 1; + public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true; private FileSystemConfigurations() {} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java new file mode 100644 index 0000000000000..789922caaf7e6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java @@ -0,0 +1,763 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link WriteThreadPoolSizeManager}: instance lookup, shutdown on close,
+ * CPU monitor scheduling, and pool size adjustments while ABFS writes are running.
+ */
+class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
+
+  // Values returned by the mocked AbfsConfiguration getters stubbed in setUp().
+  private static final int MAX_CONCURRENT_REQUEST_COUNT = 15;
+  private static final int CORE_POOL_SIZE = 1;
+  private static final int THREAD_POOL_KEEP_ALIVE_TIME = 10;
+  private static final int LOW_TIER_MEMORY_MULTIPLIER = 4;
+  private static final int MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
+  private static final int HIGH_TIER_MEMORY_MULTIPLIER = 8;
+  private static final int HIGH_CPU_THRESHOLD = 15;
+  private static final int MEDIUM_CPU_THRESHOLD = 10;
+  private static final int LOW_CPU_THRESHOLD = 5;
+  private static final int CPU_MONITORING_INTERVAL = 15;
+
+  // CPU utilization ratios handed to adjustThreadPoolSizeBasedOnCPU(...) by the tests.
+  private static final double HIGH_CPU_UTILIZATION_THRESHOLD = 0.95;
+  private static final double LOW_CPU_UTILIZATION_THRESHOLD = 0.05;
+  private static final double HIGH_CPU_USAGE_RATIO = 0.95;
+  private static final double LOW_CPU_USAGE_RATIO = 0.05;
+
+  // Paths and sizes of the write workloads.
+  private static final String TEST_FILE_PATH = "testFilePath";
+  private static final String TEST_DIR_PATH = "testDirPath";
+  private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
+  private static final int BURST_LOAD = 50;
+
+  // Sleeps, waits and timeouts; the unit is part of each name.
+  private static final int THREAD_SLEEP_DURATION_MS = 200;
+  private static final int WAIT_DURATION_MS = 3000;
+  private static final int LATCH_TIMEOUT_SECONDS = 60;
+  private static final int RESIZE_WAIT_TIME_MS = 6_000;
+  private static final int SLEEP_DURATION_MS = 150;
+  private static final int AWAIT_TIMEOUT_SECONDS = 45;
+  private static final int RESIZER_JOIN_TIMEOUT_MS = 2_000;
+  private static final int WAIT_TIMEOUT_MS = 5000;
+  private static final int SLEEP_DURATION_30S_MS = 30000;
+  private static final int SMALL_PAUSE_MS = 50;
+  private static final long LOAD_SLEEP_DURATION_MS = 2000;
+
+  /** Mocked configuration, rebuilt before every test by {@link #setUp()}. */
+  private AbfsConfiguration mockConfig;
+
+  /**
+   * Runs the parent class {@code setup()} while the test instance is constructed.
+   *
+   * @throws Exception if the parent setup fails
+   */
+  TestWriteThreadPoolSizeManager() throws Exception {
+    super.setup();
+  }
+
+  /**
+   * Builds a fresh {@link AbfsConfiguration} mock whose write thread pool getters
+   * return the fixed values declared at the top of this class.
+   */
+  @BeforeEach
+  public void setUp() {
+    AbfsConfiguration config = mock(AbfsConfiguration.class);
+
+    // Pool sizing and keep-alive.
+    when(config.getWriteMaxConcurrentRequestCount()).thenReturn(MAX_CONCURRENT_REQUEST_COUNT);
+    when(config.getWriteCorePoolSize()).thenReturn(CORE_POOL_SIZE);
+    when(config.getWriteThreadPoolKeepAliveTime()).thenReturn(THREAD_POOL_KEEP_ALIVE_TIME);
+
+    // Memory tier multipliers.
+    when(config.getLowTierMemoryMultiplier()).thenReturn(LOW_TIER_MEMORY_MULTIPLIER);
+    when(config.getMediumTierMemoryMultiplier()).thenReturn(MEDIUM_TIER_MEMORY_MULTIPLIER);
+    when(config.getHighTierMemoryMultiplier()).thenReturn(HIGH_TIER_MEMORY_MULTIPLIER);
+
+    // CPU thresholds and monitoring interval.
+    when(config.getWriteHighCpuThreshold()).thenReturn(HIGH_CPU_THRESHOLD);
+    when(config.getWriteMediumCpuThreshold()).thenReturn(MEDIUM_CPU_THRESHOLD);
+    when(config.getWriteLowCpuThreshold()).thenReturn(LOW_CPU_THRESHOLD);
+    when(config.getWriteCpuMonitoringInterval()).thenReturn(CPU_MONITORING_INTERVAL);
+
+    mockConfig = config;
+  }
+
+  /**
+   * Ensures that {@link WriteThreadPoolSizeManager#getInstance(String, AbfsConfiguration)} returns a singleton per key.
+   */
+  @Test
+  void testGetInstanceReturnsSingleton() throws IOException {
+    WriteThreadPoolSizeManager instance1
+        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+    WriteThreadPoolSizeManager instance2
+        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+    WriteThreadPoolSizeManager instance3
+        = WriteThreadPoolSizeManager.getInstance("newFs", mockConfig);
+    try {
+      Assertions.assertThat(instance1)
+          .as("Expected the same singleton instance for the same key")
+          .isSameAs(instance2);
+      Assertions.assertThat(instance1)
+          .as("Expected a different instance for a different key")
+          .isNotSameAs(instance3);
+    } finally {
+      // Close the managers like the other tests in this class do. instance2 is not
+      // closed separately: when the first assertion holds it is the same object as instance1.
+      instance1.close();
+      instance3.close();
+    }
+  }
+
+  /**
+   * Tests that reporting high CPU usage does not grow the pool: after the adjustment
+   * the maximum pool size must be less than or equal to its initial value.
+   *
+   * @throws InterruptedException declared for the adjustment call
+   * @throws IOException declared for obtaining the file system and closing the manager
+   */
+  @Test
+  void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException, IOException {
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testfsHigh",
+        getAbfsStore(getFileSystem()).getAbfsConfiguration());
+    try {
+      ThreadPoolExecutor threadPoolExecutor
+          = (ThreadPoolExecutor) instance.getExecutorService();
+
+      // Capture the baseline before reporting any load.
+      int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
+
+      // Report 95% CPU utilization to the manager.
+      instance.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_UTILIZATION_THRESHOLD);
+
+      int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
+      Assertions.assertThat(newMaxSize)
+          .as("Expected pool size to decrease or stay the same under high CPU usage")
+          .isLessThanOrEqualTo(initialMaxSize);
+    } finally {
+      // Release the manager even when the assertion fails.
+      instance.close();
+    }
+  }
+
+  /**
+   * Tests that low CPU usage results in thread pool upscaling or remains the same.
+   */
+  @Test
+  void testAdjustThreadPoolSizeBasedOnLowCPU()
+      throws InterruptedException, IOException {
+    WriteThreadPoolSizeManager manager
+        = WriteThreadPoolSizeManager.getInstance("testfsLow",
+        getAbfsStore(getFileSystem()).getAbfsConfiguration());
+    ExecutorService service = manager.getExecutorService();
+
+    // Maximum pool size before and after reporting 5% CPU utilization.
+    int sizeBefore = ((ThreadPoolExecutor) service).getMaximumPoolSize();
+    manager.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_UTILIZATION_THRESHOLD);
+    int sizeAfter = ((ThreadPoolExecutor) service).getMaximumPoolSize();
+
+    Assertions.assertThat(sizeAfter)
+        .as("Expected pool size to increase or stay the same under low CPU usage")
+        .isGreaterThanOrEqualTo(sizeBefore);
+    manager.close();
+  }
+
+
+  /**
+   * Checks that a freshly obtained manager exposes a non-null executor service
+   * that has not been shut down.
+   */
+  @Test
+  void testExecutorServiceIsNotNull() throws IOException {
+    WriteThreadPoolSizeManager manager
+        = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig);
+    ExecutorService service = manager.getExecutorService();
+
+    Assertions.assertThat(service)
+        .as("Executor service should be initialized")
+        .isNotNull();
+
+    boolean alreadyShutDown = service.isShutdown();
+    Assertions.assertThat(alreadyShutDown)
+        .as("Executor service should not be shut down")
+        .isFalse();
+    manager.close();
+  }
+
+
+  /**
+   * Checks that {@link WriteThreadPoolSizeManager#close()} leaves the executor
+   * service shut down or terminated.
+   */
+  @Test
+  void testCloseCleansUp() throws Exception {
+    WriteThreadPoolSizeManager manager
+        = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig);
+    // Grab the executor first so it can still be inspected after close().
+    ExecutorService service = manager.getExecutorService();
+    manager.close();
+
+    boolean stopped = service.isShutdown() || service.isTerminated();
+    Assertions.assertThat(stopped)
+        .as("Executor service should be shut down or terminated after close()")
+        .isTrue();
+  }
+
+  /**
+   * Checks the state of the CPU monitor after startCPUMonitoring() has been called:
+   * 1. the CPU monitor executor returned by the manager is not null;
+   * 2. that executor reports a pool size greater than zero after a short wait.
+   * @throws InterruptedException if the test is interrupted during the sleep time
+   */
+  @Test
+  void testStartCPUMonitoringSchedulesTask()
+      throws InterruptedException, IOException {
+    WriteThreadPoolSizeManager manager
+        = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig);
+
+    // Start monitoring, then pause briefly before inspecting the monitor executor.
+    manager.startCPUMonitoring();
+    Thread.sleep(THREAD_SLEEP_DURATION_MS);
+
+    ScheduledThreadPoolExecutor cpuMonitor
+        = (ScheduledThreadPoolExecutor) manager.getCpuMonitorExecutor();
+
+    Assertions.assertThat(cpuMonitor)
+        .as("CPU Monitor Executor should not be null")
+        .isNotNull();
+
+    int monitorThreads = cpuMonitor.getPoolSize();
+    Assertions.assertThat(monitorThreads)
+        .as("Thread pool size should be greater than 0, indicating that the task is running")
+        .isGreaterThan(ZERO);
+    manager.close();
+  }
+
+  /**
+   * Verifies that ABFS write tasks can complete successfully even when the system
+   * is under artificial CPU stress. The test also ensures that the write thread
+   * pool resizes dynamically under load without leading to starvation, overload,
+   * or leftover work in the queue.
+   */
+  @Test
+  void testABFSWritesUnderCPUStress() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager instance =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig);
+    try {
+      ThreadPoolExecutor executor =
+          (ThreadPoolExecutor) instance.getExecutorService();
+
+      // Start CPU monitoring so pool size adjustments can happen in response to load.
+      instance.startCPUMonitoring();
+
+      // Background thread that burns CPU for ~3 seconds.
+      Thread stressThread = new Thread(() -> {
+        long end = System.currentTimeMillis() + WAIT_DURATION_MS;
+        while (System.currentTimeMillis() < end) {
+          Math.pow(Math.random(), Math.random());
+        }
+      });
+      stressThread.start();
+
+      // Prepare the ABFS write workload with multiple concurrent tasks.
+      int taskCount = 10;
+      CountDownLatch latch = new CountDownLatch(taskCount);
+      Path testFile = new Path(TEST_FILE_PATH);
+      final byte[] b = new byte[TEST_FILE_LENGTH];
+      new Random().nextBytes(b);
+
+      // An exception thrown inside a task passed to submit() is stored in the returned
+      // Future and never reaches the test thread. Record the first failure here and
+      // rethrow it below. The type is fully qualified so the import block is untouched.
+      final java.util.concurrent.atomic.AtomicReference<AssertionError> firstFailure =
+          new java.util.concurrent.atomic.AtomicReference<>();
+
+      // Each task writes to its own file.
+      for (int i = 0; i < taskCount; i++) {
+        int finalI = i;
+        executor.submit(() -> {
+          try (FSDataOutputStream out = fs.create(
+              new Path(testFile + "_" + finalI), true)) {
+            for (int j = 0; j < 5; j++) {
+              out.write(b);
+            }
+            out.hflush();
+          } catch (IOException | RuntimeException e) {
+            firstFailure.compareAndSet(null,
+                new AssertionError("Write task failed with exception", e));
+          } finally {
+            latch.countDown();
+          }
+        });
+      }
+
+      // Wait for all tasks to finish (60s timeout guards against deadlock/starvation).
+      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      // Read the pool maximum once the workload is done (or the wait timed out).
+      int resizedPoolSize = executor.getMaximumPoolSize();
+
+      // The stress loop ends on its own after WAIT_DURATION_MS; do not leave it running.
+      stressThread.join(WAIT_DURATION_MS);
+
+      // 0. Surface any write failure on the test thread.
+      AssertionError failure = firstFailure.get();
+      if (failure != null) {
+        throw failure;
+      }
+
+      // 1. All tasks must finish within the timeout.
+      Assertions.assertThat(finished)
+          .as("All ABFS write tasks should complete without starvation")
+          .isTrue();
+
+      // 2. Pool maximum must stay within bounds. This is a bounds check only;
+      //    it does not prove that a resize happened.
+      Assertions.assertThat(resizedPoolSize)
+          .as("Thread pool size should stay within configured bounds under CPU stress")
+          .isBetween(1, getAbfsStore(fs).getAbfsConfiguration().getWriteMaxConcurrentRequestCount());
+
+      // 3. Task queue must be empty once the workload is done.
+      Assertions.assertThat(executor.getQueue().size())
+          .as("No backlog should remain in task queue after completion")
+          .isEqualTo(0);
+    } finally {
+      // Release the manager even when an assertion fails.
+      instance.close();
+    }
+  }
+
+
+  /**
+   * Ensures that dynamic thread pool resizing during an active ABFS write workload
+   * does not cause deadlocks, task loss, or task duplication. The test also verifies
+   * that the pool resizes while work is in progress and that the executor queue
+   * eventually drains cleanly.
+   */
+  @Test
+  void testDynamicResizeNoDeadlocksNoTaskLoss() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager mgr =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig);
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) mgr.getExecutorService();
+
+    // Monitoring runs alongside the explicit adjust calls made by the resizer thread.
+    mgr.startCPUMonitoring();
+
+    final int taskCount = 10;
+    final int writesPerTask = 5;
+    final byte[] b = new byte[TEST_FILE_LENGTH];
+    new Random().nextBytes(b);
+    final Path base = new Path(TEST_DIR_PATH);
+    fs.mkdirs(base);
+
+    // Barrier has one party per task plus the resizer thread.
+    final CyclicBarrier startBarrier = new CyclicBarrier(taskCount + 1);
+    final CountDownLatch done = new CountDownLatch(taskCount);
+
+    final AtomicIntegerArray completed = new AtomicIntegerArray(taskCount); // 1 once a task is done
+    final AtomicInteger duplicates = new AtomicInteger(0); // double completions
+    final AtomicInteger rejected = new AtomicInteger(0);   // rejected submissions
+
+    // An exception thrown inside a task passed to submit() is stored in the returned
+    // Future and never reaches the test thread. Record the first failure here and
+    // rethrow it below. The type is fully qualified so the import block is untouched.
+    final java.util.concurrent.atomic.AtomicReference<AssertionError> firstFailure =
+        new java.util.concurrent.atomic.AtomicReference<>();
+
+    for (int i = 0; i < taskCount; i++) {
+      final int id = i;
+      try {
+        executor.submit(() -> {
+          try {
+            // Wait for the other tasks and the resizer before writing.
+            startBarrier.await(10, TimeUnit.SECONDS);
+
+            Path subPath = new Path(base, "part-" + id);
+            try (FSDataOutputStream out = fs.create(subPath)) {
+              for (int w = 0; w < writesPerTask; w++) {
+                out.write(b);
+                if ((w & 1) == 1) {
+                  out.hflush(); // flush on every second write
+                }
+              }
+              out.hflush();
+            }
+
+            // Mark the task as completed once; a second completion counts as a duplicate.
+            if (!completed.compareAndSet(id, 0, 1)) {
+              duplicates.incrementAndGet();
+            }
+          } catch (Exception e) {
+            firstFailure.compareAndSet(null,
+                new AssertionError("ABFS write task " + id + " failed", e));
+          } finally {
+            done.countDown();
+          }
+        });
+      } catch (RejectedExecutionException rex) {
+        rejected.incrementAndGet();
+        // The task will never run, so release its latch slot; the rejection is
+        // still reported by the "rejected" assertion below.
+        done.countDown();
+      }
+    }
+
+    final AtomicInteger observedMinMax = new AtomicInteger(executor.getMaximumPoolSize());
+    final AtomicInteger observedMaxMax = new AtomicInteger(executor.getMaximumPoolSize());
+
+    // Thread that alternates high and low CPU readings while the tasks are running.
+    Thread resizer = new Thread(() -> {
+      try {
+        // Last barrier party: releases the worker tasks.
+        startBarrier.await(10, TimeUnit.SECONDS);
+
+        long end = System.currentTimeMillis() + RESIZE_WAIT_TIME_MS; // keep resizing for ~6s
+        boolean high = true;
+        while (System.currentTimeMillis() < end) {
+          mgr.adjustThreadPoolSizeBasedOnCPU(high ? HIGH_CPU_USAGE_RATIO : LOW_CPU_USAGE_RATIO);
+          high = !high;
+
+          // Track the smallest and largest maximum pool size seen.
+          int cur = executor.getMaximumPoolSize();
+          observedMinMax.updateAndGet(prev -> Math.min(prev, cur));
+          observedMaxMax.updateAndGet(prev -> Math.max(prev, cur));
+
+          Thread.sleep(SLEEP_DURATION_MS);
+        }
+      } catch (InterruptedException e) {
+        // Restore the flag instead of silently dropping the interrupt.
+        Thread.currentThread().interrupt();
+      } catch (Exception ignored) {
+        // Best-effort simulation: a broken barrier or a failed adjustment ends the loop.
+      }
+    }, "resizer-thread");
+
+    resizer.start();
+
+    boolean finished = done.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    resizer.join(RESIZER_JOIN_TIMEOUT_MS);
+
+    // 0. Surface any task failure on the test thread.
+    AssertionError failure = firstFailure.get();
+    if (failure != null) {
+      throw failure;
+    }
+
+    // 1. All tasks must complete in time.
+    Assertions.assertThat(finished)
+        .as("All tasks must complete within timeout (no deadlock)")
+        .isTrue();
+
+    // 2. Every task should complete exactly once.
+    int completedCount = 0;
+    for (int i = 0; i < taskCount; i++) {
+      completedCount += completed.get(i);
+    }
+    Assertions.assertThat(completedCount)
+        .as("Every task should complete exactly once (no task loss)")
+        .isEqualTo(taskCount);
+
+    // 3. No task should mark itself as done more than once.
+    Assertions.assertThat(duplicates.get())
+        .as("No task should report completion more than once (no duplication)")
+        .isZero();
+
+    // 4. The executor should not reject tasks.
+    Assertions.assertThat(rejected.get())
+        .as("Tasks should not be rejected during active resizing")
+        .isZero();
+
+    // 5. Executor queue should be empty once all tasks finish.
+    Assertions.assertThat(executor.getQueue().size())
+        .as("Executor queue should drain after workload")
+        .isEqualTo(0);
+
+    // 6. Executor should still be running until explicitly closed.
+    Assertions.assertThat(executor.isShutdown())
+        .as("Executor should remain running until manager.close()")
+        .isFalse();
+
+    // 7. Sanity bounds on the observed maximum pool sizes. These checks do not prove
+    //    that the pool grew or shrank, only that the observed values are coherent.
+    int minObserved = observedMinMax.get();
+    int maxObserved = observedMaxMax.get();
+
+    Assertions.assertThat(maxObserved)
+        .as("Observed maximum pool size should always be positive")
+        .isGreaterThan(0);
+
+    Assertions.assertThat(minObserved)
+        .as("Smallest observed maximum pool size should not exceed the largest observed one")
+        .isLessThanOrEqualTo(maxObserved);
+
+    deleteTestDirQuietly(fs, base, taskCount);
+    mgr.close();
+  }
+
+
+
+  /**
+   * Runs ABFS writes while two threads burn CPU, then checks that the writes
+   * completed, that the maximum pool size did not grow beyond its initial value,
+   * and that the executor queue is empty.
+   */
+  @Test
+  void testThreadPoolScalesDownOnHighCpuLoad() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager instance =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig);
+    ThreadPoolExecutor executor =
+        (ThreadPoolExecutor) instance.getExecutorService();
+
+    instance.startCPUMonitoring();
+
+    // Baseline for the comparison at the end.
+    int initialMax = executor.getMaximumPoolSize();
+
+    // CPU-bound task: tight loop of math ops for ~5s.
+    Runnable cpuBurn = () -> {
+      long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
+      while (System.currentTimeMillis() < end) {
+        Math.sin(Math.random());
+        Math.cos(Math.random());
+      }
+    };
+
+    Thread cpuHog1 = new Thread(cpuBurn, "cpu-hog-thread-1");
+    Thread cpuHog2 = new Thread(cpuBurn, "cpu-hog-thread-2");
+    cpuHog1.start();
+    cpuHog2.start();
+
+    int taskCount = 10;
+    CountDownLatch latch = new CountDownLatch(taskCount);
+    Path base = new Path(TEST_DIR_PATH);
+    fs.mkdirs(base);
+    final byte[] buffer = new byte[TEST_FILE_LENGTH];
+    new Random().nextBytes(buffer);
+
+    // Failures inside submitted tasks are captured by the Future; record the first
+    // one so it can be rethrown on the test thread.
+    final java.util.concurrent.atomic.AtomicReference<AssertionError> firstFailure =
+        new java.util.concurrent.atomic.AtomicReference<>();
+
+    for (int i = 0; i < taskCount; i++) {
+      final Path part = new Path(base, "part-" + i);
+      executor.submit(() -> {
+        try (FSDataOutputStream out = fs.create(part, true)) {
+          for (int j = 0; j < 5; j++) {
+            out.write(buffer);
+            out.hflush();
+          }
+        } catch (IOException | RuntimeException e) {
+          firstFailure.compareAndSet(null,
+              new AssertionError("Write task failed under CPU stress", e));
+        } finally {
+          latch.countDown();
+        }
+      });
+    }
+
+    boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+    // Wait for the CPU hogs to end, then give the monitor time to react.
+    cpuHog1.join();
+    cpuHog2.join();
+    Thread.sleep(SLEEP_DURATION_30S_MS);
+
+    int resizedMax = executor.getMaximumPoolSize();
+
+    AssertionError failure = firstFailure.get();
+    if (failure != null) {
+      throw failure;
+    }
+
+    // 1. All write tasks finished despite CPU pressure.
+    Assertions.assertThat(finished)
+        .as("All ABFS write tasks must complete despite CPU stress")
+        .isTrue();
+
+    // 2. Pool maximum did not grow beyond the baseline.
+    Assertions.assertThat(resizedMax)
+        .as("Thread pool should scale down under high CPU load")
+        .isLessThanOrEqualTo(initialMax);
+
+    // 3. No leftover tasks in the queue.
+    Assertions.assertThat(executor.getQueue().size())
+        .as("No backlog should remain in the queue after workload")
+        .isEqualTo(0);
+
+    deleteTestDirQuietly(fs, base, taskCount);
+    instance.close();
+  }
+
+
+  /**
+   * Runs ABFS writes while two threads keep allocating memory, then checks that
+   * the writes completed, that the maximum pool size did not grow beyond its
+   * initial value, and that the executor queue is empty.
+   */
+  @Test
+  void testScalesDownOnParallelHighMemoryLoad() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager instance =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig);
+    ThreadPoolExecutor executor =
+        (ThreadPoolExecutor) instance.getExecutorService();
+
+    instance.startCPUMonitoring();
+
+    // Baseline for the comparison at the end.
+    int initialMax = executor.getMaximumPoolSize();
+
+    // Workload that allocates 5 MB chunks for ~5 seconds and keeps them reachable.
+    Runnable memoryBurn = () -> {
+      List<byte[]> allocations = new ArrayList<>();
+      long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
+      while (System.currentTimeMillis() < end) {
+        try {
+          allocations.add(new byte[5 * 1024 * 1024]); // allocate 5 MB
+          Thread.sleep(SMALL_PAUSE_MS); // small pause to avoid instant OOM
+        } catch (OutOfMemoryError oom) {
+          // Drop everything held so far and continue.
+          allocations.clear();
+        } catch (InterruptedException e) {
+          // Stop the workload: with the flag restored, every further sleep would
+          // throw immediately and the loop would allocate without any pause.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    };
+
+    Thread memHog1 = new Thread(memoryBurn, "mem-hog-thread-1");
+    Thread memHog2 = new Thread(memoryBurn, "mem-hog-thread-2");
+    memHog1.start();
+    memHog2.start();
+
+    int taskCount = 10;
+    CountDownLatch latch = new CountDownLatch(taskCount);
+    Path base = new Path(TEST_DIR_PATH);
+    fs.mkdirs(base);
+    final byte[] buffer = new byte[TEST_FILE_LENGTH];
+    new Random().nextBytes(buffer);
+
+    // Failures inside submitted tasks are captured by the Future; record the first
+    // one so it can be rethrown on the test thread.
+    final java.util.concurrent.atomic.AtomicReference<AssertionError> firstFailure =
+        new java.util.concurrent.atomic.AtomicReference<>();
+
+    for (int i = 0; i < taskCount; i++) {
+      final Path part = new Path(base, "part-" + i);
+      executor.submit(() -> {
+        try (FSDataOutputStream out = fs.create(part, true)) {
+          for (int j = 0; j < 5; j++) {
+            out.write(buffer);
+            out.hflush();
+          }
+        } catch (IOException | RuntimeException e) {
+          firstFailure.compareAndSet(null,
+              new AssertionError("Write task failed under memory stress", e));
+        } finally {
+          latch.countDown();
+        }
+      });
+    }
+
+    boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+    // Wait for the memory hogs to end, then give the monitor time to react.
+    memHog1.join();
+    memHog2.join();
+    Thread.sleep(SLEEP_DURATION_30S_MS);
+
+    int resizedMax = executor.getMaximumPoolSize();
+
+    AssertionError failure = firstFailure.get();
+    if (failure != null) {
+      throw failure;
+    }
+
+    // 1. All ABFS writes finished despite memory stress.
+    Assertions.assertThat(finished)
+        .as("All ABFS write tasks must complete despite parallel memory stress")
+        .isTrue();
+
+    // 2. Pool maximum did not grow beyond the baseline.
+    Assertions.assertThat(resizedMax)
+        .as("Thread pool should scale down under parallel high memory load")
+        .isLessThanOrEqualTo(initialMax);
+
+    // 3. No tasks remain queued after the workload.
+    Assertions.assertThat(executor.getQueue().size())
+        .as("No backlog should remain in the queue after workload")
+        .isEqualTo(0);
+
+    deleteTestDirQuietly(fs, base, taskCount);
+    instance.close();
+  }
+
+  /**
+   * Best-effort removal of the {@code part-N} files written by a test and of the
+   * directory that contains them. Delete failures are ignored because they are
+   * non-fatal for test cleanup.
+   *
+   * @param fs file system the test wrote to
+   * @param base directory holding the {@code part-N} files
+   * @param taskCount number of {@code part-N} files to remove
+   */
+  private static void deleteTestDirQuietly(AzureBlobFileSystem fs, Path base, int taskCount) {
+    for (int i = 0; i < taskCount; i++) {
+      try {
+        fs.delete(new Path(base, "part-" + i), false);
+      } catch (IOException ignored) {
+        // Ignored: cleanup failures are non-fatal in tests
+      }
+    }
+    try {
+      fs.delete(base, true);
+    } catch (IOException ignored) {
+      // Ignored: cleanup failures are non-fatal in tests
+    }
+  }
+
+  /**
+   * Test that after a long idle period, the thread pool
+   * can quickly scale up in response to a sudden burst of load
+   * without performance degradation.
+   */
+  @Test
+  void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager manager =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(),
+            fs.getAbfsStore().getAbfsConfiguration());
+    ThreadPoolExecutor pool =
+        (ThreadPoolExecutor) manager.getExecutorService();
+
+    // Phase 1: leave the executor without work for a few seconds, then sample it.
+    Thread.sleep(WAIT_TIMEOUT_MS);
+    int idleThreads = pool.getPoolSize();
+    Assertions.assertThat(idleThreads)
+        .as("Pool size should remain minimal after idle")
+        .isLessThanOrEqualTo(pool.getCorePoolSize());
+
+    // Phase 2: submit a burst of short busy-loop tasks (~200ms each).
+    int burstLoad = BURST_LOAD;
+    CountDownLatch remaining = new CountDownLatch(burstLoad);
+    Runnable spin = () -> {
+      long deadline = System.currentTimeMillis() + THREAD_SLEEP_DURATION_MS;
+      while (System.currentTimeMillis() < deadline) {
+        Math.sqrt(Math.random()); // burn CPU cycles
+      }
+      remaining.countDown();
+    };
+    int submitted = 0;
+    while (submitted < burstLoad) {
+      pool.submit(spin);
+      submitted++;
+    }
+
+    // Phase 3: wait briefly, then sample the pool again and compare with the idle sample.
+    Thread.sleep(LOAD_SLEEP_DURATION_MS);
+    int burstThreads = pool.getPoolSize();
+    Assertions.assertThat(burstThreads)
+        .as("Pool size should increase after burst load")
+        .isGreaterThanOrEqualTo(idleThreads);
+
+    // Phase 4: every burst task must finish within half of the latch timeout.
+    boolean drained = remaining.await(LATCH_TIMEOUT_SECONDS / 2, TimeUnit.SECONDS);
+    Assertions.assertThat(drained)
+        .as("All burst tasks should finish in reasonable time")
+        .isTrue();
+    manager.close();
+  }
+}
+