diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkReadOptions.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkReadOptions.java
index 3cb835d42..2ef61b816 100644
--- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkReadOptions.java
+++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkReadOptions.java
@@ -19,6 +19,7 @@
 import org.lance.spark.utils.QueryUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -28,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Read-specific options for Lance Spark connector.
@@ -96,6 +98,27 @@ public class LanceSparkReadOptions implements Serializable {
 
   public static final String LANCE_FILE_SUFFIX = ".lance";
 
+  /**
+   * Option keys that {@link Builder#parseTypedFlags(Map)} promotes into dedicated typed fields on
+   * the builder. After promotion these keys are removed from {@link #getStorageOptions()} so that
+   * connector-level knobs do not leak into the Rust-side {@code storage_options} map (which is
+   * reserved for object-store credentials and endpoint config — {@code aws_*}, {@code gcs_*},
+   * {@code allow_http}, etc.). Rust silently drops unknown keys, so this stripping is a debug-
+   * hygiene fix, not a correctness fix.
+   */
+  static final Set<String> RECOGNIZED_TYPED_KEYS =
+      ImmutableSet.of(
+          CONFIG_DATASET_URI,
+          CONFIG_PUSH_DOWN_FILTERS,
+          CONFIG_BLOCK_SIZE,
+          CONFIG_VERSION,
+          CONFIG_INDEX_CACHE_SIZE,
+          CONFIG_METADATA_CACHE_SIZE,
+          CONFIG_BATCH_SIZE,
+          CONFIG_TOP_N_PUSH_DOWN,
+          CONFIG_NEAREST,
+          CONFIG_EXECUTOR_CREDENTIAL_REFRESH);
+
   private static final boolean DEFAULT_PUSH_DOWN_FILTERS = true;
   // Changed from 512 to 8192 for better OLAP scan performance (33x improvement)
   private static final int DEFAULT_BATCH_SIZE = 8192;
@@ -575,6 +598,15 @@ public LanceSparkReadOptions build() {
       if (datasetUri == null) {
         throw new IllegalArgumentException("datasetUri is required");
       }
+      // Strip recognized typed flags from storageOptions at build time, not inside
+      // parseTypedFlags. parseTypedFlags may be called more than once on the same builder
+      // (e.g. fromOptions followed by withCatalogDefaults), and later calls re-read from
+      // this.storageOptions; removing too early would cause per-read overrides to disappear
+      // when catalog defaults are merged on top. Stripping here keeps storageOptions clean
+      // for the final consumer (Rust storage_options map) without breaking merge semantics.
+      for (String key : RECOGNIZED_TYPED_KEYS) {
+        this.storageOptions.remove(key);
+      }
       return new LanceSparkReadOptions(this);
     }
   }
diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java
index 0e0f6db06..695b95c35 100644
--- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java
+++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java
@@ -18,12 +18,14 @@
 import org.lance.namespace.LanceNamespace;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Write-specific options for Lance Spark connector.
@@ -59,6 +61,31 @@ public class LanceSparkWriteOptions implements Serializable {
   public static final String CONFIG_MAX_BATCH_BYTES = "max_batch_bytes";
   public static final String CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD = "blob_pack_file_size_threshold";
 
+  /**
+   * Option keys that {@link Builder#fromOptions(Map)} promotes into dedicated typed fields on the
+   * builder. After promotion these keys are removed from {@link #getStorageOptions()} so that
+   * connector-level knobs do not leak into the Rust-side {@code storage_options} map (which is
+   * reserved for object-store credentials and endpoint config — {@code aws_*}, {@code gcs_*},
+   * {@code allow_http}, etc.). Rust silently drops unknown keys, so this stripping is a debug-
+   * hygiene fix, not a correctness fix. See {@link LanceSparkReadOptions#RECOGNIZED_TYPED_KEYS} for
+   * the read-side counterpart.
+   */
+  static final Set<String> RECOGNIZED_TYPED_KEYS =
+      ImmutableSet.of(
+          CONFIG_DATASET_URI,
+          CONFIG_WRITE_MODE,
+          CONFIG_MAX_ROWS_PER_FILE,
+          CONFIG_MAX_ROWS_PER_GROUP,
+          CONFIG_MAX_BYTES_PER_FILE,
+          CONFIG_FILE_FORMAT_VERSION,
+          CONFIG_USE_QUEUED_WRITE_BUFFER,
+          CONFIG_QUEUE_DEPTH,
+          CONFIG_BATCH_SIZE,
+          CONFIG_ENABLE_STABLE_ROW_IDS,
+          CONFIG_USE_LARGE_VAR_TYPES,
+          CONFIG_MAX_BATCH_BYTES,
+          CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD);
+
   private static final WriteMode DEFAULT_WRITE_MODE = WriteMode.APPEND;
   private static final boolean DEFAULT_USE_QUEUED_WRITE_BUFFER = false;
   private static final int DEFAULT_QUEUE_DEPTH = 8;
@@ -533,6 +560,11 @@ public LanceSparkWriteOptions build() {
       if (datasetUri == null) {
         throw new IllegalArgumentException("datasetUri is required");
       }
+      // Strip recognized typed flags from storageOptions at build time. Mirror of the read-side
+      // fix in LanceSparkReadOptions.Builder.build(); see RECOGNIZED_TYPED_KEYS for rationale.
+      for (String key : RECOGNIZED_TYPED_KEYS) {
+        this.storageOptions.remove(key);
+      }
       return new LanceSparkWriteOptions(this);
     }
   }
diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkReadOptionsTypedKeyStrippingTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkReadOptionsTypedKeyStrippingTest.java
new file mode 100644
index 000000000..803b3e343
--- /dev/null
+++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkReadOptionsTypedKeyStrippingTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.lance.spark;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Verifies that recognized typed read options (e.g. {@code batch_size}, {@code block_size}, {@code
+ * path}, {@code executor_credential_refresh}) are stripped from {@link
+ * LanceSparkReadOptions#getStorageOptions()} after parsing.
+ *
+ * <p>Motivation: {@code storageOptions} is forwarded verbatim to the native Lance storage layer (S3
+ * / OSS / GCS credential + endpoint map). Typed connector-level knobs were previously leaking into
+ * that map because {@code Builder.fromOptions} saved the entire input map as {@code storageOptions}
+ * before {@code parseTypedFlags} promoted recognized keys to their dedicated builder fields. The
+ * Rust layer silently drops unknown keys, so no functional breakage, but it muddies the native log
+ * and complicates debugging of credential / endpoint issues.
+ *
+ * <p>Pure connector-level knobs and the dataset path must not appear in the storage-options map at
+ * all. Keys the native layer consumes directly (e.g. {@code aws_region}, {@code aws_endpoint}) must
+ * still pass through untouched.
+ */
+public class LanceSparkReadOptionsTypedKeyStrippingTest {
+
+  @Test
+  public void blockSizeIsStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkReadOptions.CONFIG_BLOCK_SIZE, "1048576");
+
+    LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
+
+    assertEquals(Integer.valueOf(1048576), options.getBlockSize());
+    assertFalse(
+        options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_BLOCK_SIZE),
+        "block_size must not leak into Rust storage_options");
+  }
+
+  @Test
+  public void batchSizeIsStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "4096");
+
+    LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
+
+    assertEquals(4096, options.getBatchSize());
+    assertFalse(
+        options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_BATCH_SIZE),
+        "batch_size must not leak into Rust storage_options");
+  }
+
+  @Test
+  public void datasetUriPathKeyIsStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+
+    LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
+
+    assertFalse(
+        options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_DATASET_URI),
+        "path must not leak into Rust storage_options — it is a connector-level URL, not a "
+            + "storage credential");
+  }
+
+  @Test
+  public void pushDownFiltersIsStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkReadOptions.CONFIG_PUSH_DOWN_FILTERS, "false");
+
+    LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
+
+    assertFalse(options.isPushDownFilters());
+    assertFalse(
+        options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_PUSH_DOWN_FILTERS));
+  }
+
+  @Test
+  public void versionAndCacheSizesAreStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkReadOptions.CONFIG_VERSION, "42");
+    opts.put(LanceSparkReadOptions.CONFIG_INDEX_CACHE_SIZE, "1024");
+    opts.put(LanceSparkReadOptions.CONFIG_METADATA_CACHE_SIZE, "2048");
+
+    LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
+
+    assertEquals(Integer.valueOf(42), options.getVersion());
+    assertEquals(Integer.valueOf(1024), options.getIndexCacheSize());
+    assertEquals(Integer.valueOf(2048), options.getMetadataCacheSize());
+
+    Map<String, String> storage = options.getStorageOptions();
+    assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_VERSION));
+    assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_INDEX_CACHE_SIZE));
+    assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_METADATA_CACHE_SIZE));
+  }
+
+  @Test
+  public void executorCredentialRefreshAndTopNKeysAreStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "false");
+    opts.put(LanceSparkReadOptions.CONFIG_TOP_N_PUSH_DOWN, "false");
+
+    LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
+
+    assertFalse(options.isExecutorCredentialRefresh());
+    assertFalse(options.isTopNPushDown());
+
+    Map<String, String> storage = options.getStorageOptions();
+    assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH));
+    assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_TOP_N_PUSH_DOWN));
+  }
+
+  @Test
+  public void genuineStorageKeysPassThroughUntouched() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    // Typed connector key (must be stripped)
+    opts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "4096");
+    // Genuine Rust-side storage options (must pass through)
+    opts.put("aws_region", "us-east-1");
+    opts.put("aws_endpoint", "http://localhost:9000");
+    opts.put("aws_access_key_id", "minioadmin");
+    opts.put("aws_secret_access_key", "minioadmin");
+    opts.put("allow_http", "true");
+
+    LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
+    Map<String, String> storage = options.getStorageOptions();
+
+    assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_BATCH_SIZE));
+    assertEquals("us-east-1", storage.get("aws_region"));
+    assertEquals("http://localhost:9000", storage.get("aws_endpoint"));
+    assertEquals("minioadmin", storage.get("aws_access_key_id"));
+    assertEquals("minioadmin", storage.get("aws_secret_access_key"));
+    assertEquals("true", storage.get("allow_http"));
+  }
+
+  @Test
+  public void catalogDefaultsWithTypedKeysDoNotLeak() {
+    // Simulates spark-defaults.conf:
+    //   spark.sql.catalog.foo.batch_size=4096
+    //   spark.sql.catalog.foo.executor_credential_refresh=false
+    //   spark.sql.catalog.foo.aws_region=us-east-1
+    Map<String, String> catalogOpts = new HashMap<>();
+    catalogOpts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "4096");
+    catalogOpts.put(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "false");
+    catalogOpts.put("aws_region", "us-east-1");
+    LanceSparkCatalogConfig catalogConfig = LanceSparkCatalogConfig.from(catalogOpts);
+
+    LanceSparkReadOptions options =
+        LanceSparkReadOptions.builder()
+            .datasetUri("s3://bucket/path")
+            .withCatalogDefaults(catalogConfig)
+            .build();
+
+    assertEquals(4096, options.getBatchSize());
+    assertFalse(options.isExecutorCredentialRefresh());
+
+    Map<String, String> storage = options.getStorageOptions();
+    assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_BATCH_SIZE));
+    assertFalse(storage.containsKey(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH));
+    assertEquals("us-east-1", storage.get("aws_region"));
+  }
+
+  @Test
+  public void perReadOptionOverridesCatalogAndDoesNotLeak() {
+    Map<String, String> catalogOpts = new HashMap<>();
+    catalogOpts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "4096");
+    LanceSparkCatalogConfig catalogConfig = LanceSparkCatalogConfig.from(catalogOpts);
+
+    Map<String, String> readOpts = new HashMap<>();
+    readOpts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    readOpts.put(LanceSparkReadOptions.CONFIG_BATCH_SIZE, "8192");
+
+    LanceSparkReadOptions options =
+        LanceSparkReadOptions.builder()
+            .datasetUri("s3://bucket/path")
+            .fromOptions(readOpts)
+            .withCatalogDefaults(catalogConfig)
+            .build();
+
+    // Per-read .option() must win over catalog default.
+    assertEquals(8192, options.getBatchSize());
+    assertFalse(
+        options.getStorageOptions().containsKey(LanceSparkReadOptions.CONFIG_BATCH_SIZE),
+        "catalog + per-read merge must still strip typed keys");
+  }
+
+  @Test
+  public void noTypedKeysStillLeavesGenuineStorageOptionsIntact() {
+    // No typed read options set — only storage credentials. Ensures the strip
+    // operation is a no-op when there's nothing to strip.
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkReadOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put("aws_region", "us-east-1");
+    opts.put("aws_endpoint", "http://localhost:9000");
+
+    LanceSparkReadOptions options = LanceSparkReadOptions.from(opts);
+    Map<String, String> storage = options.getStorageOptions();
+
+    assertTrue(storage.size() >= 2, "genuine storage entries must survive");
+    assertEquals("us-east-1", storage.get("aws_region"));
+    assertEquals("http://localhost:9000", storage.get("aws_endpoint"));
+  }
+}
diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkWriteOptionsTypedKeyStrippingTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkWriteOptionsTypedKeyStrippingTest.java
new file mode 100644
index 000000000..693a56f85
--- /dev/null
+++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkWriteOptionsTypedKeyStrippingTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.lance.spark;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Mirror of {@link LanceSparkReadOptionsTypedKeyStrippingTest} for the write path. Verifies that
+ * recognized typed write options ({@code write_mode}, {@code max_row_per_file}, {@code batch_size},
+ * {@code path}, ...) are stripped from {@link LanceSparkWriteOptions#getStorageOptions()} after
+ * parsing, so connector-level knobs do not leak into the Rust-side storage options map reserved for
+ * object-store credentials and endpoint config.
+ */
+public class LanceSparkWriteOptionsTypedKeyStrippingTest {
+
+  @Test
+  public void writeModeIsStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkWriteOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkWriteOptions.CONFIG_WRITE_MODE, "APPEND");
+
+    LanceSparkWriteOptions options =
+        LanceSparkWriteOptions.builder().datasetUri("s3://bucket/path").fromOptions(opts).build();
+
+    assertFalse(
+        options.getStorageOptions().containsKey(LanceSparkWriteOptions.CONFIG_WRITE_MODE),
+        "write_mode must not leak into Rust storage_options");
+  }
+
+  @Test
+  public void maxRowsAndBytesKeysAreStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkWriteOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkWriteOptions.CONFIG_MAX_ROWS_PER_FILE, "1000000");
+    opts.put(LanceSparkWriteOptions.CONFIG_MAX_ROWS_PER_GROUP, "8192");
+    opts.put(LanceSparkWriteOptions.CONFIG_MAX_BYTES_PER_FILE, "1073741824");
+    opts.put(LanceSparkWriteOptions.CONFIG_MAX_BATCH_BYTES, "268435456");
+
+    LanceSparkWriteOptions options =
+        LanceSparkWriteOptions.builder().datasetUri("s3://bucket/path").fromOptions(opts).build();
+
+    Map<String, String> storage = options.getStorageOptions();
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_MAX_ROWS_PER_FILE));
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_MAX_ROWS_PER_GROUP));
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_MAX_BYTES_PER_FILE));
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_MAX_BATCH_BYTES));
+  }
+
+  @Test
+  public void batchSizeAndQueueDepthAreStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkWriteOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkWriteOptions.CONFIG_BATCH_SIZE, "4096");
+    opts.put(LanceSparkWriteOptions.CONFIG_QUEUE_DEPTH, "16");
+    opts.put(LanceSparkWriteOptions.CONFIG_USE_QUEUED_WRITE_BUFFER, "true");
+
+    LanceSparkWriteOptions options =
+        LanceSparkWriteOptions.builder().datasetUri("s3://bucket/path").fromOptions(opts).build();
+
+    Map<String, String> storage = options.getStorageOptions();
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_BATCH_SIZE));
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_QUEUE_DEPTH));
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_USE_QUEUED_WRITE_BUFFER));
+  }
+
+  @Test
+  public void fileFormatAndStableRowIdsKeysAreStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkWriteOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkWriteOptions.CONFIG_FILE_FORMAT_VERSION, "2.1");
+    opts.put(LanceSparkWriteOptions.CONFIG_ENABLE_STABLE_ROW_IDS, "true");
+    opts.put(LanceSparkWriteOptions.CONFIG_USE_LARGE_VAR_TYPES, "true");
+
+    LanceSparkWriteOptions options =
+        LanceSparkWriteOptions.builder().datasetUri("s3://bucket/path").fromOptions(opts).build();
+
+    Map<String, String> storage = options.getStorageOptions();
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_FILE_FORMAT_VERSION));
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_ENABLE_STABLE_ROW_IDS));
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_USE_LARGE_VAR_TYPES));
+  }
+
+  @Test
+  public void blobPackFileSizeThresholdIsStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkWriteOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    opts.put(LanceSparkWriteOptions.CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD, "1048576");
+
+    LanceSparkWriteOptions options =
+        LanceSparkWriteOptions.builder().datasetUri("s3://bucket/path").fromOptions(opts).build();
+
+    assertFalse(
+        options
+            .getStorageOptions()
+            .containsKey(LanceSparkWriteOptions.CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD));
+  }
+
+  @Test
+  public void datasetUriPathKeyIsStripped() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkWriteOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+
+    LanceSparkWriteOptions options =
+        LanceSparkWriteOptions.builder().datasetUri("s3://bucket/path").fromOptions(opts).build();
+
+    assertFalse(
+        options.getStorageOptions().containsKey(LanceSparkWriteOptions.CONFIG_DATASET_URI),
+        "path must not leak into Rust storage_options — it is a connector-level URL, not a "
+            + "storage credential");
+  }
+
+  @Test
+  public void genuineStorageKeysPassThroughUntouched() {
+    Map<String, String> opts = new HashMap<>();
+    opts.put(LanceSparkWriteOptions.CONFIG_DATASET_URI, "s3://bucket/path");
+    // Typed connector key (must be stripped)
+    opts.put(LanceSparkWriteOptions.CONFIG_BATCH_SIZE, "4096");
+    // Genuine Rust-side storage options (must pass through)
+    opts.put("aws_region", "us-east-1");
+    opts.put("aws_endpoint", "http://localhost:9000");
+    opts.put("allow_http", "true");
+
+    LanceSparkWriteOptions options =
+        LanceSparkWriteOptions.builder().datasetUri("s3://bucket/path").fromOptions(opts).build();
+    Map<String, String> storage = options.getStorageOptions();
+
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_BATCH_SIZE));
+    assertEquals("us-east-1", storage.get("aws_region"));
+    assertEquals("http://localhost:9000", storage.get("aws_endpoint"));
+    assertEquals("true", storage.get("allow_http"));
+  }
+
+  @Test
+  public void catalogDefaultsWithTypedKeysDoNotLeak() {
+    // Simulates spark-defaults.conf: spark.sql.catalog.foo.batch_size=4096
+    // spark.sql.catalog.foo.aws_region=us-east-1
+    Map<String, String> catalogOpts = new HashMap<>();
+    catalogOpts.put(LanceSparkWriteOptions.CONFIG_BATCH_SIZE, "4096");
+    catalogOpts.put("aws_region", "us-east-1");
+    LanceSparkCatalogConfig catalogConfig = LanceSparkCatalogConfig.from(catalogOpts);
+
+    LanceSparkWriteOptions options =
+        LanceSparkWriteOptions.builder()
+            .datasetUri("s3://bucket/path")
+            .withCatalogDefaults(catalogConfig)
+            .build();
+
+    Map<String, String> storage = options.getStorageOptions();
+    assertFalse(storage.containsKey(LanceSparkWriteOptions.CONFIG_BATCH_SIZE));
+    assertEquals("us-east-1", storage.get("aws_region"));
+  }
+}