Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@

import com.lancedb.lance.WriteParams;
import com.lancedb.lance.namespace.LanceNamespace;
import com.lancedb.lance.namespace.LanceNamespaceException;
import com.lancedb.lance.namespace.LanceNamespaces;
import com.lancedb.lance.namespace.ListTablesIterable;
import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
import com.lancedb.lance.namespace.model.DescribeTableRequest;
import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.DropNamespaceRequest;
import com.lancedb.lance.namespace.model.DropTableRequest;
import com.lancedb.lance.namespace.model.ListTablesRequest;
Expand Down Expand Up @@ -356,29 +353,19 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
// Transform identifier for API call
Identifier actualIdent = transformIdentifierForApi(ident);

DescribeTableRequest request = new DescribeTableRequest();
// Build table ID list from namespace and name
List<String> tableId = new ArrayList<>();
for (String part : actualIdent.namespace()) {
request.addIdItem(part);
}
request.addIdItem(actualIdent.name());

DescribeTableResponse response;
try {
response = namespace.describeTable(request);
} catch (LanceNamespaceException e) {
if (e.getCode() == 404) {
throw new NoSuchTableException(ident);
}
throw e;
tableId.add(part);
}
tableId.add(actualIdent.name());

// Pass storage options from the response to LanceConfig, with fallback to empty map
Map<String, String> storageOptions = response.getStorageOptions();
if (storageOptions == null) {
storageOptions = new HashMap<>();
}
// Create LanceConfig with namespace and table ID
// The OpenDatasetBuilder will automatically call describeTable to get location and storage
// options
LanceConfig config =
LanceConfig.from(CaseInsensitiveStringMap.empty(), namespace, tableId, actualIdent.name());

LanceConfig config = LanceConfig.from(storageOptions, response.getLocation());
Optional<StructType> schema = LanceDatasetAdapter.getSchema(config);
if (!schema.isPresent()) {
throw new NoSuchTableException(ident);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
*/
package com.lancedb.lance.spark;

import com.lancedb.lance.namespace.LanceNamespace;
import com.lancedb.lance.spark.utils.Optional;

import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.jetbrains.annotations.TestOnly;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -34,18 +38,24 @@ public class LanceConfig implements Serializable {
private final String datasetUri;
private final boolean pushDownFilters;
private final Map<String, String> options;
private final transient Optional<LanceNamespace> namespace;
private final Optional<List<String>> tableId;

// Private: instances are created only through the static from(...) factories.
// Exactly one addressing mode is populated per instance:
//   - URI-based: datasetUri/dbPath set, namespace/tableId empty
//   - namespace-based: namespace/tableId set, datasetUri/dbPath empty strings
// NOTE(review): namespace is declared transient on the field — it is lost on
// serialization; confirm callers re-resolve it after deserialization.
private LanceConfig(
String dbPath,
String datasetName,
String datasetUri,
boolean pushDownFilters,
CaseInsensitiveStringMap options,
Optional<LanceNamespace> namespace,
Optional<List<String>> tableId) {
this.dbPath = dbPath;
this.datasetName = datasetName;
this.datasetUri = datasetUri;
this.pushDownFilters = pushDownFilters;
// Snapshot the options as a plain case-sensitive map so the config is
// independent of the caller's CaseInsensitiveStringMap instance.
this.options = options.asCaseSensitiveMap();
this.namespace = namespace;
this.tableId = tableId;
}

public static LanceConfig from(Map<String, String> properties) {
Expand All @@ -71,7 +81,33 @@ public static LanceConfig from(CaseInsensitiveStringMap options, String datasetU
boolean pushDownFilters =
options.getBoolean(CONFIG_PUSH_DOWN_FILTERS, DEFAULT_PUSH_DOWN_FILTERS);
String[] paths = extractDbPathAndDatasetName(datasetUri);
return new LanceConfig(paths[0], paths[1], datasetUri, pushDownFilters, options);
return new LanceConfig(
paths[0],
paths[1],
datasetUri,
pushDownFilters,
options,
Optional.empty(),
Optional.empty());
}

/**
 * Creates a namespace-backed config: the table is addressed by {@code tableId} within
 * {@code namespace} rather than by a dataset URI.
 *
 * @param options Spark options; CONFIG_PUSH_DOWN_FILTERS is read here
 * @param namespace namespace used to resolve the table location lazily
 * @param tableId full table id path (namespace levels plus table name)
 * @param datasetName display/table name stored on the config
 */
public static LanceConfig from(
CaseInsensitiveStringMap options,
LanceNamespace namespace,
List<String> tableId,
String datasetName) {
boolean pushDownFilters =
options.getBoolean(CONFIG_PUSH_DOWN_FILTERS, DEFAULT_PUSH_DOWN_FILTERS);
// Namespace-based configs have no traditional URI structure: dbPath and
// datasetUri are left empty and the location is resolved from the namespace
// when the dataset is opened (see LanceDatasetAdapter.openDataset).
return new LanceConfig(
"", // dbPath not applicable for namespace
datasetName,
"", // datasetUri will be fetched from namespace
pushDownFilters,
options,
Optional.of(namespace),
Optional.of(tableId));
}

@TestOnly
Expand Down Expand Up @@ -119,6 +155,14 @@ public Map<String, String> getOptions() {
return options;
}

/** Returns the backing namespace, or empty for URI-based configs. Not preserved across serialization (field is transient). */
public Optional<LanceNamespace> getNamespace() {
return namespace;
}

/** Returns the full table id path (namespace levels + table name), or empty for URI-based configs. */
public Optional<List<String>> getTableId() {
return tableId;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Expand All @@ -127,11 +171,12 @@ public boolean equals(Object o) {
&& Objects.equals(dbPath, config.dbPath)
&& Objects.equals(datasetName, config.datasetName)
&& Objects.equals(datasetUri, config.datasetUri)
&& Objects.equals(options, config.options);
&& Objects.equals(options, config.options)
&& Objects.equals(tableId, config.tableId);
}

@Override
public int hashCode() {
  // Fix: the pre-change return statement was left in place above the updated
  // one (diff residue), producing two consecutive returns. Keep only the
  // updated version, which includes tableId so hashCode stays consistent
  // with equals (which also compares tableId).
  // namespace is intentionally excluded: it is transient and not compared in equals.
  return Objects.hash(dbPath, datasetName, datasetUri, pushDownFilters, options, tableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ public static ReadOptions genReadOptionFromConfig(LanceConfig config) {
if (maps.containsKey(metadata_cache_size)) {
builder.setMetadataCacheSize(Integer.parseInt(maps.get(metadata_cache_size)));
}
builder.setStorageOptions(genStorageOptions(config));
// Only set storage options if not using namespace
// When using namespace, storage options are fetched automatically by OpenDatasetBuilder
if (!config.getNamespace().isPresent()) {
builder.setStorageOptions(genStorageOptions(config));
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ public class LanceDatasetAdapter {
public static final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

public static Optional<StructType> getSchema(LanceConfig config) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
return Optional.of(LanceArrowUtils.fromArrowSchema(dataset.getSchema()));
} catch (IllegalArgumentException e) {
// dataset not found
Expand All @@ -65,9 +64,8 @@ public static Optional<StructType> getSchema(String datasetUri) {
}

public static Optional<Long> getDatasetRowCount(LanceConfig config) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
return Optional.of(dataset.countRows());
} catch (IllegalArgumentException e) {
// dataset not found
Expand All @@ -76,9 +74,8 @@ public static Optional<Long> getDatasetRowCount(LanceConfig config) {
}

public static Optional<Long> getDatasetDataSize(LanceConfig config) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
return Optional.of(dataset.calculateDataSize());
} catch (IllegalArgumentException e) {
// dataset not found
Expand All @@ -87,17 +84,15 @@ public static Optional<Long> getDatasetDataSize(LanceConfig config) {
}

public static List<Integer> getFragmentIds(LanceConfig config) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
return dataset.getFragments().stream().map(Fragment::getId).collect(Collectors.toList());
}
}

public static List<FragmentMetadata> getFragments(LanceConfig config) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
return dataset.getFragments().stream().map(Fragment::metadata).collect(Collectors.toList());
}
}
Expand All @@ -109,9 +104,8 @@ public static LanceFragmentScanner getFragmentScanner(

public static void appendFragments(LanceConfig config, List<FragmentMetadata> fragments) {
FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments);
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset datasetRead = Dataset.open(allocator, uri, options)) {
try (Dataset datasetRead = openDataset(config, options)) {
Dataset.commit(
allocator,
config.getDatasetUri(),
Expand All @@ -126,9 +120,8 @@ public static void overwriteFragments(
LanceConfig config, List<FragmentMetadata> fragments, StructType sparkSchema) {
Schema schema = LanceArrowUtils.toArrowSchema(sparkSchema, "UTC", false, false);
FragmentOperation.Overwrite overwrite = new FragmentOperation.Overwrite(fragments, schema);
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset datasetRead = Dataset.open(allocator, uri, options)) {
try (Dataset datasetRead = openDataset(config, options)) {
Dataset.commit(
allocator,
config.getDatasetUri(),
Expand All @@ -145,9 +138,8 @@ public static void updateFragments(
List<FragmentMetadata> updatedFragments,
List<FragmentMetadata> newFragments) {

String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
Update update =
Update.builder()
.removedFragmentIds(removedFragmentIds)
Expand All @@ -161,9 +153,8 @@ public static void updateFragments(

public static void mergeFragments(
LanceConfig config, List<FragmentMetadata> fragments, Schema schema) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
dataset
.newTransactionBuilder()
.operation(Merge.builder().fragments(fragments).schema(schema).build())
Expand All @@ -174,19 +165,17 @@ public static void mergeFragments(

public static FragmentMergeResult mergeFragmentColumn(
LanceConfig config, int fragmentId, ArrowArrayStream stream, String leftOn, String rightOn) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
Fragment fragment = dataset.getFragment(fragmentId);
return fragment.mergeColumns(stream, leftOn, rightOn);
}
}

public static FragmentMetadata deleteRows(
LanceConfig config, int fragmentId, List<Integer> rowIndexes) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
try (Dataset dataset = openDataset(config, options)) {
return dataset.getFragment(fragmentId).deleteRows(rowIndexes);
}
}
Expand Down Expand Up @@ -221,4 +210,30 @@ public static void dropDataset(LanceConfig config) {
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
Dataset.drop(uri, options.getStorageOptions());
}

/**
 * Opens a dataset, choosing the access path from the config.
 *
 * <p>A config carrying both a namespace and a table ID is opened through the
 * namespace-aware builder, which resolves the table location (and storage
 * options) itself. Any other config falls back to opening by dataset URI.
 *
 * @param config Lance configuration
 * @param options Read options (without storage options when using namespace)
 * @return Opened dataset
 */
private static Dataset openDataset(LanceConfig config, ReadOptions options) {
  boolean namespaceBacked =
      config.getNamespace().isPresent() && config.getTableId().isPresent();
  if (!namespaceBacked) {
    // Traditional URI-based opening.
    return Dataset.open(allocator, config.getDatasetUri(), options);
  }
  // Namespace-backed: let the builder resolve location/storage from the namespace.
  return Dataset.open()
      .allocator(allocator)
      .namespace(config.getNamespace().get())
      .tableId(config.getTableId().get())
      .readOptions(options)
      .build();
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

<properties>
<lance-spark.version>0.0.15</lance-spark.version>
<lance.version>0.38.3</lance.version>
<lance.version>0.39.0</lance.version>
<lance-namespace.version>0.0.19</lance-namespace.version>

<antlr4.version>4.9.3</antlr4.version>
Expand Down
Loading