Persistence implementations for list pagination #1555

Open · wants to merge 23 commits into base: main
@@ -53,8 +53,10 @@
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.pagination.EntityIdPageToken;
import org.apache.polaris.core.persistence.pagination.HasPageSize;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageRequest;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
@@ -428,9 +430,9 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull PageToken pageToken) {
@Nonnull PageRequest pageRequest) {
return this.listEntitiesInCurrentTxn(
callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken);
callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageRequest);
}

@Override
@@ -440,7 +442,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull PageToken pageToken) {
@Nonnull PageRequest pageRequest) {
// full range scan under the parent for that type
return this.listEntitiesInCurrentTxn(
callCtx,
@@ -456,7 +458,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
entity.getName(),
entity.getTypeCode(),
entity.getSubTypeCode()),
pageToken);
pageRequest);
}

@Override
@@ -467,7 +469,9 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
@Nonnull PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull Function<PolarisBaseEntity, T> transformer,
@Nonnull PageToken pageToken) {
@Nonnull PageRequest pageRequest) {
PageToken pageToken = buildPageToken(pageRequest);

// full range scan under the parent for that type
Stream<PolarisBaseEntity> data =
this.store
@@ -481,7 +485,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
data = data.limit(hasPageSize.getPageSize());
}

return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
return pageToken.buildNextPage(data.map(transformer).collect(Collectors.toList()));
}

/** {@inheritDoc} */
@@ -778,4 +782,8 @@ public void rollback() {
session.getTransaction().rollback();
}
}

private PageToken buildPageToken(PageRequest pageRequest) {
return EntityIdPageToken.fromPageRequest(pageRequest);
}
}
@@ -35,6 +35,7 @@
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.pagination.EntityIdPageToken;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyEntity;
@@ -294,14 +295,22 @@ List<ModelEntity> lookupFullEntitiesActive(

// Currently check against ENTITIES not joining with ENTITIES_ACTIVE
String hql =
"SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";
"SELECT m from ModelEntity m where"
+ " m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode and m.id > :tokenId";

long tokenId = EntityIdPageToken.BASE_ID;
if (pageToken instanceof EntityIdPageToken entityIdPageToken) {
hql += " order by m.id asc";
tokenId = entityIdPageToken.getId();
}

TypedQuery<ModelEntity> query =
session
.createQuery(hql, ModelEntity.class)
.setParameter("catalogId", catalogId)
.setParameter("parentId", parentId)
.setParameter("typeCode", entityType.getCode());
.setParameter("typeCode", entityType.getCode())
.setParameter("tokenId", tokenId);

return query.getResultList();
}
@@ -34,6 +34,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
@@ -49,8 +50,10 @@
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.pagination.EntityIdPageToken;
import org.apache.polaris.core.persistence.pagination.HasPageSize;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageRequest;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyType;
@@ -72,16 +75,19 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers
private final PrincipalSecretsGenerator secretsGenerator;
private final PolarisStorageIntegrationProvider storageIntegrationProvider;
private final String realmId;
private final PolarisDiagnostics polarisDiagnostics;

public JdbcBasePersistenceImpl(
DatasourceOperations databaseOperations,
PrincipalSecretsGenerator secretsGenerator,
PolarisStorageIntegrationProvider storageIntegrationProvider,
String realmId) {
String realmId,
PolarisDiagnostics polarisDiagnostics) {
this.datasourceOperations = databaseOperations;
this.secretsGenerator = secretsGenerator;
this.storageIntegrationProvider = storageIntegrationProvider;
this.realmId = realmId;
this.polarisDiagnostics = polarisDiagnostics;
}

@Override
@@ -359,15 +365,15 @@ public Page<EntityNameLookupRecord> listEntities(
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull PageToken pageToken) {
@Nonnull PageRequest pageRequest) {
return listEntities(
callCtx,
catalogId,
parentId,
entityType,
entity -> true,
EntityNameLookupRecord::new,
pageToken);
pageRequest);
}

@Nonnull
@@ -378,15 +384,15 @@ public Page<EntityNameLookupRecord> listEntities(
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull PageToken pageToken) {
@Nonnull PageRequest pageRequest) {
return listEntities(
callCtx,
catalogId,
parentId,
entityType,
entityFilter,
EntityNameLookupRecord::new,
pageToken);
pageRequest);
}

@Nonnull
@@ -398,7 +404,7 @@ public <T> Page<T> listEntities(
PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull Function<PolarisBaseEntity, T> transformer,
@Nonnull PageToken pageToken) {
@Nonnull PageRequest pageRequest) {
Map<String, Object> params =
Map.of(
"catalog_id",
@@ -413,6 +419,11 @@ public <T> Page<T> listEntities(
// Limit can't be pushed down, due to client-side filtering and the
// absence of a transaction.
String query = QueryGenerator.generateSelectQuery(new ModelEntity(), params);
PageToken pageToken = buildPageToken(pageRequest);
if (pageToken instanceof EntityIdPageToken entityIdPageToken) {
query += String.format(" AND id > %d ORDER BY id ASC", entityIdPageToken.getId());
Member:
How would this work with org.apache.polaris.extension.persistence.relational.jdbc.IdGenerator?

Contributor Author:
I think you're right that it won't; this logic is copied from EclipseLink where IDs are always increasing but does not work with the current way that the JDBC metastore creates IDs.

Member:
I'd propose to change the Page/PageToken contract in a way to push the parameter "as is" down to the persistence layer and let the persistence implementation deal with it.

Contributor Author (@eric-maynard, May 12, 2025):
I spoke with @singhpk234, who noted this is probably the same discussion as the one on the old PR. With that context, I think we might be OK here.

IMO it's alright that the list ordering you'd get across metastores won't be the same. Other than that difference, seems like everything should work with JDBC's IdGenerator. Although the IDs aren't generated sequentially, pagination only uses the entity ID as an essentially arbitrary consistent ordering.

The key implication here is that if an entity gets created in the middle of a listing operation (e.g. between list calls 2 and 3) it may or may not show up in the next page. An alternative would be to try to filter it out so that the behavior is more obvious & consistent, but I think the simple approach that ultimately gives the user a chance to see these new entities is good.
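
To make that implication concrete, here is a small self-contained sketch (plain Java, not Polaris code; all names are illustrative) of keyset pagination where the page token is simply the last entity id returned:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustration of the behaviour described above: entities are paged in id order,
// and the "token" is just the last id returned.
public class KeysetPaginationSketch {

  record Entity(long id, String name) {}

  // Pretend store keyed by entity id; TreeMap gives us the "ORDER BY id ASC" view.
  static final TreeMap<Long, Entity> STORE = new TreeMap<>();

  /** Returns up to pageSize entities with id strictly greater than afterId. */
  static List<Entity> listPage(long afterId, int pageSize) {
    List<Entity> page = new ArrayList<>();
    for (Entity e : STORE.tailMap(afterId, /* inclusive= */ false).values()) {
      if (page.size() == pageSize) break;
      page.add(e);
    }
    return page;
  }

  public static void main(String[] args) {
    STORE.put(10L, new Entity(10, "ns1"));
    STORE.put(42L, new Entity(42, "ns2"));
    STORE.put(99L, new Entity(99, "ns3"));

    List<Entity> first = listPage(Long.MIN_VALUE, 2); // entities 10 and 42
    long cursor = first.get(first.size() - 1).id();   // next token encodes id 42

    // An entity created between page calls is only seen if its id sorts after the cursor.
    STORE.put(7L, new Entity(7, "created-too-low"));  // never shown to this listing
    STORE.put(120L, new Entity(120, "created-high")); // will appear on a later page

    System.out.println(listPage(cursor, 2)); // entities 99 and 120
  }
}
```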

Contributor:
Losing new entities that are stored after pagination start is fine from my POV. The JDBC persistence does not implement catalog-level versioning, so this is unavoidable, I guess.

Contributor Author (@eric-maynard, May 13, 2025):
Agreed that we will naturally lose some entities; the question is whether we are OK with entities stored after pagination start being lost nondeterministically rather than always. Right now, whether the new entity is lost or not depends on what entity ID it gets. If it gets a high entity ID you might see it in a later page, and if it gets a low ID you might not.

My thought on this question is "yes", because it's better to show the entity if we can and it simplifies the code.

But if we feel like this is too unintuitive, we can add a secondary filter on the entity's creation time to try and get rid of these entities (on a best-effort basis, since clocks are not perfect).
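
For reference, the rejected alternative would look roughly like the sketch below (illustrative only, not part of this PR; the entity shape and timestamp field are assumptions):

```java
import java.util.ArrayList;
import java.util.List;

// Not adopted here: a best-effort "snapshot" filter that hides entities created after
// the listing began. Clock skew makes this approximate, which is why the simpler
// behaviour (maybe showing, maybe missing such entities) was kept instead.
public class CreationTimeFilterSketch {

  record Entity(long id, long createTimestampMs) {} // assumed shape, for illustration

  static List<Entity> filterToListingSnapshot(List<Entity> page, long listStartedAtMs) {
    List<Entity> filtered = new ArrayList<>(page);
    filtered.removeIf(e -> e.createTimestampMs() > listStartedAtMs);
    return filtered;
  }

  public static void main(String[] args) {
    long listStartedAtMs = 1_000L;
    List<Entity> page =
        List.of(new Entity(1, 500L), new Entity(2, 900L), new Entity(3, 1_500L));
    // Entity 3 was created after pagination started, so it is dropped (best effort only).
    System.out.println(filterToListingSnapshot(page, listStartedAtMs)); // entities 1 and 2
  }
}
```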

Contributor:
I think current pagination behaviour wrt concurrent changes is fine.

Making it deterministic would be a great addition to Polaris, but that, I think, has a much broader scope. For example, if an entry is deleted after pagination starts, but a client re-submits a page request using an old token, the new response would still be inconsistent with the old response.

From my POV a complete and deterministic pagination solution implies catalog-level versioning.

}

try {
List<PolarisBaseEntity> results = new ArrayList<>();
datasourceOperations.executeSelectOverStream(
@@ -425,11 +436,8 @@ public <T> Page<T> listEntities(
}
data.forEach(results::add);
});
List<T> resultsOrEmpty =
results == null
? Collections.emptyList()
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
return Page.fromItems(resultsOrEmpty);
List<T> resultsOrEmpty = results.stream().map(transformer).collect(Collectors.toList());
return pageToken.buildNextPage(resultsOrEmpty);
Contributor:
It looks like buildNextPage does not have to be a function of the previous page token. It is a function of page request + data + persistence impl. WDYT about: PageRequest.buildPage(List<T> data, Function<T, PageToken> nextToken)?

Here, the call would look like: return pageRequest.buildPage(resultsOrEmpty, EntityIdPageToken::fromLastItem)

Note: the nextToken function will be invoked only when the next page is expected (i.e. not "done" and not "everything").
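
A rough sketch of what that proposed contract could look like (hypothetical; none of these members are the current Polaris API, and the "everything"/"done" semantics are assumptions):

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical shape of the proposal above: the request, not the previous token,
// assembles the page, and the persistence layer only supplies a factory that derives
// the next token from the last returned item.
public class PageRequestProposalSketch {

  interface PageToken {}

  record EntityIdToken(long id) implements PageToken {}

  record Page<T>(List<T> items, PageToken nextToken) {}

  record PageRequest(Integer pageSize) {
    <T> Page<T> buildPage(List<T> data, Function<T, PageToken> nextToken) {
      boolean everything = pageSize == null;                // "list everything" request
      boolean done = !everything && data.size() < pageSize; // short page => no more rows
      if (everything || done || data.isEmpty()) {
        return new Page<>(data, null);                      // nextToken is never invoked
      }
      // Only when another page is expected do we derive a token from the last item.
      return new Page<>(data, nextToken.apply(data.get(data.size() - 1)));
    }
  }

  public static void main(String[] args) {
    PageRequest request = new PageRequest(2);
    // Stand-in for: pageRequest.buildPage(resultsOrEmpty, EntityIdPageToken::fromLastItem)
    Page<Long> page = request.buildPage(List.of(10L, 42L), EntityIdToken::new);
    System.out.println(page); // items=[10, 42], nextToken=EntityIdToken[id=42]
  }
}
```

With that shape, the call site in this method would read roughly `pageRequest.buildPage(resultsOrEmpty, EntityIdPageToken::fromLastItem)`, as suggested in the comment above.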

Contributor:
Side note: if we want to avoid empty last pages (when the list ends exactly on the last element from the query) we may need to request one more entry from the database, but not return it to the caller.
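
The fetch-one-extra idea, sketched as standalone code (illustrative only; the real query would be issued with `LIMIT pageSize + 1`):

```java
import java.util.List;

// Sketch of the "fetch one extra row" idea: request pageSize + 1 rows, hand back at most
// pageSize, and only advertise a next page if the extra row proved more data exists.
// This avoids handing the client a token that leads to an empty final page.
public class FetchOneExtraSketch {

  record Trimmed<T>(List<T> items, boolean hasMore) {}

  static <T> Trimmed<T> trim(List<T> rowsRequestedWithOneExtra, int pageSize) {
    boolean hasMore = rowsRequestedWithOneExtra.size() > pageSize;
    List<T> items =
        hasMore ? rowsRequestedWithOneExtra.subList(0, pageSize) : rowsRequestedWithOneExtra;
    return new Trimmed<>(items, hasMore);
  }

  public static void main(String[] args) {
    // The query was issued with LIMIT pageSize + 1 (here pageSize = 2, so LIMIT 3).
    System.out.println(trim(List.of("a", "b", "c"), 2)); // items=[a, b], hasMore=true
    System.out.println(trim(List.of("a", "b"), 2));      // items=[a, b], hasMore=false
  }
}
```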

Contributor Author:
> It looks like buildNextPage does not have to be a function of the previous page token.

In this particular implementation, no. In other implementations, such as the previous OffsetPageToken, it is a function of the previous token. Beyond that, this seems intuitive to me because you're essentially adding data to one page to get a new page.

Empty last pages are fine.

Contributor:
OffsetPageToken does not exist (anymore).

That aside, what information would need to flow from the previous page token to the next directly? I suppose all of that is inside PageRequest now 🤔

Contributor Author (@eric-maynard, May 28, 2025):
> That aside, what information would need to flow from the previous page token to the next directly?

In the case of OffsetPageToken, the new token's offset was data.size + currentToken.offset. You need the current token, not just the data.

In the case of EntityIdPageToken, you can use the current token's entity ID to validate the new data comes after the current token.

> I suppose all of that is inside PageRequest now

In the sense that PageRequest is entirely duplicative of PageToken, yes.
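
The distinction being discussed, condensed into two toy token-advance rules (both classes are illustrative; as noted above, OffsetPageToken no longer exists in the codebase):

```java
import java.util.List;

// Toy comparison of the two token-advance rules discussed above (illustrative only).
public class TokenAdvanceSketch {

  // Offset-style: the next token genuinely depends on the previous token.
  record OffsetToken(int offset) {
    OffsetToken next(List<?> pageData) {
      return new OffsetToken(offset + pageData.size()); // data.size + currentToken.offset
    }
  }

  // Keyset-style: the next token can be derived from the returned data alone; the
  // previous token is only useful as a sanity check that ids keep increasing.
  record EntityIdToken(long lastSeenId) {
    EntityIdToken next(List<Long> pageOfIdsInAscendingOrder) {
      long maxId = pageOfIdsInAscendingOrder.get(pageOfIdsInAscendingOrder.size() - 1);
      if (maxId <= lastSeenId) {
        throw new IllegalStateException("page is not ordered after the current token");
      }
      return new EntityIdToken(maxId);
    }
  }

  public static void main(String[] args) {
    System.out.println(new OffsetToken(20).next(List.of("x", "y", "z"))); // offset=23
    System.out.println(new EntityIdToken(42).next(List.of(99L, 120L)));   // lastSeenId=120
  }
}
```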

} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);
@@ -915,4 +923,8 @@ PolarisStorageIntegration<T> loadPolarisStorageIntegration(
private interface QueryAction {
Integer apply(String query) throws SQLException;
}

private PageToken buildPageToken(PageRequest pageRequest) {
return EntityIdPageToken.fromPageRequest(pageRequest);
}
}
@@ -102,7 +102,8 @@ private void initializeForRealm(
databaseOperations,
secretsGenerator(realmContext, rootCredentialsSet),
storageIntegrationProvider,
realmContext.getRealmIdentifier()));
realmContext.getRealmIdentifier(),
diagServices));

PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager();
metaStoreManagerMap.put(realmContext.getRealmIdentifier(), metaStoreManager);
@@ -57,7 +57,8 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
}

JdbcBasePersistenceImpl basePersistence =
new JdbcBasePersistenceImpl(datasourceOperations, RANDOM_SECRETS, Mockito.mock(), "REALM");
new JdbcBasePersistenceImpl(
datasourceOperations, RANDOM_SECRETS, Mockito.mock(), "REALM", diagServices);
return new PolarisTestMetaStoreManager(
new AtomicOperationMetaStoreManager(),
new PolarisCallContext(
@@ -101,6 +101,24 @@ public List<Namespace> listNamespaces(String catalog, Namespace parent) {
}
}

public ListNamespacesResponse listNamespaces(
String catalog, Namespace parent, String pageToken, String pageSize) {
Map<String, String> queryParams = new HashMap<>();
if (!parent.isEmpty()) {
// TODO change this for Iceberg 1.7.2:
// queryParams.put("parent", RESTUtil.encodeNamespace(parent));
queryParams.put("parent", Joiner.on('\u001f').join(parent.levels()));
}
queryParams.put("pageToken", pageToken);
queryParams.put("pageSize", pageSize);
try (Response response =
request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) {
assertThat(response.getStatus()).isEqualTo(OK.getStatusCode());
ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class);
return res;
}
}

public List<Namespace> listAllNamespacesChildFirst(String catalog) {
List<Namespace> result = new ArrayList<>();
for (int idx = -1; idx < result.size(); idx++) {
@@ -142,6 +160,20 @@ public List<TableIdentifier> listTables(String catalog, Namespace namespace) {
}
}

public ListTablesResponse listTables(
String catalog, Namespace namespace, String pageToken, String pageSize) {
String ns = RESTUtil.encodeNamespace(namespace);
Map<String, String> queryParams = new HashMap<>();
queryParams.put("pageToken", pageToken);
queryParams.put("pageSize", pageSize);
try (Response res =
request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams)
.get()) {
assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
return res.readEntity(ListTablesResponse.class);
}
}

public void dropTable(String catalog, TableIdentifier id) {
String ns = RESTUtil.encodeNamespace(id.namespace());
try (Response res =
@@ -67,6 +67,8 @@
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
import org.apache.polaris.core.admin.model.Catalog;
@@ -164,7 +166,8 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests<RESTCatalog>

private static final String[] DEFAULT_CATALOG_PROPERTIES = {
"allow.unstructured.table.location", "true",
"allow.external.table.location", "true"
"allow.external.table.location", "true",
"polaris.config.list-pagination-enabled", "true"
};

@Retention(RetentionPolicy.RUNTIME)
@@ -1558,4 +1561,72 @@ public void testUpdateTableWithReservedProperty() {
.hasMessageContaining("reserved prefix");
genericTableApi.purge(currentCatalogName, namespace);
}

@Test
public void testPaginatedListNamespaces() {
String prefix = "testPaginatedListNamespaces";
for (int i = 0; i < 20; i++) {
Namespace namespace = Namespace.of(prefix + i);
restCatalog.createNamespace(namespace);
}

try {
Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName, Namespace.empty()))
.hasSize(20);
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
int total = 0;
String pageToken = null;
do {
ListNamespacesResponse response =
catalogApi.listNamespaces(
currentCatalogName, Namespace.empty(), pageToken, String.valueOf(pageSize));
Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize);
total += response.namespaces().size();
pageToken = response.nextPageToken();
} while (pageToken != null);
Assertions.assertThat(total)
.as("Total paginated results for pageSize = " + pageSize)
.isEqualTo(20);
}
} finally {
for (int i = 0; i < 20; i++) {
Namespace namespace = Namespace.of(prefix + i);
restCatalog.dropNamespace(namespace);
}
}
}

@Test
public void testPaginatedListTables() {
String prefix = "testPaginatedListTables";
Namespace namespace = Namespace.of(prefix);
restCatalog.createNamespace(namespace);
for (int i = 0; i < 20; i++) {
restCatalog.createTable(TableIdentifier.of(namespace, prefix + i), SCHEMA);
}

try {
Assertions.assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(20);
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
int total = 0;
String pageToken = null;
do {
ListTablesResponse response =
catalogApi.listTables(
currentCatalogName, namespace, pageToken, String.valueOf(pageSize));
Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize);
total += response.identifiers().size();
pageToken = response.nextPageToken();
} while (pageToken != null);
Assertions.assertThat(total)
.as("Total paginated results for pageSize = " + pageSize)
.isEqualTo(20);
}
} finally {
for (int i = 0; i < 20; i++) {
restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i));
}
restCatalog.dropNamespace(namespace);
}
}
}
@@ -217,6 +217,7 @@ public static void enforceFeatureEnabledOrThrow(
public static final PolarisConfiguration<Boolean> LIST_PAGINATION_ENABLED =
PolarisConfiguration.<Boolean>builder()
.key("LIST_PAGINATION_ENABLED")
.catalogConfig("polaris.config.list-pagination-enabled")
.description("If set to true, pagination for APIs like listTables is enabled.")
.defaultValue(false)
.buildFeatureConfiguration();