diff --git a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java index 151d1a5487..e891ec4905 100644 --- a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java +++ b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java @@ -53,8 +53,10 @@ import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; +import org.apache.polaris.core.persistence.pagination.EntityIdPageToken; import org.apache.polaris.core.persistence.pagination.HasPageSize; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; @@ -428,9 +430,9 @@ public List lookupEntityActiveBatchInCurrentTxn( long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { return this.listEntitiesInCurrentTxn( - callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken); + callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageRequest); } @Override @@ -440,7 +442,7 @@ public List lookupEntityActiveBatchInCurrentTxn( long parentId, @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { // full range scan under the parent for that type return this.listEntitiesInCurrentTxn( callCtx, @@ -456,7 +458,7 @@ public List lookupEntityActiveBatchInCurrentTxn( entity.getName(), entity.getTypeCode(), entity.getSubTypeCode()), - pageToken); + pageRequest); } @Override @@ -467,7 +469,9 @@ public List lookupEntityActiveBatchInCurrentTxn( @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, @Nonnull Function transformer, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { + PageToken pageToken = buildPageToken(pageRequest); + // full range scan under the parent for that type Stream data = this.store @@ -481,7 +485,7 @@ public List lookupEntityActiveBatchInCurrentTxn( data = data.limit(hasPageSize.getPageSize()); } - return Page.fromItems(data.map(transformer).collect(Collectors.toList())); + return pageToken.buildNextPage(data.map(transformer).collect(Collectors.toList())); } /** {@inheritDoc} */ @@ -778,4 +782,8 @@ public void rollback() { session.getTransaction().rollback(); } } + + private PageToken buildPageToken(PageRequest pageRequest) { + return EntityIdPageToken.fromPageRequest(pageRequest); + } } diff --git a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java index 16fb32356e..67816c0b63 100644 --- a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java +++ b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java @@ -35,6 +35,7 @@ import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import org.apache.polaris.core.persistence.pagination.EntityIdPageToken; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; @@ -294,14 +295,22 @@ List lookupFullEntitiesActive( // Currently check against ENTITIES not joining with ENTITIES_ACTIVE String hql = - "SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode"; + "SELECT m from ModelEntity m where" + + " m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode and m.id > :tokenId"; + + long tokenId = EntityIdPageToken.BASE_ID; + if (pageToken instanceof EntityIdPageToken entityIdPageToken) { + hql += " order by m.id asc"; + tokenId = entityIdPageToken.getId(); + } TypedQuery query = session .createQuery(hql, ModelEntity.class) .setParameter("catalogId", catalogId) .setParameter("parentId", parentId) - .setParameter("typeCode", entityType.getCode()); + .setParameter("typeCode", entityType.getCode()) + .setParameter("tokenId", tokenId); return query.getResultList(); } diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 5fdb320064..c99b8e41c4 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -34,6 +34,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.entity.EntityNameLookupRecord; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; @@ -49,8 +50,10 @@ import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; +import org.apache.polaris.core.persistence.pagination.EntityIdPageToken; import org.apache.polaris.core.persistence.pagination.HasPageSize; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyType; @@ -72,16 +75,19 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers private final PrincipalSecretsGenerator secretsGenerator; private final PolarisStorageIntegrationProvider storageIntegrationProvider; private final String realmId; + private final PolarisDiagnostics polarisDiagnostics; public JdbcBasePersistenceImpl( DatasourceOperations databaseOperations, PrincipalSecretsGenerator secretsGenerator, PolarisStorageIntegrationProvider storageIntegrationProvider, - String realmId) { + String realmId, + PolarisDiagnostics polarisDiagnostics) { this.datasourceOperations = databaseOperations; this.secretsGenerator = secretsGenerator; this.storageIntegrationProvider = storageIntegrationProvider; this.realmId = realmId; + this.polarisDiagnostics = polarisDiagnostics; } @Override @@ -359,7 +365,7 @@ public Page listEntities( long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { return listEntities( callCtx, catalogId, @@ -367,7 +373,7 @@ public Page listEntities( entityType, entity -> true, EntityNameLookupRecord::new, - pageToken); + pageRequest); } @Nonnull @@ -378,7 +384,7 @@ public Page listEntities( long parentId, @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { return listEntities( callCtx, catalogId, @@ -386,7 +392,7 @@ public Page listEntities( entityType, entityFilter, EntityNameLookupRecord::new, - pageToken); + pageRequest); } @Nonnull @@ -398,7 +404,7 @@ public Page listEntities( PolarisEntityType entityType, @Nonnull Predicate entityFilter, @Nonnull Function transformer, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { Map params = Map.of( "catalog_id", @@ -413,6 +419,11 @@ public Page listEntities( // Limit can't be pushed down, due to client side filtering // absence of transaction. String query = QueryGenerator.generateSelectQuery(new ModelEntity(), params); + PageToken pageToken = buildPageToken(pageRequest); + if (pageToken instanceof EntityIdPageToken entityIdPageToken) { + query += String.format(" AND id > %d ORDER BY id ASC", entityIdPageToken.getId()); + } + try { List results = new ArrayList<>(); datasourceOperations.executeSelectOverStream( @@ -425,11 +436,8 @@ public Page listEntities( } data.forEach(results::add); }); - List resultsOrEmpty = - results == null - ? Collections.emptyList() - : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList()); - return Page.fromItems(resultsOrEmpty); + List resultsOrEmpty = results.stream().map(transformer).collect(Collectors.toList()); + return pageToken.buildNextPage(resultsOrEmpty); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e); @@ -915,4 +923,8 @@ PolarisStorageIntegration loadPolarisStorageIntegration( private interface QueryAction { Integer apply(String query) throws SQLException; } + + private PageToken buildPageToken(PageRequest pageRequest) { + return EntityIdPageToken.fromPageRequest(pageRequest); + } } diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java index f3d028eb9d..48403cd62a 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java @@ -102,7 +102,8 @@ private void initializeForRealm( databaseOperations, secretsGenerator(realmContext, rootCredentialsSet), storageIntegrationProvider, - realmContext.getRealmIdentifier())); + realmContext.getRealmIdentifier(), + diagServices)); PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager(); metaStoreManagerMap.put(realmContext.getRealmIdentifier(), metaStoreManager); diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java index 1012aff02d..fe51d19a1d 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java @@ -57,7 +57,8 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { } JdbcBasePersistenceImpl basePersistence = - new JdbcBasePersistenceImpl(datasourceOperations, RANDOM_SECRETS, Mockito.mock(), "REALM"); + new JdbcBasePersistenceImpl( + datasourceOperations, RANDOM_SECRETS, Mockito.mock(), "REALM", diagServices); return new PolarisTestMetaStoreManager( new AtomicOperationMetaStoreManager(), new PolarisCallContext( diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java index 7be67f1947..5e10081d8c 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java @@ -101,6 +101,24 @@ public List listNamespaces(String catalog, Namespace parent) { } } + public ListNamespacesResponse listNamespaces( + String catalog, Namespace parent, String pageToken, String pageSize) { + Map queryParams = new HashMap<>(); + if (!parent.isEmpty()) { + // TODO change this for Iceberg 1.7.2: + // queryParams.put("parent", RESTUtil.encodeNamespace(parent)); + queryParams.put("parent", Joiner.on('\u001f').join(parent.levels())); + } + queryParams.put("pageToken", pageToken); + queryParams.put("pageSize", pageSize); + try (Response response = + request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) { + assertThat(response.getStatus()).isEqualTo(OK.getStatusCode()); + ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class); + return res; + } + } + public List listAllNamespacesChildFirst(String catalog) { List result = new ArrayList<>(); for (int idx = -1; idx < result.size(); idx++) { @@ -142,6 +160,20 @@ public List listTables(String catalog, Namespace namespace) { } } + public ListTablesResponse listTables( + String catalog, Namespace namespace, String pageToken, String pageSize) { + String ns = RESTUtil.encodeNamespace(namespace); + Map queryParams = new HashMap<>(); + queryParams.put("pageToken", pageToken); + queryParams.put("pageSize", pageSize); + try (Response res = + request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams) + .get()) { + assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + return res.readEntity(ListTablesResponse.class); + } + } + public void dropTable(String catalog, TableIdentifier id) { String ns = RESTUtil.encodeNamespace(id.namespace()); try (Response res = diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java index 4c62a87480..4fd0aa3e83 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java @@ -67,6 +67,8 @@ import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.types.Types; import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; import org.apache.polaris.core.admin.model.Catalog; @@ -164,7 +166,8 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests private static final String[] DEFAULT_CATALOG_PROPERTIES = { "allow.unstructured.table.location", "true", - "allow.external.table.location", "true" + "allow.external.table.location", "true", + "polaris.config.list-pagination-enabled", "true" }; @Retention(RetentionPolicy.RUNTIME) @@ -1558,4 +1561,72 @@ public void testUpdateTableWithReservedProperty() { .hasMessageContaining("reserved prefix"); genericTableApi.purge(currentCatalogName, namespace); } + + @Test + public void testPaginatedListNamespaces() { + String prefix = "testPaginatedListNamespaces"; + for (int i = 0; i < 20; i++) { + Namespace namespace = Namespace.of(prefix + i); + restCatalog.createNamespace(namespace); + } + + try { + Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName, Namespace.empty())) + .hasSize(20); + for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) { + int total = 0; + String pageToken = null; + do { + ListNamespacesResponse response = + catalogApi.listNamespaces( + currentCatalogName, Namespace.empty(), pageToken, String.valueOf(pageSize)); + Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize); + total += response.namespaces().size(); + pageToken = response.nextPageToken(); + } while (pageToken != null); + Assertions.assertThat(total) + .as("Total paginated results for pageSize = " + pageSize) + .isEqualTo(20); + } + } finally { + for (int i = 0; i < 20; i++) { + Namespace namespace = Namespace.of(prefix + i); + restCatalog.dropNamespace(namespace); + } + } + } + + @Test + public void testPaginatedListTables() { + String prefix = "testPaginatedListTables"; + Namespace namespace = Namespace.of(prefix); + restCatalog.createNamespace(namespace); + for (int i = 0; i < 20; i++) { + restCatalog.createTable(TableIdentifier.of(namespace, prefix + i), SCHEMA); + } + + try { + Assertions.assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(20); + for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) { + int total = 0; + String pageToken = null; + do { + ListTablesResponse response = + catalogApi.listTables( + currentCatalogName, namespace, pageToken, String.valueOf(pageSize)); + Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize); + total += response.identifiers().size(); + pageToken = response.nextPageToken(); + } while (pageToken != null); + Assertions.assertThat(total) + .as("Total paginated results for pageSize = " + pageSize) + .isEqualTo(20); + } + } finally { + for (int i = 0; i < 20; i++) { + restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i)); + } + restCatalog.dropNamespace(namespace); + } + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 7546212282..a5dde46f19 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -217,6 +217,7 @@ public static void enforceFeatureEnabledOrThrow( public static final PolarisConfiguration LIST_PAGINATION_ENABLED = PolarisConfiguration.builder() .key("LIST_PAGINATION_ENABLED") + .catalogConfig("polaris.config.list-pagination-enabled") .description("If set to true, pagination for APIs like listTables is enabled.") .defaultValue(false) .buildFeatureConfiguration(); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index 92ccc6eed5..893603e6ee 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -63,7 +63,7 @@ import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyMappingUtil; @@ -694,7 +694,7 @@ private void revokeGrantRecord( @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @Nonnull PolarisEntitySubType entitySubType, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { // get meta store we should be using BasePersistence ms = callCtx.getMetaStore(); @@ -707,15 +707,15 @@ private void revokeGrantRecord( ? 0l : catalogPath.get(catalogPath.size() - 1).getId(); Page resultPage = - ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken); + ms.listEntities(callCtx, catalogId, parentId, entityType, pageRequest); // prune the returned list with only entities matching the entity subtype if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { resultPage = - pageToken.buildNextPage( - resultPage.items.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList())); + resultPage.filter( + rec -> { + return rec.getSubTypeCode() == entitySubType.getCode(); + }); } // TODO: Use post-validation to enforce consistent view against catalogPath. In the @@ -1190,7 +1190,7 @@ private void revokeGrantRecord( PolarisEntityType.CATALOG_ROLE, entity -> true, Function.identity(), - PageToken.fromLimit(2)) + PageRequest.fromLimit(2)) .items; // if we have 2, we cannot drop the catalog. If only one left, better be the admin role @@ -1500,7 +1500,7 @@ private void revokeGrantRecord( @Override public @Nonnull EntitiesResult loadTasks( - @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) { + @Nonnull PolarisCallContext callCtx, String executorId, PageRequest pageRequest) { BasePersistence ms = callCtx.getMetaStore(); // find all available tasks @@ -1525,7 +1525,7 @@ private void revokeGrantRecord( || callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout; }, Function.identity(), - pageToken); + pageRequest); List loadedTasks = new ArrayList<>(); final AtomicInteger failedLeaseCount = new AtomicInteger(0); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java index dc85f2183b..6394fe7574 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java @@ -32,7 +32,7 @@ import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.policy.PolicyMappingPersistence; /** @@ -272,7 +272,7 @@ List lookupEntityVersions( * @param catalogId catalog id for that entity, NULL_ID if the entity is top-level * @param parentId id of the parent, can be the special 0 value representing the root entity * @param entityType type of entities to list - * @param pageToken the token to start listing after + * @param pageRequest the token to start listing after * @return the list of entities for the specified list operation */ @Nonnull @@ -281,7 +281,7 @@ Page listEntities( long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull PageToken pageToken); + @Nonnull PageRequest pageRequest); /** * List entities where some predicate returns true @@ -292,7 +292,7 @@ Page listEntities( * @param entityType type of entities to list * @param entityFilter the filter to be applied to each entity. Only entities where the predicate * returns true are returned in the list - * @param pageToken the token to start listing after + * @param pageRequest the token to start listing after * @return the list of entities for which the predicate returns true */ @Nonnull @@ -302,7 +302,7 @@ Page listEntities( long parentId, @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, - @Nonnull PageToken pageToken); + @Nonnull PageRequest pageRequest); /** * List entities where some predicate returns true and transform the entities with a function @@ -315,6 +315,7 @@ Page listEntities( * returns true are returned in the list * @param transformer the transformation function applied to the {@link PolarisBaseEntity} before * returning + * @param pageRequest * @return the list of entities for which the predicate returns true */ @Nonnull @@ -325,7 +326,7 @@ Page listEntities( @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, @Nonnull Function transformer, - PageToken pageToken); + PageRequest pageRequest); /** * Lookup the current entityGrantRecordsVersion for the specified entity. That version is changed diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java index 2a20ad5c1e..0d56d6af42 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java @@ -42,7 +42,7 @@ import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult; import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.policy.PolarisPolicyMappingManager; import org.apache.polaris.core.storage.PolarisCredentialVendor; @@ -114,6 +114,7 @@ EntityResult readEntityByName( * @param entityType entity type * @param entitySubType entity subtype. Can be the special value ANY_SUBTYPE to match any subtype. * Else exact match will be performed. + * @param pageRequest * @return all entities name, ids and subtype under the specified namespace. */ @Nonnull @@ -122,7 +123,7 @@ ListEntitiesResult listEntities( @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @Nonnull PolarisEntitySubType entitySubType, - @Nonnull PageToken pageToken); + @Nonnull PageRequest pageRequest); /** * Generate a new unique id that can be used by the Polaris client when it needs to create a new @@ -302,12 +303,12 @@ EntityResult loadEntity( * * @param callCtx call context * @param executorId executor id - * @param pageToken page token to start after + * @param pageRequest page token to start after * @return list of tasks to be completed */ @Nonnull EntitiesResult loadTasks( - @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken); + @Nonnull PolarisCallContext callCtx, String executorId, PageRequest pageRequest); /** * Load change tracking information for a set of entities in one single shot and return for each diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index b7ba47e83e..0b8ab7daf3 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -50,7 +50,7 @@ import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; @@ -120,7 +120,7 @@ public ListEntitiesResult listEntities( @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @Nonnull PolarisEntitySubType entitySubType, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "listEntities"); return null; } @@ -322,7 +322,7 @@ public EntityResult loadEntity( @Override public EntitiesResult loadTasks( - @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) { + @Nonnull PolarisCallContext callCtx, String executorId, PageRequest pageRequest) { callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadTasks"); return null; } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdPageToken.java new file mode 100644 index 0000000000..dfa1df10f9 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdPageToken.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.pagination; + +import java.util.List; +import java.util.Optional; +import org.apache.polaris.core.entity.EntityNameLookupRecord; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EntityIdPageToken extends PageToken implements HasPageSize { + private static final Logger LOGGER = LoggerFactory.getLogger(EntityIdPageToken.class); + + public static final String PREFIX = "entity-id"; + + /** The minimum ID that could be attached to an entity */ + private static final long MINIMUM_ID = 0; + + /** The entity ID to use to start with. */ + public static final long BASE_ID = MINIMUM_ID - 1; + + private final long entityId; + private final int pageSize; + + public EntityIdPageToken(int pageSize) { + this.entityId = BASE_ID; + this.pageSize = pageSize; + } + + public EntityIdPageToken(long entityId, int pageSize) { + this.entityId = entityId; + this.pageSize = pageSize; + } + + /** + * Build an {@link EntityIdPageToken} from a {@link PageRequest}, or else a {@link + * ReadEverythingPageToken} if the request doesn't require pagination. + */ + public static PageToken fromPageRequest(PageRequest pageRequest) { + if (pageRequest.getPageTokenString().isEmpty()) { + if (pageRequest.getPageSize().isEmpty()) { + return PageToken.readEverything(); + } else { + return new EntityIdPageToken(pageRequest.getPageSize().get()); + } + } else { + String pageTokenString = pageRequest.getPageTokenString().get(); + Optional pageSize = pageRequest.getPageSize(); + + try { + String[] parts = pageRequest.getPageTokenString().get().split("/"); + if (parts.length < 1) { + throw new IllegalArgumentException("Invalid token format: " + pageTokenString); + } else if (parts[0].equals(EntityIdPageToken.PREFIX)) { + int resolvedPageSize = pageSize.orElse(Integer.parseInt(parts[2])); + return new EntityIdPageToken(Long.parseLong(parts[1]), resolvedPageSize); + } else if (parts[0].equals(LimitPageToken.PREFIX)) { + return new LimitPageToken(pageRequest.getPageSize().get()); + } else { + throw new IllegalArgumentException("Unrecognized page token: " + pageTokenString); + } + } catch (NumberFormatException | IndexOutOfBoundsException e) { + LOGGER.debug(e.getMessage()); + throw new IllegalArgumentException("Invalid token format: " + pageTokenString); + } + } + } + + public long getId() { + return entityId; + } + + @Override + public int getPageSize() { + return this.pageSize; + } + + @Override + public String toTokenString() { + return String.format("%s/%d/%d", PREFIX, entityId, pageSize); + } + + /** + * Builds a new page token to reflect new data that's been read. This implementation assumes that + * the input list is sorted, and checks that it's a list of `PolarisBaseEntity` or + * `EntityNameLookupRecord` + */ + @Override + public PageToken updated(List newData) { + if (newData == null || newData.size() < this.pageSize) { + return new DonePageToken(); + } else { + // Assumed to be sorted with the greatest entity ID last + var tail = newData.get(newData.size() - 1); + if (tail instanceof PolarisBaseEntity) { + return new EntityIdPageToken(((PolarisBaseEntity) tail).getId(), this.pageSize); + } else if (tail instanceof EntityNameLookupRecord) { + return new EntityIdPageToken(((EntityNameLookupRecord) tail).getId(), this.pageSize); + } else { + throw new IllegalStateException( + "Cannot build a page token from: " + tail.getClass().getSimpleName()); + } + } + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java index 18586446ca..e3f28b171a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java @@ -16,17 +16,12 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.polaris.core.persistence.pagination; import java.util.List; -/** - * A {@link PageToken} implementation that has a page size, but no start offset. This can be used to - * represent a `limit`. When updated, it returns {@link DonePageToken}. As such it should never be - * user-facing and doesn't truly paginate. - */ public class LimitPageToken extends PageToken implements HasPageSize { - public static final String PREFIX = "limit"; private final int pageSize; @@ -42,7 +37,7 @@ public int getPageSize() { @Override public String toTokenString() { - return String.format("%s/%d", PREFIX, pageSize); + return null; } @Override diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java index 18287f85c1..b3714af679 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java @@ -19,6 +19,8 @@ package org.apache.polaris.core.persistence.pagination; import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; /** * An immutable page of items plus their paging cursor. The {@link PageToken} here can be used to @@ -39,4 +41,9 @@ public Page(PageToken pageToken, List items) { public static Page fromItems(List items) { return new Page<>(new DonePageToken(), items); } + + public Page filter(Predicate predicate) { + return new Page<>( + this.pageToken, this.items.stream().filter(predicate).collect(Collectors.toList())); + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageRequest.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageRequest.java new file mode 100644 index 0000000000..f13e74a763 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.pagination; + +import java.util.Optional; + +/** + * A wrapper for pagination information passed in as part of a request. This can potentially be + * translated into a `PageToken` + */ +public class PageRequest { + private final Optional pageTokenStringOpt; + private final Optional pageSizeOpt; + + public PageRequest(String pageTokenString, Integer pageSize) { + this.pageTokenStringOpt = Optional.ofNullable(pageTokenString); + this.pageSizeOpt = Optional.ofNullable(pageSize); + } + + public static PageRequest readEverything() { + return new PageRequest(null, null); + } + + public static PageRequest fromLimit(Integer pageSize) { + return new PageRequest(LimitPageToken.PREFIX, pageSize); + } + + public boolean isPaginationRequested() { + return pageTokenStringOpt.isPresent() || pageSizeOpt.isPresent(); + } + + public Optional getPageTokenString() { + return pageTokenStringOpt; + } + + public Optional getPageSize() { + return pageSizeOpt; + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java index 2e335ccd40..c1be8707d2 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents a page token that can be used by operations like `listTables`. Clients that specify a @@ -31,6 +33,7 @@ * response, that means there is no more data to read. */ public abstract class PageToken { + private static Logger LOGGER = LoggerFactory.getLogger(PageToken.class); /** Build a new PageToken that reads everything */ public static PageToken readEverything() { @@ -42,22 +45,29 @@ public static PageToken fromString(String token) { return build(token, null); } - /** Build a new PageToken from a limit */ - public static PageToken fromLimit(Integer pageSize) { - return build(null, pageSize); - } - /** Build a {@link PageToken} from the input string and page size */ public static PageToken build(String token, Integer pageSize) { if (token == null || token.isEmpty()) { if (pageSize != null) { - return new LimitPageToken(pageSize); + return new EntityIdPageToken(pageSize); } else { return new ReadEverythingPageToken(); } } else { - // TODO implement, split out by the token's prefix - throw new IllegalArgumentException("Unrecognized page token: " + token); + try { + String[] parts = token.split("/"); + if (parts.length < 1) { + throw new IllegalArgumentException("Invalid token format: " + token); + } else if (parts[0].equals(EntityIdPageToken.PREFIX)) { + int resolvedPageSize = pageSize == null ? Integer.parseInt(parts[2]) : pageSize; + return new EntityIdPageToken(Long.parseLong(parts[1]), resolvedPageSize); + } else { + throw new IllegalArgumentException("Unrecognized page token: " + token); + } + } catch (NumberFormatException | IndexOutOfBoundsException e) { + LOGGER.debug(e.getMessage()); + throw new IllegalArgumentException("Invalid token format: " + token); + } } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java index f08b85e122..b4d4406fde 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java @@ -38,7 +38,7 @@ import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyType; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; @@ -359,10 +359,10 @@ public Page listEntities( long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { return runInReadTransaction( callCtx, - () -> this.listEntitiesInCurrentTxn(callCtx, catalogId, parentId, entityType, pageToken)); + () -> this.listEntitiesInCurrentTxn(callCtx, catalogId, parentId, entityType, pageRequest)); } /** {@inheritDoc} */ @@ -374,12 +374,12 @@ public Page listEntities( long parentId, @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { return runInReadTransaction( callCtx, () -> this.listEntitiesInCurrentTxn( - callCtx, catalogId, parentId, entityType, entityFilter, pageToken)); + callCtx, catalogId, parentId, entityType, entityFilter, pageRequest)); } /** {@inheritDoc} */ @@ -392,12 +392,12 @@ public Page listEntities( @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, @Nonnull Function transformer, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { return runInReadTransaction( callCtx, () -> this.listEntitiesInCurrentTxn( - callCtx, catalogId, parentId, entityType, entityFilter, transformer, pageToken)); + callCtx, catalogId, parentId, entityType, entityFilter, transformer, pageRequest)); } /** {@inheritDoc} */ diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 0224dec493..6900dab105 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -65,6 +65,7 @@ import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; @@ -683,8 +684,8 @@ private void bootstrapPolarisService( } /** - * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType, PolarisEntitySubType, - * PageToken)} + * See {@link PolarisMetaStoreManager#listEntities(PolarisCallContext, List, PolarisEntityType, + * PolarisEntitySubType, PageRequest)} */ private @Nonnull ListEntitiesResult listEntities( @Nonnull PolarisCallContext callCtx, @@ -692,7 +693,7 @@ private void bootstrapPolarisService( @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @Nonnull PolarisEntitySubType entitySubType, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { // first resolve again the catalogPath to that entity PolarisEntityResolver resolver = new PolarisEntityResolver(callCtx, ms, catalogPath); @@ -705,15 +706,19 @@ private void bootstrapPolarisService( // return list of active entities Page resultPage = ms.listEntitiesInCurrentTxn( - callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(), entityType, pageToken); + callCtx, + resolver.getCatalogIdOrNull(), + resolver.getParentId(), + entityType, + pageRequest); // prune the returned list with only entities matching the entity subtype if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { resultPage = - pageToken.buildNextPage( - resultPage.items.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList())); + resultPage.filter( + rec -> { + return rec.getSubTypeCode() == entitySubType.getCode(); + }); } // done @@ -727,14 +732,14 @@ private void bootstrapPolarisService( @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @Nonnull PolarisEntitySubType entitySubType, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { // get meta store we should be using TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); // run operation in a read transaction return ms.runInReadTransaction( callCtx, - () -> listEntities(callCtx, ms, catalogPath, entityType, entitySubType, pageToken)); + () -> listEntities(callCtx, ms, catalogPath, entityType, entitySubType, pageRequest)); } /** {@link #createPrincipal(PolarisCallContext, PolarisBaseEntity)} */ @@ -1377,7 +1382,7 @@ private void bootstrapPolarisService( PolarisEntityType.CATALOG_ROLE, entity -> true, Function.identity(), - PageToken.fromLimit(2)) + PageRequest.fromLimit(2)) .items; // if we have 2, we cannot drop the catalog. If only one left, better be the admin role @@ -1940,7 +1945,7 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( @Nonnull PolarisCallContext callCtx, @Nonnull TransactionalPersistence ms, String executorId, - PageToken pageToken) { + PageRequest pageRequest) { // find all available tasks Page availableTasks = @@ -1964,7 +1969,7 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( || callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout; }, Function.identity(), - pageToken); + pageRequest); List loadedTasks = new ArrayList<>(); availableTasks.items.forEach( @@ -2003,9 +2008,9 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( @Override public @Nonnull EntitiesResult loadTasks( - @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) { + @Nonnull PolarisCallContext callCtx, String executorId, PageRequest pageRequest) { TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); - return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, pageToken)); + return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, pageRequest)); } /** {@inheritDoc} */ diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java index 1c58334d55..6189101416 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java @@ -37,7 +37,7 @@ import org.apache.polaris.core.persistence.BasePersistence; import org.apache.polaris.core.persistence.IntegrationPersistence; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.policy.TransactionalPolicyMappingPersistence; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.PolarisStorageIntegration; @@ -208,7 +208,7 @@ Page listEntitiesInCurrentTxn( long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull PageToken pageToken); + @Nonnull PageRequest pageRequest); /** See {@link org.apache.polaris.core.persistence.BasePersistence#listEntities} */ @Nonnull @@ -218,7 +218,7 @@ Page listEntitiesInCurrentTxn( long parentId, @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, - @Nonnull PageToken pageToken); + @Nonnull PageRequest pageRequest); /** See {@link org.apache.polaris.core.persistence.BasePersistence#listEntities} */ @Nonnull @@ -229,7 +229,7 @@ Page listEntitiesInCurrentTxn( @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, @Nonnull Function transformer, - @Nonnull PageToken pageToken); + @Nonnull PageRequest pageRequest); /** * See {@link org.apache.polaris.core.persistence.BasePersistence#lookupEntityGrantRecordsVersion} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java index 44b37c2760..1e9a56448b 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java @@ -21,6 +21,7 @@ import com.google.common.base.Predicates; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import java.util.Comparator; import java.util.List; import java.util.function.Function; import java.util.function.Predicate; @@ -39,8 +40,10 @@ import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; +import org.apache.polaris.core.persistence.pagination.EntityIdPageToken; import org.apache.polaris.core.persistence.pagination.HasPageSize; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; @@ -311,9 +314,9 @@ public List lookupEntityActiveBatchInCurrentTxn( long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { return this.listEntitiesInCurrentTxn( - callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken); + callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageRequest); } @Override @@ -323,7 +326,7 @@ public List lookupEntityActiveBatchInCurrentTxn( long parentId, @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { // full range scan under the parent for that type return this.listEntitiesInCurrentTxn( callCtx, @@ -339,7 +342,7 @@ public List lookupEntityActiveBatchInCurrentTxn( entity.getName(), entity.getTypeCode(), entity.getSubTypeCode()), - pageToken); + pageRequest); } @Override @@ -350,7 +353,7 @@ public List lookupEntityActiveBatchInCurrentTxn( @Nonnull PolarisEntityType entityType, @Nonnull Predicate entityFilter, @Nonnull Function transformer, - @Nonnull PageToken pageToken) { + @Nonnull PageRequest pageRequest) { // full range scan under the parent for that type Stream data = this.store @@ -361,13 +364,22 @@ public List lookupEntityActiveBatchInCurrentTxn( .map( nameRecord -> this.lookupEntityInCurrentTxn( - callCtx, catalogId, nameRecord.getId(), entityType.getCode())) - .filter(entityFilter); + callCtx, catalogId, nameRecord.getId(), entityType.getCode())); + + PageToken pageToken = buildPageToken(pageRequest); + if (pageToken instanceof EntityIdPageToken) { + data = + data.sorted(Comparator.comparingLong(PolarisEntityCore::getId)) + .filter(e -> e.getId() > ((EntityIdPageToken) pageToken).getId()); + } + + data = data.filter(entityFilter); + if (pageToken instanceof HasPageSize) { data = data.limit(((HasPageSize) pageToken).getPageSize()); } - return Page.fromItems(data.map(transformer).collect(Collectors.toList())); + return pageToken.buildNextPage(data.map(transformer).collect(Collectors.toList())); } /** {@inheritDoc} */ @@ -663,4 +675,8 @@ record -> this.store.getSlicePolicyMappingRecordsByPolicy().delete(record)); .getSlicePolicyMappingRecordsByPolicy() .readRange(this.store.buildPrefixKeyComposite(policyTypeCode, policyCatalogId, policyId)); } + + private PageToken buildPageToken(PageRequest pageRequest) { + return EntityIdPageToken.fromPageRequest(pageRequest); + } } diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java index 0f834bc760..0dac7518b2 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java @@ -43,7 +43,7 @@ import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.TaskEntity; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.assertj.core.api.Assertions; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.BeforeEach; @@ -130,7 +130,7 @@ protected void testCreateEntities() { null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities(); Assertions.assertThat(listedEntities) .isNotNull() @@ -309,7 +309,7 @@ protected void testLoadTasks() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext; List taskList = - metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(5)).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageRequest.fromLimit(5)).getEntities(); Assertions.assertThat(taskList) .isNotNull() .isNotEmpty() @@ -329,7 +329,7 @@ protected void testLoadTasks() { // grab a second round of tasks. Assert that none of the original 5 are in the list List newTaskList = - metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(5)).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageRequest.fromLimit(5)).getEntities(); Assertions.assertThat(newTaskList) .isNotNull() .isNotEmpty() @@ -343,7 +343,7 @@ protected void testLoadTasks() { // only 10 tasks are unassigned. Requesting 20, we should only receive those 10 List lastTen = - metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageRequest.fromLimit(20)).getEntities(); Assertions.assertThat(lastTen) .isNotNull() @@ -357,7 +357,7 @@ protected void testLoadTasks() { .collect(Collectors.toSet()); List emtpyList = - metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageRequest.fromLimit(20)).getEntities(); Assertions.assertThat(emtpyList).isNotNull().isEmpty(); @@ -365,7 +365,7 @@ protected void testLoadTasks() { // all the tasks are unassigned. Fetch them all List allTasks = - metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageRequest.fromLimit(20)).getEntities(); Assertions.assertThat(allTasks) .isNotNull() @@ -380,7 +380,7 @@ protected void testLoadTasks() { timeSource.add(Duration.ofMinutes(10)); List finalList = - metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageRequest.fromLimit(20)).getEntities(); Assertions.assertThat(finalList).isNotNull().isEmpty(); } @@ -410,7 +410,7 @@ protected void testLoadTasksInParallel() throws Exception { try { taskList = metaStoreManager - .loadTasks(callCtx, executorId, PageToken.fromLimit(5)) + .loadTasks(callCtx, executorId, PageRequest.fromLimit(5)) .getEntities(); taskList.stream().map(PolarisBaseEntity::getName).forEach(taskNames::add); } catch (RetryOnConcurrencyException e) { diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java index 7d90ab23da..a3d4bc0fb7 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java @@ -51,7 +51,7 @@ import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult; import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; @@ -768,7 +768,7 @@ void dropEntity(List catalogPath, PolarisBaseEntity entityToD path, PolarisEntityType.NAMESPACE, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities(); Assertions.assertThat(children).isNotNull(); if (children.isEmpty() && entity.getType() == PolarisEntityType.NAMESPACE) { @@ -779,7 +779,7 @@ void dropEntity(List catalogPath, PolarisBaseEntity entityToD path, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ANY_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities(); Assertions.assertThat(children).isNotNull(); } else if (children.isEmpty()) { @@ -790,7 +790,7 @@ void dropEntity(List catalogPath, PolarisBaseEntity entityToD path, PolarisEntityType.CATALOG_ROLE, PolarisEntitySubType.ANY_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities(); Assertions.assertThat(children).isNotNull(); // if only one left, it can be dropped. @@ -1564,7 +1564,7 @@ private void validateListReturn( path, entityType, entitySubType, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities(); Assertions.assertThat(result).isNotNull(); @@ -1882,7 +1882,7 @@ void validateBootstrap() { null, PolarisEntityType.PRINCIPAL, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities(); // ensure not null, one element only @@ -1909,7 +1909,7 @@ void validateBootstrap() { null, PolarisEntityType.PRINCIPAL_ROLE, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities(); // ensure not null, one element only @@ -2648,7 +2648,7 @@ public void testLookup() { null, PolarisEntityType.PRINCIPAL, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities(); // ensure not null, one element only diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 4722f92eb6..809621a598 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -98,7 +98,8 @@ import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; @@ -180,6 +181,8 @@ public Map getConfigOverrides() { "polaris.event-listener.type", "test", "polaris.readiness.ignore-severe-issues", + "true", + "LIST_PAGINATION_ENABLED", "true"); } } @@ -1534,7 +1537,7 @@ public void testDropTableWithPurge() { .rejects(TABLE); List tasks = metaStoreManager - .loadTasks(polarisContext, "testExecutor", PageToken.fromLimit(1)) + .loadTasks(polarisContext, "testExecutor", PageRequest.fromLimit(1)) .getEntities(); Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); @@ -1745,7 +1748,7 @@ public void testFileIOWrapper() { TaskEntity.of( metaStoreManager .loadTasks( - callContext.getPolarisCallContext(), "testExecutor", PageToken.fromLimit(1)) + callContext.getPolarisCallContext(), "testExecutor", PageRequest.fromLimit(1)) .getEntities() .getFirst()); Map properties = taskEntity.getInternalPropertiesAsMap(); @@ -2009,4 +2012,141 @@ public void testEventsAreEmitted() { Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); } + + @Test + public void testPaginatedListTables() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + + catalog.createNamespace(NS); + + for (int i = 0; i < 5; i++) { + catalog.buildTable(TableIdentifier.of(NS, "pagination_table_" + i), SCHEMA).create(); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listTables(NS)).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listTables(NS, null, 2); + Assertions.assertThat(firstListResult.items.size()).isEqualTo(2); + Assertions.assertThat(firstListResult.pageToken.toTokenString()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = + catalog.listTables(NS, firstListResult.pageToken.toTokenString(), null); + Assertions.assertThat(secondListResult.items.size()).isEqualTo(2); + Assertions.assertThat(secondListResult.pageToken.toTokenString()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = + catalog.listTables(NS, secondListResult.pageToken.toTokenString(), null); + Assertions.assertThat(finalListResult.items.size()).isEqualTo(1); + Assertions.assertThat(finalListResult.pageToken.toTokenString()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropTable(TableIdentifier.of(NS, "pagination_table_" + i)); + } + } + } + + @Test + public void testPaginatedListViews() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + + catalog.createNamespace(NS); + + for (int i = 0; i < 5; i++) { + catalog + .buildView(TableIdentifier.of(NS, "pagination_view_" + i)) + .withQuery("a_" + i, "SELECT 1 id") + .withSchema(SCHEMA) + .withDefaultNamespace(NS) + .create(); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listViews(NS)).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listViews(NS, null, 2); + Assertions.assertThat(firstListResult.items.size()).isEqualTo(2); + Assertions.assertThat(firstListResult.pageToken.toTokenString()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = + catalog.listViews(NS, firstListResult.pageToken.toTokenString(), null); + Assertions.assertThat(secondListResult.items.size()).isEqualTo(2); + Assertions.assertThat(secondListResult.pageToken.toTokenString()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = + catalog.listViews(NS, secondListResult.pageToken.toTokenString(), null); + Assertions.assertThat(finalListResult.items.size()).isEqualTo(1); + Assertions.assertThat(finalListResult.pageToken.toTokenString()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropTable(TableIdentifier.of(NS, "pagination_view_" + i)); + } + } + } + + @Test + public void testPaginatedListNamespaces() { + for (int i = 0; i < 5; i++) { + catalog.createNamespace(Namespace.of("pagination_namespace_" + i)); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listNamespaces()).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listNamespaces(Namespace.empty(), null, 2); + Assertions.assertThat(firstListResult.items.size()).isEqualTo(2); + Assertions.assertThat(firstListResult.pageToken.toTokenString()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = + catalog.listNamespaces( + Namespace.empty(), firstListResult.pageToken.toTokenString(), null); + Assertions.assertThat(secondListResult.items.size()).isEqualTo(2); + Assertions.assertThat(secondListResult.pageToken.toTokenString()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = + catalog.listNamespaces( + Namespace.empty(), secondListResult.pageToken.toTokenString(), null); + Assertions.assertThat(finalListResult.items.size()).isEqualTo(1); + Assertions.assertThat(finalListResult.pageToken.toTokenString()).isNull(); + + // List with page size matching the amount of data + Page firstExactListResult = catalog.listNamespaces(Namespace.empty(), null, 5); + Assertions.assertThat(firstExactListResult.items.size()).isEqualTo(5); + Assertions.assertThat(firstExactListResult.pageToken.toTokenString()) + .isNotNull() + .isNotEmpty(); + + // Again list with matching page size + Page secondExactListResult = + catalog.listNamespaces( + Namespace.empty(), firstExactListResult.pageToken.toTokenString(), null); + Assertions.assertThat(secondExactListResult.items).isEmpty(); + Assertions.assertThat(secondExactListResult.pageToken.toTokenString()).isNull(); + + // List with huge page size: + Page bigListResult = catalog.listNamespaces(Namespace.empty(), null, 9999); + Assertions.assertThat(bigListResult.items.size()).isEqualTo(5); + Assertions.assertThat(bigListResult.pageToken.toTokenString()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropNamespace(Namespace.of("pagination_namespace_" + i)); + } + } + } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index d4147252f3..9e3141a4d2 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -56,7 +56,7 @@ import org.apache.polaris.core.persistence.BasePersistence; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.task.BatchFileCleanupTaskHandler; @@ -157,7 +157,7 @@ public void testTableCleanup() throws IOException { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(2)) + .loadTasks(callContext.getPolarisCallContext(), "test", PageRequest.fromLimit(2)) .getEntities()) .hasSize(2) .satisfiesExactlyInAnyOrder( @@ -237,7 +237,7 @@ public void close() { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(5)) + .loadTasks(callContext.getPolarisCallContext(), "test", PageRequest.fromLimit(5)) .getEntities()) .hasSize(2); } @@ -298,7 +298,7 @@ public void close() { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(5)) + .loadTasks(callContext.getPolarisCallContext(), "test", PageRequest.fromLimit(5)) .getEntities()) .hasSize(4) .satisfiesExactlyInAnyOrder( @@ -418,7 +418,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException { List entities = metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(5)) + .loadTasks(callContext.getPolarisCallContext(), "test", PageRequest.fromLimit(5)) .getEntities(); List manifestCleanupTasks = @@ -592,7 +592,7 @@ public void testTableCleanupMultipleMetadata() throws IOException { List entities = metaStoreManagerFactory .getOrCreateMetaStoreManager(callContext.getRealmContext()) - .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(6)) + .loadTasks(callContext.getPolarisCallContext(), "test", PageRequest.fromLimit(6)) .getEntities(); List manifestCleanupTasks = diff --git a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java index cd0f124ce2..a5e42bd8fa 100644 --- a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java +++ b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java @@ -98,7 +98,7 @@ import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.LoadGrantsResult; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; import org.apache.polaris.core.persistence.resolver.ResolverPath; import org.apache.polaris.core.persistence.resolver.ResolverStatus; @@ -940,7 +940,7 @@ private List listCatalogsUnsafe() { null, PolarisEntityType.CATALOG, PolarisEntitySubType.ANY_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities() .stream() .map( @@ -1122,7 +1122,7 @@ public List listPrincipals() { null, PolarisEntityType.PRINCIPAL, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities() .stream() .map( @@ -1235,7 +1235,7 @@ public List listPrincipalRoles() { null, PolarisEntityType.PRINCIPAL_ROLE, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities() .stream() .map( @@ -1367,7 +1367,7 @@ public List listCatalogRoles(String catalogName) { PolarisEntity.toCoreList(List.of(catalogEntity)), PolarisEntityType.CATALOG_ROLE, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities() .stream() .map( diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalog.java index 2b884e787f..3a8bc143bb 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalog.java @@ -37,7 +37,7 @@ import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,7 +191,7 @@ public List listGenericTables(Namespace namespace) { PolarisEntity.toCoreList(catalogPath), PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.GENERIC_TABLE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities()); return PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index c972b2b643..32bbdcb32c 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -109,6 +109,7 @@ import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; @@ -454,20 +455,20 @@ public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { @Override public List listTables(Namespace namespace) { - return listTables(namespace, PageToken.readEverything()).items; + return listTables(namespace, PageRequest.readEverything()).items; } public Page listTables(Namespace namespace, String pageToken, Integer pageSize) { - return listTables(namespace, buildPageToken(pageToken, pageSize)); + return listTables(namespace, buildPageRequest(pageToken, pageSize)); } - private Page listTables(Namespace namespace, PageToken pageToken) { + private Page listTables(Namespace namespace, PageRequest pageRequest) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list tables for namespace. Namespace does not exist: '%s'", namespace); } - return listTableLike(PolarisEntitySubType.ICEBERG_TABLE, namespace, pageToken); + return listTableLike(PolarisEntitySubType.ICEBERG_TABLE, namespace, pageRequest); } @Override @@ -777,14 +778,14 @@ public List listNamespaces() { @Override public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { - return listNamespaces(namespace, PageToken.readEverything()).items; + return listNamespaces(namespace, PageRequest.readEverything()).items; } public Page listNamespaces(Namespace namespace, String pageToken, Integer pageSize) { - return listNamespaces(namespace, buildPageToken(pageToken, pageSize)); + return listNamespaces(namespace, buildPageRequest(pageToken, pageSize)); } - private Page listNamespaces(Namespace namespace, PageToken pageToken) + private Page listNamespaces(Namespace namespace, PageRequest pageRequest) throws NoSuchNamespaceException { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { @@ -799,7 +800,7 @@ private Page listNamespaces(Namespace namespace, PageToken pageToken) PolarisEntity.toCoreList(catalogPath), PolarisEntityType.NAMESPACE, PolarisEntitySubType.NULL_SUBTYPE, - pageToken); + pageRequest); List entities = PolarisEntity.toNameAndIdList(listResult.getEntities()); List namespaces = PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities); @@ -818,20 +819,20 @@ public void close() throws IOException { @Override public List listViews(Namespace namespace) { - return listViews(namespace, PageToken.readEverything()).items; + return listViews(namespace, PageRequest.readEverything()).items; } public Page listViews(Namespace namespace, String pageToken, Integer pageSize) { - return listViews(namespace, buildPageToken(pageToken, pageSize)); + return listViews(namespace, buildPageRequest(pageToken, pageSize)); } - private Page listViews(Namespace namespace, PageToken pageToken) { + private Page listViews(Namespace namespace, PageRequest pageRequest) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list views for namespace. Namespace does not exist: '%s'", namespace); } - return listTableLike(PolarisEntitySubType.ICEBERG_VIEW, namespace, pageToken); + return listTableLike(PolarisEntitySubType.ICEBERG_VIEW, namespace, pageRequest); } @VisibleForTesting @@ -1060,7 +1061,7 @@ private void validateNoLocationOverlap( parentPath.stream().map(PolarisEntity::toCore).collect(Collectors.toList()), PolarisEntityType.NAMESPACE, PolarisEntitySubType.ANY_SUBTYPE, - PageToken.readEverything()); + PageRequest.readEverything()); if (!siblingNamespacesResult.isSuccess()) { throw new IllegalStateException( "Unable to resolve siblings entities to validate location - could not list namespaces"); @@ -1086,7 +1087,7 @@ private void validateNoLocationOverlap( .collect(Collectors.toList()), PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ANY_SUBTYPE, - PageToken.readEverything()); + PageRequest.readEverything()); if (!siblingTablesResult.isSuccess()) { throw new IllegalStateException( "Unable to resolve siblings entities to validate location - could not list tables"); @@ -2452,7 +2453,7 @@ private void createNonExistingNamespaces(Namespace namespace) { } private Page listTableLike( - PolarisEntitySubType subType, Namespace namespace, PageToken pageToken) { + PolarisEntitySubType subType, Namespace namespace, PageRequest pageRequest) { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { // Illegal state because the namespace should've already been in the static resolution set. @@ -2468,7 +2469,7 @@ private Page listTableLike( PolarisEntity.toCoreList(catalogPath), PolarisEntityType.TABLE_LIKE, subType, - pageToken); + pageRequest); List entities = PolarisEntity.toNameAndIdList(listResult.getEntities()); List identifiers = @@ -2525,8 +2526,7 @@ private int getMaxMetadataRefreshRetries() { } /** Build a {@link PageToken} from a string and page size. */ - private PageToken buildPageToken(@Nullable String tokenString, @Nullable Integer pageSize) { - + private PageRequest buildPageRequest(@Nullable String tokenString, @Nullable Integer pageSize) { boolean paginationEnabled = callContext .getPolarisCallContext() @@ -2536,9 +2536,9 @@ private PageToken buildPageToken(@Nullable String tokenString, @Nullable Integer catalogEntity, FeatureConfiguration.LIST_PAGINATION_ENABLED); if (!paginationEnabled) { - return PageToken.readEverything(); + return PageRequest.readEverything(); } else { - return PageToken.build(tokenString, pageSize); + return new PageRequest(tokenString, pageSize); } } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java index e0edebfc64..3cc0847194 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java @@ -50,7 +50,7 @@ import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; @@ -169,7 +169,7 @@ public List listPolicies(Namespace namespace, PolicyType polic PolarisEntity.toCoreList(catalogPath), PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) + PageRequest.readEverything()) .getEntities() .stream() .map( diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index 64a85df323..c57220ebf9 100644 --- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -46,7 +46,7 @@ import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.*; -import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.PageRequest; import org.apache.polaris.service.TestServices; import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; @@ -194,7 +194,8 @@ public void testLoadFileIOForCleanupTask() { testServices .metaStoreManagerFactory() .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "testExecutor", PageToken.fromLimit(1)) + .loadTasks( + callContext.getPolarisCallContext(), "testExecutor", PageRequest.fromLimit(1)) .getEntities(); Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); diff --git a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java index 97e52fb842..e863f94cb3 100644 --- a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java @@ -18,7 +18,12 @@ */ package org.apache.polaris.service.persistence.pagination; +import java.util.List; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.pagination.DonePageToken; +import org.apache.polaris.core.persistence.pagination.EntityIdPageToken; import org.apache.polaris.core.persistence.pagination.HasPageSize; import org.apache.polaris.core.persistence.pagination.PageToken; import org.assertj.core.api.Assertions; @@ -46,5 +51,54 @@ void testReadEverythingPageToken() { Assertions.assertThat(token).isNotInstanceOf(HasPageSize.class); Assertions.assertThat(PageToken.readEverything()).isEqualTo(PageToken.readEverything()); + + Assertions.assertThat(PageToken.readEverything().buildNextPage(List.of()).pageToken) + .isInstanceOf(DonePageToken.class); + } + + @Test + void testEntityIdPageToken() { + EntityIdPageToken token = new EntityIdPageToken(2); + + Assertions.assertThat(token).isInstanceOf(EntityIdPageToken.class); + Assertions.assertThat(token.getId()).isEqualTo(-1L); + + // EntityIdPageToken can only build a new page from certain types that have an Entity ID + List badData = List.of("some", "data"); + Assertions.assertThatThrownBy(() -> token.buildNextPage(badData)) + .isInstanceOf(IllegalStateException.class); + + List data = + List.of( + new PolarisBaseEntity( + 0, 101, PolarisEntityType.NULL_TYPE, PolarisEntitySubType.ANY_SUBTYPE, 0, "101"), + new PolarisBaseEntity( + 0, 102, PolarisEntityType.NULL_TYPE, PolarisEntitySubType.ANY_SUBTYPE, 0, "102")); + var page = token.buildNextPage(data); + + Assertions.assertThat(page.pageToken).isNotNull(); + Assertions.assertThat(page.pageToken).isInstanceOf(EntityIdPageToken.class); + Assertions.assertThat(((EntityIdPageToken) page.pageToken).getPageSize()).isEqualTo(2); + Assertions.assertThat(((EntityIdPageToken) page.pageToken).getId()).isEqualTo(102); + Assertions.assertThat(page.items).isEqualTo(data); + + Assertions.assertThat(PageToken.fromString(page.pageToken.toTokenString())) + .isEqualTo(page.pageToken); + } + + @Test + void testInvalidPageTokens() { + Assertions.assertThatCode(() -> PageToken.fromString("not-real")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unrecognized page token"); + + PageToken goodToken = new EntityIdPageToken(100); + Assertions.assertThatCode(() -> PageToken.fromString(goodToken.toTokenString() + "???")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid token format"); + + Assertions.assertThatCode(() -> PageToken.fromString(EntityIdPageToken.PREFIX + "/1")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid token format"); } }