From a36e3a8befb55a781261ae97c8bd5e3d8415d134 Mon Sep 17 00:00:00 2001 From: Daniel Tu Date: Thu, 13 Mar 2025 23:16:58 -0700 Subject: [PATCH 1/3] support bulk deletion in batch file cleanup task --- .../task/BatchFileCleanupTaskHandlerTest.java | 56 ++++++++------- .../task/TableCleanupTaskHandlerTest.java | 10 +-- .../task/BatchFileCleanupTaskHandler.java | 34 ++++++--- .../service/task/FileCleanupTaskHandler.java | 72 ++++++++++++++++++- .../service/task/TableCleanupTaskHandler.java | 2 +- 5 files changed, 131 insertions(+), 43 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index 662f88bb05..082825fa78 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -59,6 +59,8 @@ import org.apache.polaris.service.task.TaskFileIOSupplier; import org.apache.polaris.service.task.TaskUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; @QuarkusTest public class BatchFileCleanupTaskHandlerTest { @@ -191,7 +193,7 @@ public void close() { .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, cleanupFiles)) + tableIdentifier, cleanupFiles, BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); @@ -242,7 +244,7 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException { .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) + tableIdentifier, List.of(statisticsFile.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); @@ -252,40 +254,45 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException { } } - @Test - public void testCleanupWithRetries() throws IOException { + @ParameterizedTest + @ValueSource(ints = {1, 2, 3}) + public void testCleanupWithRetries(int retryTime) throws IOException { PolarisCallContext polarisCallContext = new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { CallContext.setCurrentContext(callCtx); - Map retryCounter = new HashMap<>(); + AtomicInteger batchRetryCounter = new AtomicInteger(0); FileIO fileIO = new InMemoryFileIO() { @Override public void close() { // no-op } - - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("Simulating failure to test retries"); - } else { - super.deleteFile(location); - } - } }; TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); BatchFileCleanupTaskHandler handler = new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()) { + @Override + public CompletableFuture tryDelete( + TableIdentifier tableId, + FileIO fileIO, + Iterable files, + String type, + Boolean isConcurrent, + Throwable e, + int attempt) { + if (attempt <= retryTime) { + batchRetryCounter.incrementAndGet(); + return tryDelete(tableId, fileIO, files, type, isConcurrent, e, attempt + 1); + } else { + return super.tryDelete(tableId, fileIO, files, type, isConcurrent, e, attempt); + } + } + }; long snapshotId = 100L; ManifestFile manifestFile = TaskTestUtils.manifestFile( @@ -307,7 +314,9 @@ public void deleteFile(String location) { .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) + tableIdentifier, + List.of(statisticsFile.path()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); @@ -323,12 +332,11 @@ public void deleteFile(String location) { // Wait for all async tasks to finish future.join(); + // Ensure that retries happened as expected + assertThat(batchRetryCounter.get()).isEqualTo(retryTime); + // Check if the file was successfully deleted after retries assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - - // Ensure that retries happened as expected - assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); - assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); } } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index 5e39028c92..ee795c1806 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -180,7 +180,7 @@ public void testTableCleanup() throws IOException { .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, - List.of(snapshot.manifestListLocation(), statisticsFile.path())), + List.of(snapshot.manifestListLocation(), statisticsFile.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); @@ -307,7 +307,7 @@ public void close() { .returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType) .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(snapshot.manifestListLocation())), + tableIdentifier, List.of(snapshot.manifestListLocation()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)), @@ -318,7 +318,7 @@ public void close() { .returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType) .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(snapshot.manifestListLocation())), + tableIdentifier, List.of(snapshot.manifestListLocation()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)), @@ -450,7 +450,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException { snapshot.manifestListLocation(), snapshot2.manifestListLocation(), statisticsFile1.path(), - statisticsFile2.path())), + statisticsFile2.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); @@ -627,7 +627,7 @@ public void testTableCleanupMultipleMetadata() throws IOException { statisticsFile1.path(), statisticsFile2.path(), partitionStatisticsFile1.path(), - partitionStatisticsFile2.path())), + partitionStatisticsFile2.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); diff --git a/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java index d47725351f..35a7469fc0 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java @@ -77,17 +77,11 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { missingFiles.size()); } - // Schedule the deletion for each file asynchronously - List> deleteFutures = - validFiles.stream() - .map(file -> super.tryDelete(tableId, authorizedFileIO, null, file, null, 1)) - .toList(); + CompletableFuture deleteFutures = + tryDelete(tableId, authorizedFileIO, validFiles, cleanupTask.type().getValue(), true, null, 1); try { - // Wait for all delete operations to finish - CompletableFuture allDeletes = - CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); - allDeletes.join(); + deleteFutures.join(); } catch (Exception e) { LOGGER.error("Exception detected during batch files deletion", e); return false; @@ -97,5 +91,25 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { } } - public record BatchFileCleanupTask(TableIdentifier tableId, List batchFiles) {} + public enum BatchFileType { + TABLE_METADATA("table_metadata"); + + private final String value; + + BatchFileType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return value; + } + } + + public record BatchFileCleanupTask( + TableIdentifier tableId, List batchFiles, BatchFileType type) {} } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java index aa0b0d9f1e..a16adf85a3 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; @@ -29,9 +30,12 @@ import org.slf4j.LoggerFactory; /** - * {@link FileCleanupTaskHandler} responsible for cleaning up files in table tasks. Handles retries - * for file deletions and skips files that are already missing. Subclasses must implement - * task-specific logic. + * Abstract base class for handling file cleanup tasks within Apache Polaris. + * + *

This class is for performing asynchronous file deletions with retry logic. + * + *

Subclasses must implement {@link #canHandleTask(TaskEntity)} and {@link + * #handleTask(TaskEntity, CallContext)} to define task-specific handling logic. */ public abstract class FileCleanupTaskHandler implements TaskHandler { @@ -53,6 +57,19 @@ public FileCleanupTaskHandler( @Override public abstract boolean handleTask(TaskEntity task, CallContext callContext); + /** + * Attempts to delete a single file with retry logic. If the file does not exist, it logs a + * message and does not retry. If an error occurs, it retries up to {@link #MAX_ATTEMPTS} times + * before failing. + * + * @param tableId The identifier of the table associated with the file. + * @param fileIO The {@link FileIO} instance used for file operations. + * @param baseFile An optional base file associated with the file being deleted (can be null). + * @param file The path of the file to be deleted. + * @param e The exception from the previous attempt, if any. + * @param attempt The current retry attempt count. + * @return A {@link CompletableFuture} representing the asynchronous deletion operation. + */ public CompletableFuture tryDelete( TableIdentifier tableId, FileIO fileIO, @@ -103,4 +120,53 @@ public CompletableFuture tryDelete( CompletableFuture.delayedExecutor( FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); } + + /** + * Attempts to delete multiple files in a batch operation with retry logic. If an error occurs, it + * retries up to {@link #MAX_ATTEMPTS} times before failing. + * + * @param tableId The identifier of the table associated with the files. + * @param fileIO The {@link FileIO} instance used for file operations. + * @param files The list of file paths to be deleted. + * @param type The type of files being deleted (e.g., data files, metadata files). + * @param isConcurrent Whether the deletion should be performed concurrently. + * @param e The exception from the previous attempt, if any. + * @param attempt The current retry attempt count. + * @return A {@link CompletableFuture} representing the asynchronous batch deletion operation. + */ + public CompletableFuture tryDelete( + TableIdentifier tableId, + FileIO fileIO, + Iterable files, + String type, + Boolean isConcurrent, + Throwable e, + int attempt) { + if (e != null && attempt <= MAX_ATTEMPTS) { + LOGGER + .atWarn() + .addKeyValue("files", files) + .addKeyValue("attempt", attempt) + .addKeyValue("error", e.getMessage()) + .addKeyValue("type", type) + .log("Error encountered attempting to delete files"); + } + if (attempt > MAX_ATTEMPTS && e != null) { + return CompletableFuture.failedFuture(e); + } + return CompletableFuture.runAsync( + () -> CatalogUtil.deleteFiles(fileIO, files, type, isConcurrent), executorService) + .exceptionallyComposeAsync( + newEx -> { + LOGGER + .atWarn() + .addKeyValue("files", files) + .addKeyValue("tableIdentifier", tableId) + .addKeyValue("type", type) + .log("Exception caught deleting data files", newEx); + return tryDelete(tableId, fileIO, files, type, isConcurrent, newEx, attempt + 1); + }, + CompletableFuture.delayedExecutor( + FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index ff791bf188..3afcb0e74f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -234,7 +234,7 @@ private Stream getMetadataTaskStream( .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableEntity.getTableIdentifier(), metadataBatch)) + tableEntity.getTableIdentifier(), metadataBatch, BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setInternalProperties(cleanupTask.getInternalPropertiesAsMap()) .build(); }); From b377d1863243b15c7c39c2c75c84bd96d4aad149 Mon Sep 17 00:00:00 2001 From: Daniel Tu Date: Wed, 19 Mar 2025 00:22:52 -0700 Subject: [PATCH 2/3] format with spotless apply --- .../task/BatchFileCleanupTaskHandlerTest.java | 10 +++++++--- .../quarkus/task/TableCleanupTaskHandlerTest.java | 14 ++++++++++---- .../service/task/BatchFileCleanupTaskHandler.java | 3 ++- .../service/task/TableCleanupTaskHandler.java | 4 +++- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index 082825fa78..cb98ef808c 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -193,7 +193,9 @@ public void close() { .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, cleanupFiles, BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) + tableIdentifier, + cleanupFiles, + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); @@ -244,7 +246,9 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException { .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) + tableIdentifier, + List.of(statisticsFile.path()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); @@ -316,7 +320,7 @@ public CompletableFuture tryDelete( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, List.of(statisticsFile.path()), - BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index ee795c1806..a6fdd6c6d1 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -180,7 +180,8 @@ public void testTableCleanup() throws IOException { .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, - List.of(snapshot.manifestListLocation(), statisticsFile.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), + List.of(snapshot.manifestListLocation(), statisticsFile.path()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); @@ -307,7 +308,9 @@ public void close() { .returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType) .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(snapshot.manifestListLocation()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), + tableIdentifier, + List.of(snapshot.manifestListLocation()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)), @@ -318,7 +321,9 @@ public void close() { .returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType) .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(snapshot.manifestListLocation()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), + tableIdentifier, + List.of(snapshot.manifestListLocation()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)), @@ -450,7 +455,8 @@ public void testTableCleanupMultipleSnapshots() throws IOException { snapshot.manifestListLocation(), snapshot2.manifestListLocation(), statisticsFile1.path(), - statisticsFile2.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), + statisticsFile2.path()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); diff --git a/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java index 35a7469fc0..7c5c28cd32 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java @@ -78,7 +78,8 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { } CompletableFuture deleteFutures = - tryDelete(tableId, authorizedFileIO, validFiles, cleanupTask.type().getValue(), true, null, 1); + tryDelete( + tableId, authorizedFileIO, validFiles, cleanupTask.type().getValue(), true, null, 1); try { deleteFutures.join(); diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index 3afcb0e74f..014c0752bf 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -234,7 +234,9 @@ private Stream getMetadataTaskStream( .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableEntity.getTableIdentifier(), metadataBatch, BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) + tableEntity.getTableIdentifier(), + metadataBatch, + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setInternalProperties(cleanupTask.getInternalPropertiesAsMap()) .build(); }); From ac8e321f57d16e600bb202784c43688ab478d1e8 Mon Sep 17 00:00:00 2001 From: Daniel Tu Date: Thu, 1 May 2025 22:37:10 -0700 Subject: [PATCH 3/3] format with spotless apply --- .../quarkus/task/BatchFileCleanupTaskHandlerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index cb98ef808c..2a85d32df5 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -260,7 +260,7 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException { @ParameterizedTest @ValueSource(ints = {1, 2, 3}) - public void testCleanupWithRetries(int retryTime) throws IOException { + public void testCleanupWithRetries(int maxRetries) throws IOException { PolarisCallContext polarisCallContext = new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), @@ -289,7 +289,7 @@ public CompletableFuture tryDelete( Boolean isConcurrent, Throwable e, int attempt) { - if (attempt <= retryTime) { + if (attempt <= maxRetries) { batchRetryCounter.incrementAndGet(); return tryDelete(tableId, fileIO, files, type, isConcurrent, e, attempt + 1); } else { @@ -337,7 +337,7 @@ public CompletableFuture tryDelete( future.join(); // Ensure that retries happened as expected - assertThat(batchRetryCounter.get()).isEqualTo(retryTime); + assertThat(batchRetryCounter.get()).isEqualTo(maxRetries); // Check if the file was successfully deleted after retries assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse();