Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
import org.apache.polaris.core.exceptions.CommitConflictException;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.DropEntityResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
Expand All @@ -120,12 +119,14 @@
import org.apache.polaris.core.persistence.resolver.ResolverFactory;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.core.storage.StorageUtil;
import org.apache.polaris.service.catalog.SupportsNotifications;
import org.apache.polaris.service.catalog.common.CatalogUtils;
import org.apache.polaris.service.catalog.common.LocationUtils;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOUtil;
import org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation;
Expand Down Expand Up @@ -178,6 +179,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
private long catalogId = -1;
private String defaultBaseLocation;
private Map<String, String> catalogProperties;
private final AccessConfigProvider accessConfigProvider;
private FileIOFactory fileIOFactory;
private PolarisMetaStoreManager metaStoreManager;

Expand All @@ -195,6 +197,7 @@ public IcebergCatalog(
PolarisResolutionManifestCatalogView resolvedEntityView,
SecurityContext securityContext,
TaskExecutor taskExecutor,
AccessConfigProvider accessConfigProvider,
FileIOFactory fileIOFactory,
PolarisEventListener polarisEventListener) {
this.diagnostics = diagnostics;
Expand All @@ -207,6 +210,7 @@ public IcebergCatalog(
this.taskExecutor = taskExecutor;
this.catalogId = catalogEntity.getId();
this.catalogName = catalogEntity.getName();
this.accessConfigProvider = accessConfigProvider;
this.fileIOFactory = fileIOFactory;
this.metaStoreManager = metaStoreManager;
this.polarisEventListener = polarisEventListener;
Expand Down Expand Up @@ -2076,16 +2080,16 @@ private FileIO loadFileIOForTableLike(
PolarisResolvedPathWrapper resolvedStorageEntity,
Map<String, String> tableProperties,
Set<PolarisStorageActions> storageActions) {
// Reload fileIO based on table specific context
FileIO fileIO =
fileIOFactory.loadFileIO(
AccessConfig accessConfig =
accessConfigProvider.getAccessConfig(
callContext,
ioImplClassName,
tableProperties,
identifier,
readLocations,
storageActions,
Optional.empty(),
resolvedStorageEntity);
// Reload fileIO based on table specific context
FileIO fileIO = fileIOFactory.loadFileIO(accessConfig, ioImplClassName, tableProperties);
// ensure the new fileIO is closed when the catalog is closed
closeableGroup.addCloseable(fileIO);
return fileIO;
Expand Down Expand Up @@ -2595,26 +2599,6 @@ private Page<TableIdentifier> listTableLike(
.map(record -> TableIdentifier.of(parentNamespace, record.getName()));
}

/**
 * Builds a {@link FileIO} for the given implementation class, using the catalog entity itself as
 * the storage context.
 *
 * <p>The catalog entity supplies the identifier, the single base location and a one-element
 * resolved path; the request is made with {@link PolarisStorageActions#ALL} and delegated to the
 * configured {@code fileIOFactory}.
 *
 * @param ioImpl full class name of a custom FileIO implementation
 * @param properties used to initialize the FileIO implementation
 * @return FileIO object
 */
protected FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
  // Derive the identifier first, then the location, so lookups happen in the original order.
  TableIdentifier catalogAsTableId = IcebergTableLikeEntity.of(catalogEntity).getTableIdentifier();
  Set<String> baseLocations = Set.of(catalogEntity.getBaseLocation());

  // The resolved path consists solely of the catalog entity, with no grant records attached.
  PolarisResolvedPathWrapper catalogOnlyPath =
      new PolarisResolvedPathWrapper(
          List.of(new ResolvedPolarisEntity(catalogEntity, List.of(), List.of())));

  return fileIOFactory.loadFileIO(
      callContext,
      ioImpl,
      properties,
      catalogAsTableId,
      baseLocations,
      Set.of(PolarisStorageActions.ALL),
      catalogOnlyPath);
}

/**
 * Reads the {@link FeatureConfiguration#MAX_METADATA_REFRESH_RETRIES} setting from the realm
 * configuration.
 *
 * @return the value the realm configuration reports for that setting
 */
private int getMaxMetadataRefreshRetries() {
  final int maxRefreshRetries =
      realmConfig.getConfig(FeatureConfiguration.MAX_METADATA_REFRESH_RETRIES);
  return maxRefreshRetries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,55 +25,32 @@
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisStorageActions;

/**
* A default FileIO factory implementation for creating Iceberg {@link FileIO} instances with
* contextual table-level properties.
*
* <p>This class acts as a translation layer between Polaris properties and the properties required
* by Iceberg's {@link FileIO}. For example, it evaluates storage actions and retrieves subscoped
* credentials to initialize a {@link FileIO} instance with the most limited permissions necessary.
* by Iceberg's {@link FileIO}.
*/
@ApplicationScoped
@Identifier("default")
public class DefaultFileIOFactory implements FileIOFactory {

private final AccessConfigProvider accessConfigProvider;

@Inject
public DefaultFileIOFactory(AccessConfigProvider accessConfigProvider) {
this.accessConfigProvider = accessConfigProvider;
}
public DefaultFileIOFactory() {}

@Override
public FileIO loadFileIO(
@Nonnull CallContext callContext,
@Nonnull AccessConfig accessConfig,
@Nonnull String ioImplClassName,
@Nonnull Map<String, String> properties,
@Nonnull TableIdentifier identifier,
@Nonnull Set<String> tableLocations,
@Nonnull Set<PolarisStorageActions> storageActions,
@Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
@Nonnull Map<String, String> properties) {

// Get subscoped creds
properties = new HashMap<>(properties);
AccessConfig accessConfig =
accessConfigProvider.getAccessConfig(
callContext,
identifier,
tableLocations,
storageActions,
Optional.empty(),
resolvedEntityPath);

// Update the FileIO with the subscoped credentials
// Update with properties in case there are table-level overrides the credentials should
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@
import jakarta.annotation.Nonnull;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.AccessConfig;

/**
* Interface for providing a way to construct FileIO objects, such as for reading/writing S3.
Expand All @@ -41,21 +37,13 @@ public interface FileIOFactory {
* <p>The supplied access configuration may carry subscoped credentials that restrict the
* FileIO's permissions, ensuring secure and limited access to the table's data and locations.
*
* @param callContext the call for which the FileIO is being loaded.
* @param accessConfig the access configuration containing credentials and other properties.
* @param ioImplClassName the class name of the FileIO implementation to load.
* @param properties configuration properties for the FileIO.
* @param identifier the table identifier.
* @param tableLocations locations associated with the table.
* @param storageActions storage actions allowed for the table.
* @param resolvedEntityPath resolved paths for the entities.
* @return a configured FileIO instance.
*/
FileIO loadFileIO(
@Nonnull CallContext callContext,
@Nonnull AccessConfig accessConfig,
@Nonnull String ioImplClassName,
@Nonnull Map<String, String> properties,
@Nonnull TableIdentifier identifier,
@Nonnull Set<String> tableLocations,
@Nonnull Set<PolarisStorageActions> storageActions,
@Nonnull PolarisResolvedPathWrapper resolvedEntityPath);
@Nonnull Map<String, String> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.AccessConfig;

/** A {@link FileIOFactory} that translates WASB paths to ABFS ones */
@ApplicationScoped
Expand All @@ -38,27 +34,16 @@ public class WasbTranslatingFileIOFactory implements FileIOFactory {
private final FileIOFactory defaultFileIOFactory;

@Inject
public WasbTranslatingFileIOFactory(AccessConfigProvider accessConfigProvider) {
defaultFileIOFactory = new DefaultFileIOFactory(accessConfigProvider);
public WasbTranslatingFileIOFactory() {
defaultFileIOFactory = new DefaultFileIOFactory();
}

@Override
public FileIO loadFileIO(
@Nonnull CallContext callContext,
@Nonnull AccessConfig accessConfig,
@Nonnull String ioImplClassName,
@Nonnull Map<String, String> properties,
@Nonnull TableIdentifier identifier,
@Nonnull Set<String> tableLocations,
@Nonnull Set<PolarisStorageActions> storageActions,
@Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
@Nonnull Map<String, String> properties) {
return new WasbTranslatingFileIO(
defaultFileIOFactory.loadFileIO(
callContext,
ioImplClassName,
properties,
identifier,
tableLocations,
storageActions,
resolvedEntityPath));
defaultFileIOFactory.loadFileIO(accessConfig, ioImplClassName, properties));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.core.persistence.resolver.ResolverFactory;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.task.TaskExecutor;
Expand All @@ -46,6 +47,7 @@ public class PolarisCallContextCatalogFactory implements CallContextCatalogFacto

private final PolarisDiagnostics diagnostics;
private final TaskExecutor taskExecutor;
private final AccessConfigProvider accessConfigProvider;
private final FileIOFactory fileIOFactory;
private final ResolverFactory resolverFactory;
private final MetaStoreManagerFactory metaStoreManagerFactory;
Expand All @@ -57,12 +59,14 @@ public PolarisCallContextCatalogFactory(
ResolverFactory resolverFactory,
MetaStoreManagerFactory metaStoreManagerFactory,
TaskExecutor taskExecutor,
AccessConfigProvider accessConfigProvider,
FileIOFactory fileIOFactory,
PolarisEventListener polarisEventListener) {
this.diagnostics = diagnostics;
this.resolverFactory = resolverFactory;
this.metaStoreManagerFactory = metaStoreManagerFactory;
this.taskExecutor = taskExecutor;
this.accessConfigProvider = accessConfigProvider;
this.fileIOFactory = fileIOFactory;
this.polarisEventListener = polarisEventListener;
}
Expand All @@ -89,6 +93,7 @@ public Catalog createCallContextCatalog(
resolvedManifest,
securityContext,
taskExecutor,
accessConfigProvider,
fileIOFactory,
polarisEventListener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -32,19 +33,25 @@
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.catalog.io.FileIOFactory;

@ApplicationScoped
public class TaskFileIOSupplier {
private final FileIOFactory fileIOFactory;
private final AccessConfigProvider accessConfigProvider;

@Inject
public TaskFileIOSupplier(FileIOFactory fileIOFactory) {
public TaskFileIOSupplier(
FileIOFactory fileIOFactory, AccessConfigProvider accessConfigProvider) {
this.fileIOFactory = fileIOFactory;
this.accessConfigProvider = accessConfigProvider;
}

public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext callContext) {

Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
Map<String, String> properties = new HashMap<>(internalProperties);

Expand All @@ -55,11 +62,14 @@ public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext cal
new ResolvedPolarisEntity(task, List.of(), List.of());
PolarisResolvedPathWrapper resolvedPath =
new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity));
AccessConfig accessConfig =
accessConfigProvider.getAccessConfig(
callContext, identifier, locations, storageActions, Optional.empty(), resolvedPath);

String ioImpl =
properties.getOrDefault(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO");

return fileIOFactory.loadFileIO(
callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath);
return fileIOFactory.loadFileIO(accessConfig, ioImpl, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ private void initBaseCatalog() {
passthroughView,
securityContext,
Mockito.mock(),
accessConfigProvider,
fileIOFactory,
polarisEventListener);
this.baseCatalog.initialize(
Expand All @@ -562,17 +563,18 @@ protected TestPolarisCallContextCatalogFactory() {
@Inject
public TestPolarisCallContextCatalogFactory(
PolarisDiagnostics diagnostics,
StorageCredentialCache storageCredentialCache,
ResolverFactory resolverFactory,
MetaStoreManagerFactory metaStoreManagerFactory,
TaskExecutor taskExecutor,
AccessConfigProvider accessConfigProvider,
FileIOFactory fileIOFactory,
PolarisEventListener polarisEventListener) {
super(
diagnostics,
resolverFactory,
metaStoreManagerFactory,
taskExecutor,
accessConfigProvider,
fileIOFactory,
polarisEventListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.inmemory.InMemoryFileIO;
Expand Down Expand Up @@ -72,16 +73,8 @@ private static String makeTableLocation(
}

public PolarisS3InteroperabilityTest() {
TestServices.FileIOFactorySupplier fileIOFactorySupplier =
(accessConfigProvider) ->
(FileIOFactory)
(callContext,
ioImplClassName,
properties,
identifier,
tableLocations,
storageActions,
resolvedEntityPath) -> new InMemoryFileIO();
Supplier<FileIOFactory> fileIOFactorySupplier =
() -> (FileIOFactory) (accessConfig, ioImplClassName, properties) -> new InMemoryFileIO();
services =
TestServices.builder()
.config(SERVER_CONFIG)
Expand Down
Loading