Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.lance.spark.utils.QueryUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import java.io.IOException;
import java.io.ObjectInputStream;
Expand All @@ -28,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Read-specific options for Lance Spark connector.
Expand Down Expand Up @@ -96,6 +98,27 @@ public class LanceSparkReadOptions implements Serializable {

public static final String LANCE_FILE_SUFFIX = ".lance";

/**
* Option keys that {@link Builder#parseTypedFlags(Map)} promotes into dedicated typed fields on
* the builder. After promotion these keys are removed from {@link #getStorageOptions()} so that
* connector-level knobs do not leak into the Rust-side {@code storage_options} map (which is
* reserved for object-store credentials and endpoint config — {@code aws_*}, {@code gcs_*},
* {@code allow_http}, etc.). Rust silently drops unknown keys, so this stripping is a debug-
* hygiene fix, not a correctness fix.
*/
static final Set<String> RECOGNIZED_TYPED_KEYS =
ImmutableSet.of(
CONFIG_DATASET_URI,
CONFIG_PUSH_DOWN_FILTERS,
CONFIG_BLOCK_SIZE,
CONFIG_VERSION,
CONFIG_INDEX_CACHE_SIZE,
CONFIG_METADATA_CACHE_SIZE,
CONFIG_BATCH_SIZE,
CONFIG_TOP_N_PUSH_DOWN,
CONFIG_NEAREST,
CONFIG_EXECUTOR_CREDENTIAL_REFRESH);

private static final boolean DEFAULT_PUSH_DOWN_FILTERS = true;
// Changed from 512 to 8192 for better OLAP scan performance (33x improvement)
private static final int DEFAULT_BATCH_SIZE = 8192;
Expand Down Expand Up @@ -575,6 +598,15 @@ public LanceSparkReadOptions build() {
if (datasetUri == null) {
throw new IllegalArgumentException("datasetUri is required");
}
// Strip recognized typed flags from storageOptions at build time, not inside
// parseTypedFlags. parseTypedFlags may be called more than once on the same builder
// (e.g. fromOptions followed by withCatalogDefaults), and later calls re-read from
// this.storageOptions; removing too early would cause per-read overrides to disappear
// when catalog defaults are merged on top. Stripping here keeps storageOptions clean
// for the final consumer (Rust storage_options map) without breaking merge semantics.
for (String key : RECOGNIZED_TYPED_KEYS) {
this.storageOptions.remove(key);
}
return new LanceSparkReadOptions(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.lance.namespace.LanceNamespace;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Write-specific options for Lance Spark connector.
Expand Down Expand Up @@ -59,6 +61,31 @@ public class LanceSparkWriteOptions implements Serializable {
public static final String CONFIG_MAX_BATCH_BYTES = "max_batch_bytes";
public static final String CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD = "blob_pack_file_size_threshold";

/**
* Option keys that {@link Builder#fromOptions(Map)} promotes into dedicated typed fields on the
* builder. After promotion these keys are removed from {@link #getStorageOptions()} so that
* connector-level knobs do not leak into the Rust-side {@code storage_options} map (which is
* reserved for object-store credentials and endpoint config — {@code aws_*}, {@code gcs_*},
* {@code allow_http}, etc.). Rust silently drops unknown keys, so this stripping is a debug-
* hygiene fix, not a correctness fix. See {@link LanceSparkReadOptions#RECOGNIZED_TYPED_KEYS} for
* the read-side counterpart.
*/
static final Set<String> RECOGNIZED_TYPED_KEYS =
ImmutableSet.of(
CONFIG_DATASET_URI,
CONFIG_WRITE_MODE,
CONFIG_MAX_ROWS_PER_FILE,
CONFIG_MAX_ROWS_PER_GROUP,
CONFIG_MAX_BYTES_PER_FILE,
CONFIG_FILE_FORMAT_VERSION,
CONFIG_USE_QUEUED_WRITE_BUFFER,
CONFIG_QUEUE_DEPTH,
CONFIG_BATCH_SIZE,
CONFIG_ENABLE_STABLE_ROW_IDS,
CONFIG_USE_LARGE_VAR_TYPES,
CONFIG_MAX_BATCH_BYTES,
CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD);

private static final WriteMode DEFAULT_WRITE_MODE = WriteMode.APPEND;
private static final boolean DEFAULT_USE_QUEUED_WRITE_BUFFER = false;
private static final int DEFAULT_QUEUE_DEPTH = 8;
Expand Down Expand Up @@ -533,6 +560,11 @@ public LanceSparkWriteOptions build() {
if (datasetUri == null) {
throw new IllegalArgumentException("datasetUri is required");
}
// Strip recognized typed flags from storageOptions at build time. Mirror of the read-side
// fix in LanceSparkReadOptions.Builder.build(); see RECOGNIZED_TYPED_KEYS for rationale.
for (String key : RECOGNIZED_TYPED_KEYS) {
this.storageOptions.remove(key);
}
return new LanceSparkWriteOptions(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Verifies that recognized typed read options (e.g. {@code batch_size}, {@code block_size}, {@code
* path}, {@code executor_credential_refresh}) are stripped from {@link
* LanceSparkReadOptions#getStorageOptions()} after parsing.
*
* <p>Motivation: {@code storageOptions} is forwarded verbatim to the native Lance storage layer (S3
* / OSS / GCS credential + endpoint map). Typed connector-level knobs were previously leaking into
* that map because {@code Builder.fromOptions} saved the entire input map as {@code storageOptions}
* before {@code parseTypedFlags} promoted recognized keys to their dedicated builder fields. The
* Rust layer silently drops unknown keys, so no functional breakage, but it muddies the native log
* and complicates debugging of credential / endpoint issues.
*
* <p>Pure connector-level knobs and the dataset path must not appear in the storage-options map at
* all. Keys the native layer consumes directly (e.g. {@code aws_region}, {@code aws_endpoint}) must
* still pass through untouched.
*/
public class LanceSparkReadOptionsTypedKeyStrippingTest {

@Test
public void blockSizeIsStripped() {
Map<String, String> opts = new HashMap<>();
opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
opts.put(LanceSparkReadOptions.CONFIG_BLOCK_SIZE, "1048576");

LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);

assertEquals(Integer.valueOf(1048576), options.getBlockSize());
assertFalse(
options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_BLOCK_SIZE),
"block_size must not leak into Rust storage_options");
}

@Test
public void batchSizeIsStripped() {
Map<String, String> opts = new HashMap<>();
opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
opts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "4096");

LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);

assertEquals(4096, options.getBatchSize());
assertFalse(
options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_BATCH_SIZE),
"batch_size must not leak into Rust storage_options");
}

@Test
public void datasetUriPathKeyIsStripped() {
Map<String, String> opts = new HashMap<>();
opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");

LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);

assertFalse(
options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_DATASET_URI),
"path must not leak into Rust storage_options — it is a connector-level URL, not a "
+ "storage credential");
}

@Test
public void pushDownFiltersIsStripped() {
Map<String, String> opts = new HashMap<>();
opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
opts.put(LanceSparkReadOptions.CONFIG_PUSH_DOWN_FILTERS, "false");

LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);

assertFalse(options.isPushDownFilters());
assertFalse(
options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_PUSH_DOWN_FILTERS));
}

@Test
public void versionAndCacheSizesAreStripped() {
Map<String, String> opts = new HashMap<>();
opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
opts.put(LanceSparkReadOptions.CONFIG_VERSION, "42");
opts.put(LanceSparkReadOptions.CONFIG_INDEX_CACHE_SIZE, "1024");
opts.put(LanceSparkReadOptions.CONFIG_METADATA_CACHE_SIZE, "2048");

LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);

assertEquals(Integer.valueOf(42), options.getVersion());
assertEquals(Integer.valueOf(1024), options.getIndexCacheSize());
assertEquals(Integer.valueOf(2048), options.getMetadataCacheSize());

Map<String, String> storage = options.getStorageOptions();
assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_VERSION));
assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_INDEX_CACHE_SIZE));
assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_METADATA_CACHE_SIZE));
}

@Test
public void executorCredentialRefreshAndTopNKeysAreStripped() {
Map<String, String> opts = new HashMap<>();
opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
opts.put(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "false");
opts.put(LanceSparkReadOptions.CONFIG_TOP_N_PUSH_DOWN, "false");

LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);

assertFalse(options.isExecutorCredentialRefresh());
assertFalse(options.isTopNPushDown());

Map<String, String> storage = options.getStorageOptions();
assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH));
assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_TOP_N_PUSH_DOWN));
}

@Test
public void genuineStorageKeysPassThroughUntouched() {
Map<String, String> opts = new HashMap<>();
opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
// Typed connector key (must be stripped)
opts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "4096");
// Genuine Rust-side storage options (must pass through)
opts.put("aws_region", "us-east-1");
opts.put("aws_endpoint", "http://localhost:9000");
opts.put("aws_access_key_id", "minioadmin");
opts.put("aws_secret_access_key", "minioadmin");
opts.put("allow_http", "true");

LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
Map<String, String> storage = options.getStorageOptions();

assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_BATCH_SIZE));
assertEquals("us-east-1", storage.get("aws_region"));
assertEquals("http://localhost:9000", storage.get("aws_endpoint"));
assertEquals("minioadmin", storage.get("aws_access_key_id"));
assertEquals("minioadmin", storage.get("aws_secret_access_key"));
assertEquals("true", storage.get("allow_http"));
}

@Test
public void catalogDefaultsWithTypedKeysDoNotLeak() {
// Simulates spark-defaults.conf:
// spark.sql.catalog.foo.batch_size=4096
// spark.sql.catalog.foo.executor_credential_refresh=false
// spark.sql.catalog.foo.aws_region=us-east-1
Map<String, String> catalogOpts = new HashMap<>();
catalogOpts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "4096");
catalogOpts.put(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "false");
catalogOpts.put("aws_region", "us-east-1");
LanceSparkCatalogConfig catalogConfig = LanceSparkCatalogConfig.from(catalogOpts);

LanceSparkReadOptions options =
LanceSparkReadOptions.builder()
.datasetUri("s3://bucket/path")
.withCatalogDefaults(catalogConfig)
.build();

assertEquals(4096, options.getBatchSize());
assertFalse(options.isExecutorCredentialRefresh());

Map<String, String> storage = options.getStorageOptions();
assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_BATCH_SIZE));
assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH));
assertEquals("us-east-1", storage.get("aws_region"));
}

@Test
public void perReadOptionOverridesCatalogAndDoesNotLeak() {
Map<String, String> catalogOpts = new HashMap<>();
catalogOpts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "4096");
LanceSparkCatalogConfig catalogConfig = LanceSparkCatalogConfig.from(catalogOpts);

Map<String, String> readOpts = new HashMap<>();
readOpts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
readOpts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "8192");

LanceSparkReadOptions options =
LanceSparkReadOptions.builder()
.datasetUri("s3://bucket/path")
.fromOptions(readOpts)
.withCatalogDefaults(catalogConfig)
.build();

// Per-read .option() must win over catalog default.
assertEquals(8192, options.getBatchSize());
assertFalse(
options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_BATCH_SIZE),
"catalog + per-read merge must still strip typed keys");
}

@Test
public void noTypedKeysStillLeavesGenuineStorageOptionsIntact() {
// No typed read options set — only storage credentials. Ensures the strip
// operation is a no-op when there's nothing to strip.
Map<String, String> opts = new HashMap<>();
opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
opts.put("aws_region", "us-east-1");
opts.put("aws_endpoint", "http://localhost:9000");

LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
Map<String, String> storage = options.getStorageOptions();

assertTrue(storage.size() >= 2, "genuine storage entries must survive");
assertEquals("us-east-1", storage.get("aws_region"));
assertEquals("http://localhost:9000", storage.get("aws_endpoint"));
}
}
Loading
Loading