diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index 662f88bb05..2a85d32df5 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -59,6 +59,8 @@ import org.apache.polaris.service.task.TaskFileIOSupplier; import org.apache.polaris.service.task.TaskUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; @QuarkusTest public class BatchFileCleanupTaskHandlerTest { @@ -191,7 +193,9 @@ public void close() { .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, cleanupFiles)) + tableIdentifier, + cleanupFiles, + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); @@ -242,7 +246,9 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException { .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) + tableIdentifier, + List.of(statisticsFile.path()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); @@ -252,40 +258,45 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException { } } - @Test - public void testCleanupWithRetries() throws IOException { + @ParameterizedTest + @ValueSource(ints = {1, 2, 3}) + public void testCleanupWithRetries(int maxRetries) throws IOException { PolarisCallContext polarisCallContext = new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { CallContext.setCurrentContext(callCtx); - Map retryCounter = new HashMap<>(); + AtomicInteger batchRetryCounter = new AtomicInteger(0); FileIO fileIO = new InMemoryFileIO() { @Override public void close() { // no-op } - - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("Simulating failure to test retries"); - } else { - super.deleteFile(location); - } - } }; TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); BatchFileCleanupTaskHandler handler = new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()) { + @Override + public CompletableFuture tryDelete( + TableIdentifier tableId, + FileIO fileIO, + Iterable files, + String type, + Boolean isConcurrent, + Throwable e, + int attempt) { + if (attempt <= maxRetries) { + batchRetryCounter.incrementAndGet(); + return tryDelete(tableId, fileIO, files, type, isConcurrent, e, attempt + 1); + } else { + return super.tryDelete(tableId, fileIO, files, type, isConcurrent, e, attempt); + } + } + }; long snapshotId = 100L; ManifestFile manifestFile = TaskTestUtils.manifestFile( @@ -307,7 +318,9 @@ public void deleteFile(String location) { .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) + tableIdentifier, + List.of(statisticsFile.path()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setName(UUID.randomUUID().toString()) .build(); @@ -323,12 +336,11 @@ public void deleteFile(String location) { // Wait for all async tasks to finish future.join(); + // Ensure that retries happened as expected + assertThat(batchRetryCounter.get()).isEqualTo(maxRetries); + // Check if the file was successfully deleted after retries assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - - // Ensure that retries happened as expected - assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); - assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); } } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index 5e39028c92..a6fdd6c6d1 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -180,7 +180,8 @@ public void testTableCleanup() throws IOException { .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, - List.of(snapshot.manifestListLocation(), statisticsFile.path())), + List.of(snapshot.manifestListLocation(), statisticsFile.path()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); @@ -307,7 +308,9 @@ public void close() { .returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType) .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(snapshot.manifestListLocation())), + tableIdentifier, + List.of(snapshot.manifestListLocation()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)), @@ -318,7 +321,9 @@ public void close() { .returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType) .returns( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(snapshot.manifestListLocation())), + tableIdentifier, + List.of(snapshot.manifestListLocation()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)), @@ -450,7 +455,8 @@ public void testTableCleanupMultipleSnapshots() throws IOException { snapshot.manifestListLocation(), snapshot2.manifestListLocation(), statisticsFile1.path(), - statisticsFile2.path())), + statisticsFile2.path()), + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); @@ -627,7 +633,7 @@ public void testTableCleanupMultipleMetadata() throws IOException { statisticsFile1.path(), statisticsFile2.path(), partitionStatisticsFile1.path(), - partitionStatisticsFile2.path())), + partitionStatisticsFile2.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); diff --git a/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java index d47725351f..7c5c28cd32 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java @@ -77,17 +77,12 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { missingFiles.size()); } - // Schedule the deletion for each file asynchronously - List> deleteFutures = - validFiles.stream() - .map(file -> super.tryDelete(tableId, authorizedFileIO, null, file, null, 1)) - .toList(); + CompletableFuture deleteFutures = + tryDelete( + tableId, authorizedFileIO, validFiles, cleanupTask.type().getValue(), true, null, 1); try { - // Wait for all delete operations to finish - CompletableFuture allDeletes = - CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); - allDeletes.join(); + deleteFutures.join(); } catch (Exception e) { LOGGER.error("Exception detected during batch files deletion", e); return false; @@ -97,5 +92,25 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { } } - public record BatchFileCleanupTask(TableIdentifier tableId, List batchFiles) {} + public enum BatchFileType { + TABLE_METADATA("table_metadata"); + + private final String value; + + BatchFileType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return value; + } + } + + public record BatchFileCleanupTask( + TableIdentifier tableId, List batchFiles, BatchFileType type) {} } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java index aa0b0d9f1e..a16adf85a3 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; @@ -29,9 +30,12 @@ import org.slf4j.LoggerFactory; /** - * {@link FileCleanupTaskHandler} responsible for cleaning up files in table tasks. Handles retries - * for file deletions and skips files that are already missing. Subclasses must implement - * task-specific logic. + * Abstract base class for handling file cleanup tasks within Apache Polaris. + * + *

This class is for performing asynchronous file deletions with retry logic. + * + *

Subclasses must implement {@link #canHandleTask(TaskEntity)} and {@link + * #handleTask(TaskEntity, CallContext)} to define task-specific handling logic. */ public abstract class FileCleanupTaskHandler implements TaskHandler { @@ -53,6 +57,19 @@ public FileCleanupTaskHandler( @Override public abstract boolean handleTask(TaskEntity task, CallContext callContext); + /** + * Attempts to delete a single file with retry logic. If the file does not exist, it logs a + * message and does not retry. If an error occurs, it retries up to {@link #MAX_ATTEMPTS} times + * before failing. + * + * @param tableId The identifier of the table associated with the file. + * @param fileIO The {@link FileIO} instance used for file operations. + * @param baseFile An optional base file associated with the file being deleted (can be null). + * @param file The path of the file to be deleted. + * @param e The exception from the previous attempt, if any. + * @param attempt The current retry attempt count. + * @return A {@link CompletableFuture} representing the asynchronous deletion operation. + */ public CompletableFuture tryDelete( TableIdentifier tableId, FileIO fileIO, @@ -103,4 +120,53 @@ public CompletableFuture tryDelete( CompletableFuture.delayedExecutor( FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); } + + /** + * Attempts to delete multiple files in a batch operation with retry logic. If an error occurs, it + * retries up to {@link #MAX_ATTEMPTS} times before failing. + * + * @param tableId The identifier of the table associated with the files. + * @param fileIO The {@link FileIO} instance used for file operations. + * @param files The list of file paths to be deleted. + * @param type The type of files being deleted (e.g., data files, metadata files). + * @param isConcurrent Whether the deletion should be performed concurrently. + * @param e The exception from the previous attempt, if any. + * @param attempt The current retry attempt count. + * @return A {@link CompletableFuture} representing the asynchronous batch deletion operation. + */ + public CompletableFuture tryDelete( + TableIdentifier tableId, + FileIO fileIO, + Iterable files, + String type, + Boolean isConcurrent, + Throwable e, + int attempt) { + if (e != null && attempt <= MAX_ATTEMPTS) { + LOGGER + .atWarn() + .addKeyValue("files", files) + .addKeyValue("attempt", attempt) + .addKeyValue("error", e.getMessage()) + .addKeyValue("type", type) + .log("Error encountered attempting to delete files"); + } + if (attempt > MAX_ATTEMPTS && e != null) { + return CompletableFuture.failedFuture(e); + } + return CompletableFuture.runAsync( + () -> CatalogUtil.deleteFiles(fileIO, files, type, isConcurrent), executorService) + .exceptionallyComposeAsync( + newEx -> { + LOGGER + .atWarn() + .addKeyValue("files", files) + .addKeyValue("tableIdentifier", tableId) + .addKeyValue("type", type) + .log("Exception caught deleting data files", newEx); + return tryDelete(tableId, fileIO, files, type, isConcurrent, newEx, attempt + 1); + }, + CompletableFuture.delayedExecutor( + FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index ff791bf188..014c0752bf 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -234,7 +234,9 @@ private Stream getMetadataTaskStream( .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableEntity.getTableIdentifier(), metadataBatch)) + tableEntity.getTableIdentifier(), + metadataBatch, + BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA)) .setInternalProperties(cleanupTask.getInternalPropertiesAsMap()) .build(); });