HADOOP-19472: [ABFS] Improve write workload performance for ABFS by efficient concurrency utilization #7669

Open · wants to merge 24 commits into base: trunk
Commits (24)
1193511
config changes
anmolanmol1234 Mar 3, 2025
198fea8
merge trunk
anmolanmol1234 Mar 24, 2025
6bc388e
fix for executor closing down
anmolanmol1234 Apr 2, 2025
27cfcd8
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Apr 7, 2025
8dd82e2
changing total threads
anmolanmol1234 Apr 16, 2025
e88c784
fix no of threads
anmolanmol1234 Apr 16, 2025
fd699b1
fix max thread pool size
anmolanmol1234 Apr 16, 2025
a9b6803
Changes in pool size
anmolanmol1234 Apr 18, 2025
ab11da9
changing minimum pool size
anmolanmol1234 Apr 21, 2025
108ad28
added test
anmolanmol1234 Apr 30, 2025
f233943
fix constants
anmolanmol1234 May 3, 2025
1ba12f6
fix merge conflicts
anmolanmol1234 May 3, 2025
8fe0842
spotbugs and checkstyle
anmolanmol1234 May 7, 2025
c56bdcb
fix spot bug
anmolanmol1234 May 7, 2025
ca7be88
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 May 27, 2025
5e79b10
fix variable and logging
anmolanmol1234 May 27, 2025
35d293b
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Jul 14, 2025
73a8012
Merge conflicts with trunk
anmolanmol1234 Aug 7, 2025
c41d9c8
Changes in write thread pool manager
anmolanmol1234 Aug 8, 2025
9cd431c
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Aug 11, 2025
371d427
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Aug 18, 2025
3164050
New configurations
anmolanmol1234 Aug 18, 2025
db7ae1f
additional config
anmolanmol1234 Aug 18, 2025
812ea46
Added integration tests
anmolanmol1234 Aug 18, 2025
File: AbfsConfiguration.java

@@ -438,6 +438,10 @@ public class AbfsConfiguration{
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
private boolean isChecksumValidationEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ABFS_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION)
private boolean isFullBlobChecksumValidationEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;
@@ -474,6 +478,57 @@ public class AbfsConfiguration{
FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
private int maxApacheHttpClientIoExceptionsRetries;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT,
DefaultValue = DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT)
private boolean dynamicWriteThreadPoolEnablement;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME,
DefaultValue = DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME)
private int writeThreadPoolKeepAliveTime;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_CPU_MONITORING_INTERVAL,
MinValue = MIN_WRITE_CPU_MONITORING_INTERVAL,
MaxValue = MAX_WRITE_CPU_MONITORING_INTERVAL,
DefaultValue = DEFAULT_WRITE_CPU_MONITORING_INTERVAL)
private int writeCpuMonitoringInterval;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_THREADPOOL_CORE_POOL_SIZE,
DefaultValue = DEFAULT_WRITE_THREADPOOL_CORE_POOL_SIZE)
private int writeCorePoolSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_HIGH_CPU_THRESHOLD,
MinValue = MIN_WRITE_HIGH_CPU_THRESHOLD,
MaxValue = MAX_WRITE_HIGH_CPU_THRESHOLD,
DefaultValue = DEFAULT_WRITE_HIGH_CPU_THRESHOLD)
private int writeHighCpuThreshold;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD,
MinValue = MIN_WRITE_MEDIUM_CPU_THRESHOLD,
MaxValue = MAX_WRITE_MEDIUM_CPU_THRESHOLD,
DefaultValue = DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD)
private int writeMediumCpuThreshold;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LOW_CPU_THRESHOLD,
MinValue = MIN_WRITE_LOW_CPU_THRESHOLD,
MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD,
DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD)
private int writeLowCpuThreshold;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
MinValue = MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
DefaultValue = DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER)
private int lowTierMemoryMultiplier;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
MinValue = MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
DefaultValue = DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER)
private int mediumTierMemoryMultiplier;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
MinValue = MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
DefaultValue = DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER)
private int highTierMemoryMultiplier;

/**
* Max idle TTL configuration for connection given in
* {@value org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys#FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL}
@@ -1560,6 +1615,46 @@ public int getWriteMaxConcurrentRequestCount() {
return this.writeMaxConcurrentRequestCount;
}

public int getWriteThreadPoolKeepAliveTime() {
return writeThreadPoolKeepAliveTime;
}

public int getWriteCpuMonitoringInterval() {
return writeCpuMonitoringInterval;
}

public boolean isDynamicWriteThreadPoolEnablement() {
return dynamicWriteThreadPoolEnablement;
}

public int getWriteLowCpuThreshold() {
return writeLowCpuThreshold;
}

public int getWriteMediumCpuThreshold() {
return writeMediumCpuThreshold;
}

public int getWriteHighCpuThreshold() {
return writeHighCpuThreshold;
}

public int getLowTierMemoryMultiplier() {
return lowTierMemoryMultiplier;
}

public int getMediumTierMemoryMultiplier() {
return mediumTierMemoryMultiplier;
}

public int getHighTierMemoryMultiplier() {
return highTierMemoryMultiplier;
}

public int getWriteCorePoolSize() {
return writeCorePoolSize;
}

public int getMaxWriteRequestsToQueue() {
if (this.maxWriteRequestsToQueue < 1) {
return 2 * getWriteMaxConcurrentRequestCount();
@@ -1705,6 +1800,10 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled)
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

public boolean isFullBlobChecksumValidationEnabled() {
return isFullBlobChecksumValidationEnabled;
}

public long getBlobCopyProgressPollWaitMillis() {
return blobCopyProgressPollWaitMillis;
}
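The hunks above add the new write-thread-pool settings and their getters to AbfsConfiguration, but the key strings and defaults live in ConfigurationKeys and FileSystemConfigurations, which are not part of this excerpt. As a rough illustration of how a client might opt in, here is a hedged sketch that sets a few of the new knobs through the ordinary Hadoop Configuration API. It assumes the new FS_AZURE_WRITE_* constants are declared alongside the existing keys in org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; the numeric values are placeholders, not the shipped defaults.

```java
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_CPU_MONITORING_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_HIGH_CPU_THRESHOLD;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_THREADPOOL_CORE_POOL_SIZE;

public final class DynamicWritePoolConfigExample {

  private DynamicWritePoolConfigExample() {
  }

  /** Build a Configuration that opts in to the dynamically sized write pool. */
  public static Configuration withDynamicWritePool() {
    Configuration conf = new Configuration();
    // Enable the CPU-aware write thread pool instead of the fixed bounded pool.
    conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
    // Core size the pool starts from (placeholder value).
    conf.setInt(FS_AZURE_WRITE_THREADPOOL_CORE_POOL_SIZE, 8);
    // How often CPU usage is sampled (unit not shown in this excerpt; placeholder value).
    conf.setInt(FS_AZURE_WRITE_CPU_MONITORING_INTERVAL, 30);
    // CPU percentage treated as "high" load (placeholder value).
    conf.setInt(FS_AZURE_WRITE_HIGH_CPU_THRESHOLD, 80);
    return conf;
  }
}
```

Whether a given value is accepted is governed by the Min/Max bounds declared in the validator annotations above, whose concrete limits are defined outside this excerpt.
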
File: AzureBlobFileSystemStore.java

@@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -134,7 +135,6 @@
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.http.client.utils.URIBuilder;

import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES;
@@ -203,6 +203,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private int blockOutputActiveBlocks;
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
private WriteThreadPoolSizeManager poolSizeManager;

/** ABFS instance reference to be held by the store to avoid GC close. */
private BackReference fsBackRef;
@@ -277,11 +278,19 @@ public AzureBlobFileSystemStore(
}
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
abfsConfiguration.getWriteMaxConcurrentRequestCount(),
abfsConfiguration.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-bounded");
if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
this.poolSizeManager = WriteThreadPoolSizeManager.getInstance(
getClient().getFileSystem() + "-" + UUID.randomUUID(),
abfsConfiguration);
poolSizeManager.startCPUMonitoring();
this.boundedThreadPool = poolSizeManager.getExecutorService();
} else {
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
abfsConfiguration.getWriteMaxConcurrentRequestCount(),
abfsConfiguration.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-bounded");
}
}

/**
@@ -321,16 +330,13 @@ public void close() throws IOException {
try {
Futures.allAsList(futures).get();
// shutdown the threadPool and set it to null.
HadoopExecutors.shutdown(boundedThreadPool, LOG,
30, TimeUnit.SECONDS);
boundedThreadPool = null;
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
IOUtils.cleanupWithLogger(LOG, getClient());
IOUtils.cleanupWithLogger(LOG, poolSizeManager, getClient());
}
}

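The constructor and close() hunks above wire in a WriteThreadPoolSizeManager that is referenced but not included in this excerpt: the store obtains an instance keyed by the filesystem name plus a random UUID, starts CPU monitoring, uses the manager's executor as the bounded write pool, and later closes the manager through IOUtils.cleanupWithLogger. The sketch below only illustrates the general technique such a manager could use, i.e. periodic CPU sampling that resizes a ThreadPoolExecutor between configured tiers. The method names getExecutorService and the monitoring entry point mirror the call sites in the diff, but the class name, resizing policy, thresholds, and everything else are assumptions, not the PR's implementation (which, per the diff, is also a per-filesystem singleton obtained via getInstance).

```java
import java.io.Closeable;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative CPU-aware pool sizing; not the PR's WriteThreadPoolSizeManager. */
public final class CpuAwareWritePoolSketch implements Closeable {

  private final ThreadPoolExecutor pool;
  private final ScheduledExecutorService monitor =
      Executors.newSingleThreadScheduledExecutor();
  private final int minPoolSize;
  private final int maxPoolSize;
  private final int lowCpuThresholdPercent;
  private final int highCpuThresholdPercent;
  private final long monitoringIntervalSeconds;

  public CpuAwareWritePoolSketch(int minPoolSize, int maxPoolSize,
      int lowCpuThresholdPercent, int highCpuThresholdPercent,
      long keepAliveSeconds, long monitoringIntervalSeconds) {
    this.minPoolSize = minPoolSize;
    this.maxPoolSize = maxPoolSize;
    this.lowCpuThresholdPercent = lowCpuThresholdPercent;
    this.highCpuThresholdPercent = highCpuThresholdPercent;
    this.monitoringIntervalSeconds = monitoringIntervalSeconds;
    // With an unbounded queue the executor never grows past its core size,
    // so resizeTo() below adjusts the core size as well as the maximum.
    this.pool = new ThreadPoolExecutor(minPoolSize, maxPoolSize,
        keepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
  }

  /** The executor handed to callers, analogous to getExecutorService() in the diff. */
  public ExecutorService getExecutorService() {
    return pool;
  }

  /** Start periodic CPU sampling, analogous to startCPUMonitoring() in the diff. */
  public void startCpuMonitoring() {
    monitor.scheduleAtFixedRate(this::adjustPoolSize,
        monitoringIntervalSeconds, monitoringIntervalSeconds, TimeUnit.SECONDS);
  }

  private void adjustPoolSize() {
    OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
    double loadPerCore = os.getSystemLoadAverage() / os.getAvailableProcessors();
    if (loadPerCore < 0) {
      return; // load average not available on this platform
    }
    int loadPercent = (int) Math.round(loadPerCore * 100);
    if (loadPercent >= highCpuThresholdPercent) {
      resizeTo(minPoolSize);                      // busy host: back off to the minimum
    } else if (loadPercent <= lowCpuThresholdPercent) {
      resizeTo(maxPoolSize);                      // idle host: use the full ceiling
    } else {
      resizeTo((minPoolSize + maxPoolSize) / 2);  // in between: middle tier
    }
  }

  private void resizeTo(int target) {
    // Keep corePoolSize <= maximumPoolSize at every step to avoid IllegalArgumentException.
    if (target >= pool.getMaximumPoolSize()) {
      pool.setMaximumPoolSize(target);
      pool.setCorePoolSize(target);
    } else {
      pool.setCorePoolSize(target);
      pool.setMaximumPoolSize(target);
    }
  }

  @Override
  public void close() {
    monitor.shutdownNow();
    pool.shutdown();
  }
}
```

Because close() shuts down both the sampling thread and the pool, passing such an object to IOUtils.cleanupWithLogger, as the close() hunk above does with poolSizeManager, releases both resources in one call.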