Skip to content

Support bulk deletion in batch file cleanup task #1179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.polaris.service.task.TaskFileIOSupplier;
import org.apache.polaris.service.task.TaskUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@QuarkusTest
public class BatchFileCleanupTaskHandlerTest {
Expand Down Expand Up @@ -191,7 +193,9 @@ public void close() {
.withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP)
.withData(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableIdentifier, cleanupFiles))
tableIdentifier,
cleanupFiles,
BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA))
.setName(UUID.randomUUID().toString())
.build();

Expand Down Expand Up @@ -242,7 +246,9 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException {
.withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP)
.withData(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableIdentifier, List.of(statisticsFile.path())))
tableIdentifier,
List.of(statisticsFile.path()),
BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA))
.setName(UUID.randomUUID().toString())
.build();

Expand All @@ -252,40 +258,45 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException {
}
}

@Test
public void testCleanupWithRetries() throws IOException {
@ParameterizedTest
@ValueSource(ints = {1, 2, 3})
public void testCleanupWithRetries(int maxRetries) throws IOException {
PolarisCallContext polarisCallContext =
new PolarisCallContext(
metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(),
new PolarisDefaultDiagServiceImpl());
try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
CallContext.setCurrentContext(callCtx);
Map<String, AtomicInteger> retryCounter = new HashMap<>();
AtomicInteger batchRetryCounter = new AtomicInteger(0);
FileIO fileIO =
new InMemoryFileIO() {
@Override
public void close() {
// no-op
}

@Override
public void deleteFile(String location) {
int attempts =
retryCounter
.computeIfAbsent(location, k -> new AtomicInteger(0))
.incrementAndGet();
if (attempts < 3) {
throw new RuntimeException("Simulating failure to test retries");
} else {
super.deleteFile(location);
}
}
};
TableIdentifier tableIdentifier =
TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
BatchFileCleanupTaskHandler handler =
new BatchFileCleanupTaskHandler(
buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor());
buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()) {
@Override
public CompletableFuture<Void> tryDelete(
TableIdentifier tableId,
FileIO fileIO,
Iterable<String> files,
String type,
Boolean isConcurrent,
Throwable e,
int attempt) {
if (attempt <= maxRetries) {
batchRetryCounter.incrementAndGet();
return tryDelete(tableId, fileIO, files, type, isConcurrent, e, attempt + 1);
} else {
return super.tryDelete(tableId, fileIO, files, type, isConcurrent, e, attempt);
}
}
};
long snapshotId = 100L;
ManifestFile manifestFile =
TaskTestUtils.manifestFile(
Expand All @@ -307,7 +318,9 @@ public void deleteFile(String location) {
.withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP)
.withData(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableIdentifier, List.of(statisticsFile.path())))
tableIdentifier,
List.of(statisticsFile.path()),
BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA))
.setName(UUID.randomUUID().toString())
.build();

Expand All @@ -323,12 +336,11 @@ public void deleteFile(String location) {
// Wait for all async tasks to finish
future.join();

// Ensure that retries happened as expected
assertThat(batchRetryCounter.get()).isEqualTo(maxRetries);

// Check if the file was successfully deleted after retries
assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse();

// Ensure that retries happened as expected
assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue();
assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public void testTableCleanup() throws IOException {
.returns(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableIdentifier,
List.of(snapshot.manifestListLocation(), statisticsFile.path())),
List.of(snapshot.manifestListLocation(), statisticsFile.path()),
BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA),
entity ->
entity.readData(
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)));
Expand Down Expand Up @@ -307,7 +308,9 @@ public void close() {
.returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType)
.returns(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableIdentifier, List.of(snapshot.manifestListLocation())),
tableIdentifier,
List.of(snapshot.manifestListLocation()),
BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA),
entity ->
entity.readData(
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)),
Expand All @@ -318,7 +321,9 @@ public void close() {
.returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType)
.returns(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableIdentifier, List.of(snapshot.manifestListLocation())),
tableIdentifier,
List.of(snapshot.manifestListLocation()),
BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA),
entity ->
entity.readData(
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)),
Expand Down Expand Up @@ -450,7 +455,8 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
snapshot.manifestListLocation(),
snapshot2.manifestListLocation(),
statisticsFile1.path(),
statisticsFile2.path())),
statisticsFile2.path()),
BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA),
entity ->
entity.readData(
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)));
Expand Down Expand Up @@ -627,7 +633,7 @@ public void testTableCleanupMultipleMetadata() throws IOException {
statisticsFile1.path(),
statisticsFile2.path(),
partitionStatisticsFile1.path(),
partitionStatisticsFile2.path())),
partitionStatisticsFile2.path()), BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA),
entity ->
entity.readData(
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,12 @@ public boolean handleTask(TaskEntity task, CallContext callContext) {
missingFiles.size());
}

// Schedule the deletion for each file asynchronously
List<CompletableFuture<Void>> deleteFutures =
validFiles.stream()
.map(file -> super.tryDelete(tableId, authorizedFileIO, null, file, null, 1))
.toList();
CompletableFuture<Void> deleteFutures =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is now a single future, so maybe we should name this deleteFuture instead

tryDelete(
tableId, authorizedFileIO, validFiles, cleanupTask.type().getValue(), true, null, 1);

try {
// Wait for all delete operations to finish
CompletableFuture<Void> allDeletes =
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]));
allDeletes.join();
deleteFutures.join();
} catch (Exception e) {
LOGGER.error("Exception detected during batch files deletion", e);
return false;
Expand All @@ -97,5 +92,25 @@ public boolean handleTask(TaskEntity task, CallContext callContext) {
}
}

public record BatchFileCleanupTask(TableIdentifier tableId, List<String> batchFiles) {}
public enum BatchFileType {
TABLE_METADATA("table_metadata");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not completely sure here, so am asking for context: why did we introduce this enum type here instead of keeping it as a record? Is this going to be extensible for something in the immediate future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The record in line 114 is still there — we added this enum to specify what kind of file the BatchFileCleanupTask is cleaning up.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes a lot more sense :) Thanks!


    // Human-readable label for the kind of files in a batch; flows into tryDelete's
    // "type" parameter and ultimately into Iceberg's CatalogUtil.deleteFiles for logging.
    private final String value;

    BatchFileType(String value) {
      this.value = value;
    }

    /** Returns the string label used when logging and issuing bulk delete calls. */
    public String getValue() {
      return value;
    }

    @Override
    public String toString() {
      return value;
    }
  }

  /**
   * Serialized payload of a batch file cleanup task.
   *
   * @param tableId the table whose files are being cleaned up
   * @param batchFiles the file paths to delete in this batch
   * @param type the kind of files in the batch (e.g. {@link BatchFileType#TABLE_METADATA})
   */
  public record BatchFileCleanupTask(
      TableIdentifier tableId, List<String> batchFiles, BatchFileType type) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
Expand All @@ -29,9 +30,12 @@
import org.slf4j.LoggerFactory;

/**
* {@link FileCleanupTaskHandler} responsible for cleaning up files in table tasks. Handles retries
* for file deletions and skips files that are already missing. Subclasses must implement
* task-specific logic.
* Abstract base class for handling file cleanup tasks within Apache Polaris.
*
* <p>This class is for performing asynchronous file deletions with retry logic.
*
* <p>Subclasses must implement {@link #canHandleTask(TaskEntity)} and {@link
* #handleTask(TaskEntity, CallContext)} to define task-specific handling logic.
*/
public abstract class FileCleanupTaskHandler implements TaskHandler {

Expand All @@ -53,6 +57,19 @@ public FileCleanupTaskHandler(
@Override
public abstract boolean handleTask(TaskEntity task, CallContext callContext);

/**
* Attempts to delete a single file with retry logic. If the file does not exist, it logs a
* message and does not retry. If an error occurs, it retries up to {@link #MAX_ATTEMPTS} times
* before failing.
*
* @param tableId The identifier of the table associated with the file.
* @param fileIO The {@link FileIO} instance used for file operations.
* @param baseFile An optional base file associated with the file being deleted (can be null).
* @param file The path of the file to be deleted.
* @param e The exception from the previous attempt, if any.
* @param attempt The current retry attempt count.
* @return A {@link CompletableFuture} representing the asynchronous deletion operation.
*/
public CompletableFuture<Void> tryDelete(
TableIdentifier tableId,
FileIO fileIO,
Expand Down Expand Up @@ -103,4 +120,53 @@ public CompletableFuture<Void> tryDelete(
CompletableFuture.delayedExecutor(
FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService));
}

/**
* Attempts to delete multiple files in a batch operation with retry logic. If an error occurs, it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This LGTM, but it's important to note that we may not retry in the event that the service dies. Eventually, we should have Polaris try to drain the task queue for any tasks that failed the first time they were run

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good catch — do you mind if I follow up with a new PR to address this? It might involve refactoring the existing delete task.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eric-maynard - I was looking at the currently existent tryDelete function regarding this comment. Is this comment something that also applies to that function?

Copy link
Contributor Author

@danielhumanmod danielhumanmod May 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eric-maynard - I was looking at the currently existent tryDelete function regarding this comment. Is this comment something that also applies to that function?

No @adnanhemani — the current PR mainly focuses on providing more efficient bulk deletion; it cannot guarantee that a task is eventually executed.

For Eric's comment, we have a plan to fix that, please refer to #774

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielhumanmod - my question was whether the current tryDelete function also suffers from the same lack of guarantees — that the task may not eventually retry. (This was a question for my own learning, to see if there was anything fundamentally different in this new function compared to the existing function or tasks in general — not accusatory or asking for any change.) After reading #774, it seems like the same comment applies to both tasks. Thanks!

* retries up to {@link #MAX_ATTEMPTS} times before failing.
*
* @param tableId The identifier of the table associated with the files.
* @param fileIO The {@link FileIO} instance used for file operations.
* @param files The list of file paths to be deleted.
* @param type The type of files being deleted (e.g., data files, metadata files).
* @param isConcurrent Whether the deletion should be performed concurrently.
* @param e The exception from the previous attempt, if any.
* @param attempt The current retry attempt count.
* @return A {@link CompletableFuture} representing the asynchronous batch deletion operation.
*/
public CompletableFuture<Void> tryDelete(
TableIdentifier tableId,
FileIO fileIO,
Iterable<String> files,
String type,
Boolean isConcurrent,
Throwable e,
int attempt) {
if (e != null && attempt <= MAX_ATTEMPTS) {
LOGGER
.atWarn()
.addKeyValue("files", files)
.addKeyValue("attempt", attempt)
.addKeyValue("error", e.getMessage())
.addKeyValue("type", type)
.log("Error encountered attempting to delete files");
}
if (attempt > MAX_ATTEMPTS && e != null) {
return CompletableFuture.failedFuture(e);
}
return CompletableFuture.runAsync(
() -> CatalogUtil.deleteFiles(fileIO, files, type, isConcurrent), executorService)
.exceptionallyComposeAsync(
newEx -> {
LOGGER
.atWarn()
.addKeyValue("files", files)
.addKeyValue("tableIdentifier", tableId)
.addKeyValue("type", type)
.log("Exception caught deleting data files", newEx);
return tryDelete(tableId, fileIO, files, type, isConcurrent, newEx, attempt + 1);
},
CompletableFuture.delayedExecutor(
FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ private Stream<TaskEntity> getMetadataTaskStream(
.withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP)
.withData(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableEntity.getTableIdentifier(), metadataBatch))
tableEntity.getTableIdentifier(),
metadataBatch,
BatchFileCleanupTaskHandler.BatchFileType.TABLE_METADATA))
.setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
.build();
});
Expand Down