diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 633bca5f95..e485b200d2 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -108,7 +108,6 @@ import org.apache.polaris.core.exceptions.CommitConflictException; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; -import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; @@ -120,12 +119,14 @@ import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.persistence.resolver.ResolverPath; import org.apache.polaris.core.persistence.resolver.ResolverStatus; +import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.core.storage.StorageLocation; import org.apache.polaris.core.storage.StorageUtil; import org.apache.polaris.service.catalog.SupportsNotifications; import org.apache.polaris.service.catalog.common.CatalogUtils; import org.apache.polaris.service.catalog.common.LocationUtils; +import org.apache.polaris.service.catalog.io.AccessConfigProvider; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.FileIOUtil; import org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation; @@ -178,6 +179,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog private long catalogId = -1; private String defaultBaseLocation; private Map catalogProperties; + private final AccessConfigProvider accessConfigProvider; private FileIOFactory fileIOFactory; private PolarisMetaStoreManager metaStoreManager; @@ -195,6 +197,7 @@ public IcebergCatalog( PolarisResolutionManifestCatalogView resolvedEntityView, SecurityContext securityContext, TaskExecutor taskExecutor, + AccessConfigProvider accessConfigProvider, FileIOFactory fileIOFactory, PolarisEventListener polarisEventListener) { this.diagnostics = diagnostics; @@ -207,6 +210,7 @@ public IcebergCatalog( this.taskExecutor = taskExecutor; this.catalogId = catalogEntity.getId(); this.catalogName = catalogEntity.getName(); + this.accessConfigProvider = accessConfigProvider; this.fileIOFactory = fileIOFactory; this.metaStoreManager = metaStoreManager; this.polarisEventListener = polarisEventListener; @@ -2076,16 +2080,16 @@ private FileIO loadFileIOForTableLike( PolarisResolvedPathWrapper resolvedStorageEntity, Map tableProperties, Set storageActions) { - // Reload fileIO based on table specific context - FileIO fileIO = - fileIOFactory.loadFileIO( + AccessConfig accessConfig = + accessConfigProvider.getAccessConfig( callContext, - ioImplClassName, - tableProperties, identifier, readLocations, storageActions, + Optional.empty(), resolvedStorageEntity); + // Reload fileIO based on table specific context + FileIO fileIO = fileIOFactory.loadFileIO(accessConfig, ioImplClassName, tableProperties); // ensure the new fileIO is closed when the catalog is closed closeableGroup.addCloseable(fileIO); return fileIO; @@ -2595,26 +2599,6 @@ private Page listTableLike( .map(record -> TableIdentifier.of(parentNamespace, record.getName())); } - /** - * Load FileIO with provided impl and properties - * - * @param ioImpl full class name of a custom FileIO implementation - * @param properties used to initialize the FileIO implementation - * @return FileIO object - */ - protected FileIO loadFileIO(String ioImpl, Map properties) { - IcebergTableLikeEntity icebergTableLikeEntity = IcebergTableLikeEntity.of(catalogEntity); - TableIdentifier identifier = icebergTableLikeEntity.getTableIdentifier(); - Set locations = Set.of(catalogEntity.getBaseLocation()); - ResolvedPolarisEntity resolvedCatalogEntity = - new ResolvedPolarisEntity(catalogEntity, List.of(), List.of()); - PolarisResolvedPathWrapper resolvedPath = - new PolarisResolvedPathWrapper(List.of(resolvedCatalogEntity)); - Set storageActions = Set.of(PolarisStorageActions.ALL); - return fileIOFactory.loadFileIO( - callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath); - } - private int getMaxMetadataRefreshRetries() { return realmConfig.getConfig(FeatureConfiguration.MAX_METADATA_REFRESH_RETRIES); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java index 6a70fadae2..a2e78524dc 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java @@ -25,55 +25,32 @@ import jakarta.inject.Inject; import java.util.HashMap; import java.util.Map; -import java.util.Optional; -import java.util.Set; import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.AccessConfig; -import org.apache.polaris.core.storage.PolarisStorageActions; /** * A default FileIO factory implementation for creating Iceberg {@link FileIO} instances with * contextual table-level properties. * *

This class acts as a translation layer between Polaris properties and the properties required - * by Iceberg's {@link FileIO}. For example, it evaluates storage actions and retrieves subscoped - * credentials to initialize a {@link FileIO} instance with the most limited permissions necessary. + * by Iceberg's {@link FileIO}. */ @ApplicationScoped @Identifier("default") public class DefaultFileIOFactory implements FileIOFactory { - private final AccessConfigProvider accessConfigProvider; - @Inject - public DefaultFileIOFactory(AccessConfigProvider accessConfigProvider) { - this.accessConfigProvider = accessConfigProvider; - } + public DefaultFileIOFactory() {} @Override public FileIO loadFileIO( - @Nonnull CallContext callContext, + @Nonnull AccessConfig accessConfig, @Nonnull String ioImplClassName, - @Nonnull Map properties, - @Nonnull TableIdentifier identifier, - @Nonnull Set tableLocations, - @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + @Nonnull Map properties) { // Get subcoped creds properties = new HashMap<>(properties); - AccessConfig accessConfig = - accessConfigProvider.getAccessConfig( - callContext, - identifier, - tableLocations, - storageActions, - Optional.empty(), - resolvedEntityPath); // Update the FileIO with the subscoped credentials // Update with properties in case there are table-level overrides the credentials should diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java index f3e0d6b98b..5c6007efa1 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java @@ -21,12 +21,8 @@ import jakarta.annotation.Nonnull; import jakarta.enterprise.context.ApplicationScoped; import java.util.Map; -import java.util.Set; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; -import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.AccessConfig; /** * Interface for providing a way to construct FileIO objects, such as for reading/writing S3. @@ -41,21 +37,13 @@ public interface FileIOFactory { *

This method may obtain subscoped credentials to restrict the FileIO's permissions, ensuring * secure and limited access to the table's data and locations. * - * @param callContext the call for which the FileIO is being loaded. + * @param accessConfig the access configuration containing credentials and other properties. * @param ioImplClassName the class name of the FileIO implementation to load. * @param properties configuration properties for the FileIO. - * @param identifier the table identifier. - * @param tableLocations locations associated with the table. - * @param storageActions storage actions allowed for the table. - * @param resolvedEntityPath resolved paths for the entities. * @return a configured FileIO instance. */ FileIO loadFileIO( - @Nonnull CallContext callContext, + @Nonnull AccessConfig accessConfig, @Nonnull String ioImplClassName, - @Nonnull Map properties, - @Nonnull TableIdentifier identifier, - @Nonnull Set tableLocations, - @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath); + @Nonnull Map properties); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java index ffe8cc1fd2..47617309ce 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java @@ -23,12 +23,8 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.util.Map; -import java.util.Set; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; -import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.AccessConfig; /** A {@link FileIOFactory} that translates WASB paths to ABFS ones */ @ApplicationScoped @@ -38,27 +34,16 @@ public class WasbTranslatingFileIOFactory implements FileIOFactory { private final FileIOFactory defaultFileIOFactory; @Inject - public WasbTranslatingFileIOFactory(AccessConfigProvider accessConfigProvider) { - defaultFileIOFactory = new DefaultFileIOFactory(accessConfigProvider); + public WasbTranslatingFileIOFactory() { + defaultFileIOFactory = new DefaultFileIOFactory(); } @Override public FileIO loadFileIO( - @Nonnull CallContext callContext, + @Nonnull AccessConfig accessConfig, @Nonnull String ioImplClassName, - @Nonnull Map properties, - @Nonnull TableIdentifier identifier, - @Nonnull Set tableLocations, - @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + @Nonnull Map properties) { return new WasbTranslatingFileIO( - defaultFileIOFactory.loadFileIO( - callContext, - ioImplClassName, - properties, - identifier, - tableLocations, - storageActions, - resolvedEntityPath)); + defaultFileIOFactory.loadFileIO(accessConfig, ioImplClassName, properties)); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java index c9dcbefaee..d52e1c0fbb 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java @@ -33,6 +33,7 @@ import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; +import org.apache.polaris.service.catalog.io.AccessConfigProvider; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; @@ -46,6 +47,7 @@ public class PolarisCallContextCatalogFactory implements CallContextCatalogFacto private final PolarisDiagnostics diagnostics; private final TaskExecutor taskExecutor; + private final AccessConfigProvider accessConfigProvider; private final FileIOFactory fileIOFactory; private final ResolverFactory resolverFactory; private final MetaStoreManagerFactory metaStoreManagerFactory; @@ -57,12 +59,14 @@ public PolarisCallContextCatalogFactory( ResolverFactory resolverFactory, MetaStoreManagerFactory metaStoreManagerFactory, TaskExecutor taskExecutor, + AccessConfigProvider accessConfigProvider, FileIOFactory fileIOFactory, PolarisEventListener polarisEventListener) { this.diagnostics = diagnostics; this.resolverFactory = resolverFactory; this.metaStoreManagerFactory = metaStoreManagerFactory; this.taskExecutor = taskExecutor; + this.accessConfigProvider = accessConfigProvider; this.fileIOFactory = fileIOFactory; this.polarisEventListener = polarisEventListener; } @@ -89,6 +93,7 @@ public Catalog createCallContextCatalog( resolvedManifest, securityContext, taskExecutor, + accessConfigProvider, fileIOFactory, polarisEventListener); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java index b21eaebf59..720f204fc2 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.TableIdentifier; @@ -32,19 +33,25 @@ import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.service.catalog.io.AccessConfigProvider; import org.apache.polaris.service.catalog.io.FileIOFactory; @ApplicationScoped public class TaskFileIOSupplier { private final FileIOFactory fileIOFactory; + private final AccessConfigProvider accessConfigProvider; @Inject - public TaskFileIOSupplier(FileIOFactory fileIOFactory) { + public TaskFileIOSupplier( + FileIOFactory fileIOFactory, AccessConfigProvider accessConfigProvider) { this.fileIOFactory = fileIOFactory; + this.accessConfigProvider = accessConfigProvider; } public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext callContext) { + Map internalProperties = task.getInternalPropertiesAsMap(); Map properties = new HashMap<>(internalProperties); @@ -55,11 +62,14 @@ public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext cal new ResolvedPolarisEntity(task, List.of(), List.of()); PolarisResolvedPathWrapper resolvedPath = new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity)); + AccessConfig accessConfig = + accessConfigProvider.getAccessConfig( + callContext, identifier, locations, storageActions, Optional.empty(), resolvedPath); + String ioImpl = properties.getOrDefault( CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO"); - return fileIOFactory.loadFileIO( - callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath); + return fileIOFactory.loadFileIO(accessConfig, ioImpl, properties); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java index 6db99fb34d..f53d69f441 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java @@ -537,6 +537,7 @@ private void initBaseCatalog() { passthroughView, securityContext, Mockito.mock(), + accessConfigProvider, fileIOFactory, polarisEventListener); this.baseCatalog.initialize( @@ -562,10 +563,10 @@ protected TestPolarisCallContextCatalogFactory() { @Inject public TestPolarisCallContextCatalogFactory( PolarisDiagnostics diagnostics, - StorageCredentialCache storageCredentialCache, ResolverFactory resolverFactory, MetaStoreManagerFactory metaStoreManagerFactory, TaskExecutor taskExecutor, + AccessConfigProvider accessConfigProvider, FileIOFactory fileIOFactory, PolarisEventListener polarisEventListener) { super( @@ -573,6 +574,7 @@ public TestPolarisCallContextCatalogFactory( resolverFactory, metaStoreManagerFactory, taskExecutor, + accessConfigProvider, fileIOFactory, polarisEventListener); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisS3InteroperabilityTest.java b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisS3InteroperabilityTest.java index 4ab75815ae..d984234a68 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisS3InteroperabilityTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisS3InteroperabilityTest.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.inmemory.InMemoryFileIO; @@ -72,16 +73,8 @@ private static String makeTableLocation( } public PolarisS3InteroperabilityTest() { - TestServices.FileIOFactorySupplier fileIOFactorySupplier = - (accessConfigProvider) -> - (FileIOFactory) - (callContext, - ioImplClassName, - properties, - identifier, - tableLocations, - storageActions, - resolvedEntityPath) -> new InMemoryFileIO(); + Supplier fileIOFactorySupplier = + () -> (FileIOFactory) (accessConfig, ioImplClassName, properties) -> new InMemoryFileIO(); services = TestServices.builder() .config(SERVER_CONFIG) diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java index 4f2ce23afc..0a63a80be7 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java @@ -215,7 +215,7 @@ public void before(TestInfo testInfo) { new PolarisPassthroughResolutionView( resolutionManifestFactory, securityContext, CATALOG_NAME); TaskExecutor taskExecutor = Mockito.mock(); - this.fileIOFactory = new DefaultFileIOFactory(accessConfigProvider); + this.fileIOFactory = new DefaultFileIOFactory(); StsClient stsClient = Mockito.mock(StsClient.class); when(stsClient.assumeRole(isA(AssumeRoleRequest.class))) @@ -249,6 +249,7 @@ public void before(TestInfo testInfo) { passthroughView, securityContext, taskExecutor, + accessConfigProvider, fileIOFactory, new NoOpPolarisEventListener()); this.icebergCatalog.initialize( diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index 72cda7ecb4..61026127eb 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -102,7 +102,6 @@ import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.config.RealmConfig; -import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.NamespaceEntity; @@ -117,7 +116,6 @@ import org.apache.polaris.core.identity.provider.ServiceIdentityProvider; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.cache.EntityCache; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; @@ -129,7 +127,7 @@ import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; -import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageAccessProperty; @@ -360,7 +358,7 @@ public void before(TestInfo testInfo) { .build() .asCatalog(serviceIdentityProvider))); - this.fileIOFactory = new DefaultFileIOFactory(accessConfigProvider); + this.fileIOFactory = new DefaultFileIOFactory(); StsClient stsClient = Mockito.mock(StsClient.class); when(stsClient.assumeRole(isA(AssumeRoleRequest.class))) @@ -462,6 +460,7 @@ protected IcebergCatalog newIcebergCatalog( passthroughView, securityContext, taskExecutor, + accessConfigProvider, fileIOFactory, polarisEventListener); } @@ -1004,7 +1003,7 @@ public void testValidateNotificationFailToCreateFileIO() { // filename. final String tableLocation = "s3://externally-owned-bucket/validate_table/"; final String tableMetadataLocation = tableLocation + "metadata/"; - FileIOFactory fileIOFactory = spy(new DefaultFileIOFactory(accessConfigProvider)); + FileIOFactory fileIOFactory = spy(new DefaultFileIOFactory()); IcebergCatalog catalog = newIcebergCatalog(catalog().name(), metaStoreManager, fileIOFactory); catalog.initialize( CATALOG_NAME, @@ -1025,7 +1024,7 @@ public void testValidateNotificationFailToCreateFileIO() { doThrow(new ForbiddenException("Fake failure applying downscoped credentials")) .when(fileIOFactory) - .loadFileIO(any(), any(), any(), any(), any(), any(), any()); + .loadFileIO(any(), any(), any()); Assertions.assertThatThrownBy(() -> catalog.sendNotification(table, request)) .isInstanceOf(ForbiddenException.class) .hasMessageContaining("Fake failure applying downscoped credentials"); @@ -1921,7 +1920,7 @@ public void testDropTableWithPurge() { .containsEntry(StorageAccessProperty.AWS_SECRET_KEY.getPropertyName(), SECRET_ACCESS_KEY) .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), SESSION_TOKEN); FileIO fileIO = - new TaskFileIOSupplier(new DefaultFileIOFactory(accessConfigProvider)) + new TaskFileIOSupplier(new DefaultFileIOFactory(), accessConfigProvider) .apply(taskEntity, TABLE, polarisContext); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class); Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo()) @@ -2047,7 +2046,7 @@ static Stream testRetriableException() { @Test public void testFileIOWrapper() { - MeasuredFileIOFactory measured = new MeasuredFileIOFactory(accessConfigProvider); + MeasuredFileIOFactory measured = new MeasuredFileIOFactory(); IcebergCatalog catalog = newIcebergCatalog(CATALOG_NAME, metaStoreManager, measured); catalog.initialize( CATALOG_NAME, @@ -2090,23 +2089,14 @@ public void testFileIOWrapper() { new FileIOFactory() { @Override public FileIO loadFileIO( - @Nonnull CallContext callContext, + @Nonnull AccessConfig accessConfig, @Nonnull String ioImplClassName, - @Nonnull Map properties, - @Nonnull TableIdentifier identifier, - @Nonnull Set tableLocations, - @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + @Nonnull Map properties) { return measured.loadFileIO( - callContext, - "org.apache.iceberg.inmemory.InMemoryFileIO", - Map.of(), - TABLE, - Set.of(table.location()), - Set.of(PolarisStorageActions.ALL), - Mockito.mock()); + accessConfig, "org.apache.iceberg.inmemory.InMemoryFileIO", Map.of()); } - }); + }, + accessConfigProvider); TableCleanupTaskHandler handler = new TableCleanupTaskHandler( diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java index d6fc350050..62ae103056 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java @@ -210,7 +210,7 @@ public void before(TestInfo testInfo) { PolarisPassthroughResolutionView passthroughView = new PolarisPassthroughResolutionView( resolutionManifestFactory, securityContext, CATALOG_NAME); - FileIOFactory fileIOFactory = new DefaultFileIOFactory(accessConfigProvider); + FileIOFactory fileIOFactory = new DefaultFileIOFactory(); testPolarisEventListener = (TestPolarisEventListener) polarisEventListener; testPolarisEventListener.clear(); @@ -223,6 +223,7 @@ public void before(TestInfo testInfo) { passthroughView, securityContext, Mockito.mock(), + accessConfigProvider, fileIOFactory, polarisEventListener); Map properties = diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java index f3cd56162d..646393a103 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java @@ -1899,7 +1899,8 @@ public void testSendNotificationSufficientPrivileges() { resolverFactory, managerFactory, Mockito.mock(), - new DefaultFileIOFactory(accessConfigProvider), + accessConfigProvider, + new DefaultFileIOFactory(), polarisEventListener) { @Override public Catalog createCallContextCatalog( diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index 53ebebf7a6..32af648616 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -27,6 +27,7 @@ import java.lang.reflect.Method; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.apache.iceberg.Schema; import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.catalog.Namespace; @@ -101,10 +102,10 @@ public void before(TestInfo testInfo) { .build()); // Spy FileIOFactory and check if the credentials are passed to the FileIO - TestServices.FileIOFactorySupplier fileIOFactorySupplier = - (accessConfigProvider) -> + Supplier fileIOFactorySupplier = + () -> Mockito.spy( - new DefaultFileIOFactory(accessConfigProvider) { + new DefaultFileIOFactory() { @Override FileIO loadFileIOInternal( @Nonnull String ioImplClassName, @Nonnull Map properties) { @@ -149,14 +150,7 @@ public void testLoadFileIOForTableLike(String scheme) { // 1. BasePolarisCatalog:doCommit: for writing the table during the creation Mockito.verify(testServices.fileIOFactory(), Mockito.times(1)) - .loadFileIO( - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any()); + .loadFileIO(Mockito.any(), Mockito.any(), Mockito.any()); } @ParameterizedTest @@ -175,7 +169,8 @@ public void testLoadFileIOForCleanupTask(String scheme) { Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); FileIO fileIO = - new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, TABLE, callContext); + new TaskFileIOSupplier(testServices.fileIOFactory(), testServices.accessConfigProvider()) + .apply(taskEntity, TABLE, callContext); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class); Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo()) .isInstanceOf(InMemoryFileIO.class); @@ -184,14 +179,7 @@ public void testLoadFileIOForCleanupTask(String scheme) { // 2. BasePolarisCatalog:doRefresh: for reading the table during the drop // 3. TaskFileIOSupplier:apply: for clean up metadata files and merge files Mockito.verify(testServices.fileIOFactory(), Mockito.times(3)) - .loadFileIO( - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any(), - Mockito.any()); + .loadFileIO(Mockito.any(), Mockito.any(), Mockito.any()); } IcebergCatalog createCatalog(TestServices services, String scheme) { @@ -228,6 +216,7 @@ IcebergCatalog createCatalog(TestServices services, String scheme) { passthroughView, services.securityContext(), services.taskExecutor(), + services.accessConfigProvider(), services.fileIOFactory(), services.polarisEventListener()); polarisCatalog.initialize( diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java index f03afecdbe..5af0c8ac99 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java @@ -234,7 +234,7 @@ public void before(TestInfo testInfo) { new PolarisPassthroughResolutionView( resolutionManifestFactory, securityContext, CATALOG_NAME); TaskExecutor taskExecutor = Mockito.mock(); - this.fileIOFactory = new DefaultFileIOFactory(accessConfigProvider); + this.fileIOFactory = new DefaultFileIOFactory(); StsClient stsClient = Mockito.mock(StsClient.class); when(stsClient.assumeRole(isA(AssumeRoleRequest.class))) @@ -266,6 +266,7 @@ public void before(TestInfo testInfo) { passthroughView, securityContext, taskExecutor, + accessConfigProvider, fileIOFactory, new NoOpPolarisEventListener()); this.icebergCatalog.initialize( diff --git a/runtime/service/src/test/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandlerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandlerTest.java index 326664620b..28db39376e 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandlerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandlerTest.java @@ -49,7 +49,9 @@ import org.apache.polaris.core.persistence.BasePersistence; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.service.TestFileIOFactory; +import org.apache.polaris.service.catalog.io.AccessConfigProvider; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; @QuarkusTest public class BatchFileCleanupTaskHandlerTest { @@ -57,7 +59,8 @@ public class BatchFileCleanupTaskHandlerTest { private final RealmContext realmContext = () -> "realmName"; private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) { - return new TaskFileIOSupplier(new TestFileIOFactory(fileIO)); + return new TaskFileIOSupplier( + new TestFileIOFactory(fileIO), Mockito.mock(AccessConfigProvider.class)); } private PolarisCallContext newCallContext() { diff --git a/runtime/service/src/test/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandlerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandlerTest.java index 99eacdbe2e..d9ca54fac4 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandlerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandlerTest.java @@ -45,7 +45,9 @@ import org.apache.polaris.core.persistence.BasePersistence; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.service.TestFileIOFactory; +import org.apache.polaris.service.catalog.io.AccessConfigProvider; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; @QuarkusTest class ManifestFileCleanupTaskHandlerTest { @@ -54,7 +56,8 @@ class ManifestFileCleanupTaskHandlerTest { private final RealmContext realmContext = () -> "realmName"; private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) { - return new TaskFileIOSupplier(new TestFileIOFactory(fileIO)); + return new TaskFileIOSupplier( + new TestFileIOFactory(fileIO), Mockito.mock(AccessConfigProvider.class)); } private PolarisCallContext newCallContext() { diff --git a/runtime/service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java index 336607913d..8ebb96a421 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java @@ -51,6 +51,7 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.service.TestFileIOFactory; +import org.apache.polaris.service.catalog.io.AccessConfigProvider; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -69,7 +70,9 @@ class TableCleanupTaskHandlerTest { private final RealmContext realmContext = () -> "realmName"; private TableCleanupTaskHandler newTableCleanupTaskHandler(FileIO fileIO) { - TaskFileIOSupplier taskFileIOSupplier = new TaskFileIOSupplier(new TestFileIOFactory(fileIO)); + TaskFileIOSupplier taskFileIOSupplier = + new TaskFileIOSupplier( + new TestFileIOFactory(fileIO), Mockito.mock(AccessConfigProvider.class)); return new TableCleanupTaskHandler( Mockito.mock(), clock, metaStoreManagerFactory, taskFileIOSupplier); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java index 3d59c481a2..d6743a3720 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java @@ -63,7 +63,8 @@ void testEventsAreEmitted() { Runnable::run, testServices.clock(), testServices.metaStoreManagerFactory(), - new TaskFileIOSupplier(testServices.fileIOFactory()), + new TaskFileIOSupplier( + testServices.fileIOFactory(), testServices.accessConfigProvider()), testServices.polarisEventListener(), null); diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestFileIOFactory.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestFileIOFactory.java index 67d15a94ca..74f2fbc4cf 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestFileIOFactory.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestFileIOFactory.java @@ -21,12 +21,8 @@ import jakarta.annotation.Nonnull; import java.util.Map; -import java.util.Set; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; -import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.service.catalog.io.FileIOFactory; /** A FileIOFactory that always returns the same FileIO instance. */ @@ -40,13 +36,9 @@ public TestFileIOFactory(@Nonnull FileIO fileIO) { @Override public FileIO loadFileIO( - @Nonnull CallContext callContext, + @Nonnull AccessConfig accessConfig, @Nonnull String ioImplClassName, - @Nonnull Map properties, - @Nonnull TableIdentifier identifier, - @Nonnull Set tableLocations, - @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + @Nonnull Map properties) { return fileIO; } } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index a6fa48ef85..cd1652ab02 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Function; +import java.util.function.Supplier; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; import org.apache.polaris.core.PolarisDiagnostics; @@ -115,9 +115,6 @@ public record TestServices( private static final RealmContext TEST_REALM = () -> "test-realm"; private static final String GCP_ACCESS_TOKEN = "abc"; - @FunctionalInterface - public interface FileIOFactorySupplier extends Function {} - private static class MockedConfigurationStore implements PolarisConfigurationStore { private final Map defaults; @@ -143,8 +140,7 @@ public static class Builder { private RealmContext realmContext = TEST_REALM; private Map config = Map.of(); private StsClient stsClient; - private FileIOFactorySupplier fileIOFactorySupplier = - metaStoreManagerFactory1 -> new MeasuredFileIOFactory(metaStoreManagerFactory1); + private Supplier fileIOFactorySupplier = MeasuredFileIOFactory::new; private Builder() { stsClient = Mockito.mock(StsClient.class, RETURNS_DEEP_STUBS); @@ -169,7 +165,7 @@ public Builder config(Map config) { return this; } - public Builder fileIOFactorySupplier(FileIOFactorySupplier fileIOFactorySupplier) { + public Builder fileIOFactorySupplier(Supplier fileIOFactorySupplier) { this.fileIOFactorySupplier = fileIOFactorySupplier; return this; } @@ -244,7 +240,7 @@ public TestServices build() { AccessConfigProvider accessConfigProvider = new AccessConfigProvider(storageCredentialCache, metaStoreManagerFactory); - FileIOFactory fileIOFactory = fileIOFactorySupplier.apply(accessConfigProvider); + FileIOFactory fileIOFactory = fileIOFactorySupplier.get(); TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class); @@ -255,6 +251,7 @@ public TestServices build() { resolverFactory, metaStoreManagerFactory, taskExecutor, + accessConfigProvider, fileIOFactory, polarisEventListener); diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java index 04d6878a2e..1d5668d0fc 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java @@ -25,13 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.function.Supplier; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; -import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.AccessConfig; /** * A FileIOFactory that measures the number of bytes read, files written, and files deleted. It can @@ -50,19 +46,15 @@ public class MeasuredFileIOFactory implements FileIOFactory { private final FileIOFactory defaultFileIOFactory; @Inject - public MeasuredFileIOFactory(AccessConfigProvider accessConfigProvider) { - defaultFileIOFactory = new DefaultFileIOFactory(accessConfigProvider); + public MeasuredFileIOFactory() { + defaultFileIOFactory = new DefaultFileIOFactory(); } @Override public FileIO loadFileIO( - @Nonnull CallContext callContext, + @Nonnull AccessConfig accessConfig, @Nonnull String ioImplClassName, - @Nonnull Map properties, - @Nonnull TableIdentifier identifier, - @Nonnull Set tableLocations, - @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + @Nonnull Map properties) { loadFileIOExceptionSupplier.ifPresent( s -> { throw s.get(); @@ -70,14 +62,7 @@ public FileIO loadFileIO( MeasuredFileIO wrapped = new MeasuredFileIO( - defaultFileIOFactory.loadFileIO( - callContext, - ioImplClassName, - properties, - identifier, - tableLocations, - storageActions, - resolvedEntityPath), + defaultFileIOFactory.loadFileIO(accessConfig, ioImplClassName, properties), newInputFileExceptionSupplier, newOutputFileExceptionSupplier, getLengthExceptionSupplier);