diff --git a/src/main/scala/MilvusOption.scala b/src/main/scala/MilvusOption.scala index 33039f6..ff7012e 100644 --- a/src/main/scala/MilvusOption.scala +++ b/src/main/scala/MilvusOption.scala @@ -42,8 +42,94 @@ case class MilvusOption( fieldIDs: String = "", extraColumns: Seq[String] = Seq.empty, options: Map[String, String] = Map.empty, - vectorSearchConfig: Option[VectorSearchConfig] = None -) + vectorSearchConfig: Option[VectorSearchConfig] = None, + // S3/FileSystem configuration (merged from MilvusS3Option) + readerType: String = "", + s3FileSystemType: String = "", + s3BucketName: String = "", + s3RootPath: String = "", + s3Endpoint: String = "", + s3AccessKey: String = "", + s3SecretKey: String = "", + s3UseSSL: Boolean = false, + s3PathStyleAccess: Boolean = true, + s3MaxConnections: Int = 32, + s3PreloadPoolSize: Int = 4 +) extends Serializable { + + def notEmpty(str: String): Boolean = str != null && str.trim.nonEmpty + + def getConf(): Configuration = { + val conf = new Configuration() + if (notEmpty(s3FileSystemType)) { + // Basic S3 configuration + conf.set("fs.s3a.endpoint", s3Endpoint) + conf.set("fs.s3a.path.style.access", s3PathStyleAccess.toString) + conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + conf.set( + "fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain" + ) + conf.set("fs.s3a.access.key", s3AccessKey) + conf.set("fs.s3a.secret.key", s3SecretKey) + conf.set("fs.s3a.connection.ssl.enabled", s3UseSSL.toString) + + // Performance optimization settings + conf.set("fs.s3a.block.size", "134217728") // 128MB + conf.set("fs.s3a.threads.max", s3MaxConnections.toString) + conf.set("fs.s3a.threads.core", (s3MaxConnections / 2).toString) + conf.set("fs.s3a.connection.maximum", (s3MaxConnections + 32).toString) + conf.set("fs.s3a.connection.timeout", "30000") + conf.set("fs.s3a.socket.timeout", "30000") + conf.set("fs.s3a.retry.limit", "3") + } + conf + } + + def getFileSystem(path: Path): FileSystem = { + if (notEmpty(s3FileSystemType)) { + val conf = getConf() + val fileSystem = new S3AFileSystem() + try { + fileSystem.initialize( + new URI( + s"s3a://${s3BucketName}/" + ), + conf + ) + fileSystem + } catch { + case e: Exception => + // Close the filesystem if initialization failed + try { + fileSystem.close() + } catch { + case _: Exception => // Ignore close errors + } + throw new RuntimeException( + s"Failed to initialize S3 FileSystem for bucket $s3BucketName: ${e.getMessage}", + e + ) + } + } else { + val conf = getConf() + path.getFileSystem(conf) + } + } + + def getFilePath(path: String): Path = { + if (notEmpty(s3FileSystemType)) { + if (path.startsWith("s3a://")) { + new Path(path) + } else { + val finalPath = s"s3a://${s3BucketName}/${s3RootPath}/${path}" + new Path(new URI(finalPath)) + } + } else { + new Path(path) + } + } +} object MilvusOption { // Constants for map keys @@ -80,35 +166,37 @@ object MilvusOption { val VectorSearchVectorColumn = "vector.search.column" val VectorSearchIdColumn = "vector.search.idColumn" - // s3 config (legacy, for V1 binlog) - val S3FileSystemTypeName = Constants.S3FileSystemTypeName - val S3Endpoint = Constants.S3Endpoint - val S3BucketName = Constants.S3BucketName - val S3RootPath = Constants.S3RootPath - val S3AccessKey = Constants.S3AccessKey - val S3SecretKey = Constants.S3SecretKey - val S3UseSSL = Constants.S3UseSSL - val S3PathStyleAccess = Constants.S3PathStyleAccess + // S3 configuration (for V1 binlog via Hadoop S3A FileSystem) + val S3FileSystemTypeName = "s3.fs" // default: s3a:// + val S3Endpoint = "s3.endpoint" + val S3BucketName = "s3.bucket" + val S3RootPath = "s3.rootPath" + val S3AccessKey = "s3.user" + val S3SecretKey = "s3.password" + val S3UseSSL = "s3.useSSL" + val S3PathStyleAccess = "s3.pathStyleAccess" + val S3MaxConnections = "s3.maxConnections" + val S3PreloadPoolSize = "s3.preloadPoolSize" - // FFI (Storage V2) filesystem property keys - val FsAddress = Constants.FsAddress - val FsBucketName = Constants.FsBucketName - val FsAccessKeyId = Constants.FsAccessKeyId - val FsAccessKeyValue = Constants.FsAccessKeyValue - val FsRootPath = Constants.FsRootPath - val FsStorageType = Constants.FsStorageType - val FsCloudProvider = Constants.FsCloudProvider - val FsIamEndpoint = Constants.FsIamEndpoint - val FsLogLevel = Constants.FsLogLevel - val FsRegion = Constants.FsRegion - val FsUseSSL = Constants.FsUseSSL - val FsSslCaCert = Constants.FsSslCaCert - val FsUseIam = Constants.FsUseIam - val FsUseVirtualHost = Constants.FsUseVirtualHost - val FsRequestTimeoutMs = Constants.FsRequestTimeoutMs - val FsGcpNativeWithoutAuth = Constants.FsGcpNativeWithoutAuth - val FsGcpCredentialJson = Constants.FsGcpCredentialJson - val FsUseCustomPartUpload = Constants.FsUseCustomPartUpload + // FFI filesystem configuration (for Storage V2 via native milvus-storage library) + val FsAddress = "fs.address" + val FsBucketName = "fs.bucket_name" + val FsAccessKeyId = "fs.access_key_id" + val FsAccessKeyValue = "fs.access_key_value" + val FsRootPath = "fs.root_path" + val FsStorageType = "fs.storage_type" + val FsCloudProvider = "fs.cloud_provider" + val FsIamEndpoint = "fs.iam_endpoint" + val FsLogLevel = "fs.log_level" + val FsRegion = "fs.region" + val FsUseSSL = "fs.use_ssl" + val FsSslCaCert = "fs.ssl_ca_cert" + val FsUseIam = "fs.use_iam" + val FsUseVirtualHost = "fs.use_virtual_host" + val FsRequestTimeoutMs = "fs.request_timeout_ms" + val FsGcpNativeWithoutAuth = "fs.gcp_native_without_auth" + val FsGcpCredentialJson = "fs.gcp_credential_json" + val FsUseCustomPartUpload = "fs.use_custom_part_upload" // Create MilvusOption from a map def apply(options: CaseInsensitiveStringMap): MilvusOption = { @@ -147,6 +235,19 @@ object MilvusOption { // Parse vector search configuration val vectorSearchConfig = parseVectorSearchConfig(options) + // Parse S3/FileSystem configuration (merged from MilvusS3Option) + val readerType = options.getOrDefault(ReaderType, "") + val s3FileSystemType = options.getOrDefault(S3FileSystemTypeName, "") + val s3BucketName = options.getOrDefault(S3BucketName, "a-bucket") + val s3RootPath = options.getOrDefault(S3RootPath, "files") + val s3Endpoint = options.getOrDefault(S3Endpoint, "localhost:9000") + val s3AccessKey = options.getOrDefault(S3AccessKey, "minioadmin") + val s3SecretKey = options.getOrDefault(S3SecretKey, "minioadmin") + val s3UseSSL = options.getOrDefault(S3UseSSL, "false").toBoolean + val s3PathStyleAccess = options.getOrDefault(S3PathStyleAccess, "true").toBoolean + val s3MaxConnections = options.getOrDefault(S3MaxConnections, "32").toInt + val s3PreloadPoolSize = options.getOrDefault(S3PreloadPoolSize, "4").toInt + MilvusOption( uri, token, @@ -168,7 +269,18 @@ object MilvusOption { fieldIDs, extraColumns, optionsMap, - vectorSearchConfig + vectorSearchConfig, + readerType, + s3FileSystemType, + s3BucketName, + s3RootPath, + s3Endpoint, + s3AccessKey, + s3SecretKey, + s3UseSSL, + s3PathStyleAccess, + s3MaxConnections, + s3PreloadPoolSize ) } @@ -231,109 +343,3 @@ object MilvusOption { } } -case class MilvusS3Option( - readerType: String, - s3FileSystemType: String, - s3BucketName: String, - s3RootPath: String, - s3Endpoint: String, - s3AccessKey: String, - s3SecretKey: String, - s3UseSSL: Boolean, - s3PathStyleAccess: Boolean, - milvusPKType: String, - s3MaxConnections: Int, - s3PreloadPoolSize: Int -) extends Serializable { - def notEmpty(str: String): Boolean = str != null && str.trim.nonEmpty - - def getConf(): Configuration = { - val conf = new Configuration() - if (notEmpty(s3FileSystemType)) { - // Basic S3 configuration - conf.set("fs.s3a.endpoint", s3Endpoint) - conf.set("fs.s3a.path.style.access", s3PathStyleAccess.toString) - conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") - conf.set( - "fs.s3a.aws.credentials.provider", - "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain" - ) - conf.set("fs.s3a.access.key", s3AccessKey) - conf.set("fs.s3a.secret.key", s3SecretKey) - conf.set("fs.s3a.connection.ssl.enabled", s3UseSSL.toString) - - // Performance optimization settings - conf.set("fs.s3a.block.size", "134217728") // 128MB - conf.set("fs.s3a.threads.max", s3MaxConnections.toString) - conf.set("fs.s3a.threads.core", (s3MaxConnections / 2).toString) - conf.set("fs.s3a.connection.maximum", (s3MaxConnections + 32).toString) - conf.set("fs.s3a.connection.timeout", "30000") - conf.set("fs.s3a.socket.timeout", "30000") - conf.set("fs.s3a.retry.limit", "3") - } - conf - } - - def getFileSystem(path: Path): FileSystem = { - if (notEmpty(s3FileSystemType)) { - val conf = getConf() - val fileSystem = new S3AFileSystem() - try { - fileSystem.initialize( - new URI( - s"s3a://${s3BucketName}/" - ), - conf - ) - fileSystem - } catch { - case e: Exception => - // Close the filesystem if initialization failed - try { - fileSystem.close() - } catch { - case _: Exception => // Ignore close errors - } - throw new RuntimeException( - s"Failed to initialize S3 FileSystem for bucket $s3BucketName: ${e.getMessage}", - e - ) - } - } else { - val conf = getConf() - path.getFileSystem(conf) - } - } - - def getFilePath(path: String): Path = { - if (notEmpty(s3FileSystemType)) { - if (path.startsWith("s3a://")) { - new Path(path) - } else { - val finalPath = s"s3a://${s3BucketName}/${s3RootPath}/${path}" - new Path(new URI(finalPath)) - } - } else { - new Path(path) - } - } -} - -object MilvusS3Option { - def apply(options: CaseInsensitiveStringMap): MilvusS3Option = { - new MilvusS3Option( - options.get(Constants.LogReaderTypeParamName), - options.get(Constants.S3FileSystemTypeName), - options.getOrDefault(Constants.S3BucketName, "a-bucket"), - options.getOrDefault(Constants.S3RootPath, "files"), - options.getOrDefault(Constants.S3Endpoint, "localhost:9000"), - options.getOrDefault(Constants.S3AccessKey, "minioadmin"), - options.getOrDefault(Constants.S3SecretKey, "minioadmin"), - options.getOrDefault(Constants.S3UseSSL, "false").toBoolean, - options.getOrDefault(Constants.S3PathStyleAccess, "true").toBoolean, - options.getOrDefault(MilvusOption.MilvusCollectionPKType, ""), - options.getOrDefault(Constants.S3MaxConnections, "32").toInt, - options.getOrDefault(Constants.S3PreloadPoolSize, "4").toInt - ) - } -} diff --git a/src/main/scala/binlog/LogObject.scala b/src/main/scala/binlog/LogObject.scala index ac4a5ff..e3c8e2e 100644 --- a/src/main/scala/binlog/LogObject.scala +++ b/src/main/scala/binlog/LogObject.scala @@ -40,38 +40,6 @@ object Constants { val LogReaderTypeDelete = "delete" val LogReaderFieldIDs = "reader.field.ids" - // s3 config - val S3FileSystemTypeName = "s3.fs" // default: s3a:// - val S3Endpoint = "s3.endpoint" - val S3BucketName = "s3.bucket" - val S3RootPath = "s3.rootPath" - val S3AccessKey = "s3.user" - val S3SecretKey = "s3.password" - val S3UseSSL = "s3.useSSL" - val S3PathStyleAccess = "s3.pathStyleAccess" - val S3MaxConnections = "s3.maxConnections" - val S3PreloadPoolSize = "s3.preloadPoolSize" - - // FFI (Storage V2) filesystem property keys - val FsAddress = "fs.address" - val FsBucketName = "fs.bucket_name" - val FsAccessKeyId = "fs.access_key_id" - val FsAccessKeyValue = "fs.access_key_value" - val FsRootPath = "fs.root_path" - val FsStorageType = "fs.storage_type" - val FsCloudProvider = "fs.cloud_provider" - val FsIamEndpoint = "fs.iam_endpoint" - val FsLogLevel = "fs.log_level" - val FsRegion = "fs.region" - val FsUseSSL = "fs.use_ssl" - val FsSslCaCert = "fs.ssl_ca_cert" - val FsUseIam = "fs.use_iam" - val FsUseVirtualHost = "fs.use_virtual_host" - val FsRequestTimeoutMs = "fs.request_timeout_ms" - val FsGcpNativeWithoutAuth = "fs.gcp_native_without_auth" - val FsGcpCredentialJson = "fs.gcp_credential_json" - val FsUseCustomPartUpload = "fs.use_custom_part_upload" - val TimestampFieldID = "1" def readMagicNumber(buffer: ByteBuffer) = { diff --git a/src/main/scala/binlog/MilvusBinlogDataSource.scala b/src/main/scala/binlog/MilvusBinlogDataSource.scala index 57bba4c..88a2db5 100644 --- a/src/main/scala/binlog/MilvusBinlogDataSource.scala +++ b/src/main/scala/binlog/MilvusBinlogDataSource.scala @@ -38,8 +38,7 @@ import org.apache.spark.unsafe.types.UTF8String import com.zilliz.spark.connector.{ MilvusClient, MilvusCollectionInfo, - MilvusOption, - MilvusS3Option + MilvusOption } import io.milvus.grpc.schema.DataType @@ -279,7 +278,6 @@ class MilvusBinlogScan( with Batch with Logging { private val milvusOption = MilvusOption(options) - private val readerOptions = MilvusS3Option(options) private val pathOption: String = getPathOption() if (pathOption == null) { throw new IllegalArgumentException( @@ -288,7 +286,7 @@ class MilvusBinlogScan( } def getPathOption(): String = { - if (!readerOptions.notEmpty(readerOptions.s3FileSystemType)) { + if (!milvusOption.notEmpty(milvusOption.s3FileSystemType)) { return options.get(MilvusOption.ReaderPath) } val collection = milvusOption.collectionID @@ -299,14 +297,14 @@ class MilvusBinlogScan( return options.get(MilvusOption.ReaderPath) } if ( - readerOptions.readerType == Constants.LogReaderTypeInsert && field.isEmpty + milvusOption.readerType == Constants.LogReaderTypeInsert && field.isEmpty ) { throw new IllegalArgumentException( "Option 'field' is required for insert log." ) } val firstPath = - if (readerOptions.readerType == Constants.LogReaderTypeInsert) { + if (milvusOption.readerType == Constants.LogReaderTypeInsert) { "insert_log" } else { "delta_log" @@ -317,7 +315,7 @@ class MilvusBinlogScan( if (segment.isEmpty) { return s"${firstPath}/${collection}/${partition}" } - if (readerOptions.readerType == Constants.LogReaderTypeInsert) { + if (milvusOption.readerType == Constants.LogReaderTypeInsert) { return s"${firstPath}/${collection}/${partition}/${segment}/${field}" } return s"${firstPath}/${collection}/${partition}/${segment}" @@ -346,7 +344,7 @@ class MilvusBinlogScan( try { val field = options.getOrDefault("field", "") - if (readerOptions.readerType == Constants.LogReaderTypeInsert) { + if (milvusOption.readerType == Constants.LogReaderTypeInsert) { fs.listStatus(segmentPath) .filter(_.getPath.getName == field) .filter(_.isDirectory()) @@ -412,17 +410,17 @@ class MilvusBinlogScan( } override def planInputPartitions(): Array[InputPartition] = { - var path = readerOptions.getFilePath(pathOption) + var path = milvusOption.getFilePath(pathOption) var fileStatuses = new SHashMap[String, Seq[FileStatus]]() - val fs = readerOptions.getFileSystem(path) + val fs = milvusOption.getFileSystem(path) val collection = milvusOption.collectionID val partition = milvusOption.partitionID val segment = milvusOption.segmentID val field = milvusOption.fieldID if ( - readerOptions.notEmpty( - readerOptions.s3FileSystemType + milvusOption.notEmpty( + milvusOption.s3FileSystemType ) && !collection.isEmpty ) { val client = MilvusClient(milvusOption) @@ -510,7 +508,7 @@ class MilvusBinlogPartitionReaderFactory( pushedFilters: Array[Filter] ) extends PartitionReaderFactory { - private val readerOptions = MilvusS3Option(options) + private val milvusOption = MilvusOption(options) override def createReader( partition: InputPartition @@ -519,7 +517,7 @@ class MilvusBinlogPartitionReaderFactory( new MilvusBinlogPartitionReader( schema, filePaths, - readerOptions, + milvusOption, pushedFilters ) } @@ -529,7 +527,7 @@ class MilvusBinlogPartitionReaderFactory( class MilvusBinlogPartitionReader( schema: StructType, filePaths: Array[String], - options: MilvusS3Option, + options: MilvusOption, pushedFilters: Array[Filter] ) extends PartitionReader[InternalRow] with Logging { @@ -606,7 +604,7 @@ class MilvusBinlogPartitionReader( schema.fields.map(field => { field.name match { case "data" => { - if (isDelete && MilvusOption.isInt64PK(options.milvusPKType)) { + if (isDelete && MilvusOption.isInt64PK(options.collectionPKType)) { values.append(data.toLong) } else { values.append(UTF8String.fromString(data)) diff --git a/src/main/scala/read/MilvusPartitionReader.scala b/src/main/scala/read/MilvusPartitionReader.scala index f47db60..2c64d99 100644 --- a/src/main/scala/read/MilvusPartitionReader.scala +++ b/src/main/scala/read/MilvusPartitionReader.scala @@ -28,7 +28,7 @@ import com.zilliz.spark.connector.binlog.{ InsertEventData, LogReader } -import com.zilliz.spark.connector.MilvusS3Option +import com.zilliz.spark.connector.MilvusOption import io.milvus.grpc.schema.{DataType => MilvusDataType} // for Milvus 2.5 and below version data source @@ -36,7 +36,7 @@ class MilvusPartitionReader( schema: StructType, fieldFilesSeq: Seq[Map[String, String]], partition: String, - options: MilvusS3Option, + options: MilvusOption, pushedFilters: Array[Filter] = Array.empty[Filter] ) extends PartitionReader[InternalRow] with Logging { @@ -85,7 +85,7 @@ class MilvusPartitionReader( private class MilvusBinlogFieldFileReader( filePath: String, - options: MilvusS3Option + options: MilvusOption ) extends FieldFileReader with Logging { diff --git a/src/main/scala/read/MilvusPartitionReaderFactory.scala b/src/main/scala/read/MilvusPartitionReaderFactory.scala index 0292706..f9a284a 100644 --- a/src/main/scala/read/MilvusPartitionReaderFactory.scala +++ b/src/main/scala/read/MilvusPartitionReaderFactory.scala @@ -7,7 +7,7 @@ import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -import com.zilliz.spark.connector.MilvusS3Option +import com.zilliz.spark.connector.MilvusOption import io.milvus.grpc.schema.CollectionSchema // Unified PartitionReaderFactory that dispatches to V1 or V2 readers based on partition type @@ -18,12 +18,12 @@ class MilvusPartitionReaderFactory( ) extends PartitionReaderFactory with Logging { // Reconstruct CaseInsensitiveStringMap for V1 reader - @transient private lazy val readerOptions = { + @transient private lazy val milvusOption = { import scala.jdk.CollectionConverters._ import java.util.HashMap val javaMap = new HashMap[String, String]() optionsMap.foreach { case (k, v) => javaMap.put(k, v) } - MilvusS3Option(new CaseInsensitiveStringMap(javaMap)) + MilvusOption(new CaseInsensitiveStringMap(javaMap)) } override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { @@ -35,7 +35,7 @@ class MilvusPartitionReaderFactory( schema, milvusPartition.fieldFiles, milvusPartition.partition, - readerOptions, + milvusOption, pushedFilters ) diff --git a/src/main/scala/sources/MilvusDataSource.scala b/src/main/scala/sources/MilvusDataSource.scala index 90e6c64..8a995b0 100644 --- a/src/main/scala/sources/MilvusDataSource.scala +++ b/src/main/scala/sources/MilvusDataSource.scala @@ -41,7 +41,6 @@ import com.zilliz.spark.connector.{ MilvusClient, MilvusCollectionInfo, MilvusOption, - MilvusS3Option, VectorSearchConfig } import com.zilliz.spark.connector.read.{ @@ -422,7 +421,6 @@ class MilvusScan( with Batch with Logging { private val milvusOption = MilvusOption(options) - private val readerOption = MilvusS3Option(options) private val pathOption: String = getPathOption() if (pathOption == null) { throw new IllegalArgumentException( @@ -628,8 +626,8 @@ class MilvusScan( override def toBatch: Batch = this override def planInputPartitions(): Array[InputPartition] = { - val rootPath = readerOption.getFilePath(pathOption) - val fs = readerOption.getFileSystem(rootPath) + val rootPath = milvusOption.getFilePath(pathOption) + val fs = milvusOption.getFileSystem(rootPath) // segment path val rawPath = options.getOrDefault(MilvusOption.ReaderPath, "") diff --git a/src/main/scala/sources/MilvusSparkNativeImportWriter.scala b/src/main/scala/sources/MilvusSparkNativeImportWriter.scala index 2103f0f..89876c6 100644 --- a/src/main/scala/sources/MilvusSparkNativeImportWriter.scala +++ b/src/main/scala/sources/MilvusSparkNativeImportWriter.scala @@ -9,7 +9,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.util.CaseInsensitiveStringMap -import com.zilliz.spark.connector.{MilvusClient, MilvusOption, MilvusS3Option} +import com.zilliz.spark.connector.{MilvusClient, MilvusOption} class MilvusSparkNativeImportWriter( uri: String, @@ -30,10 +30,6 @@ class MilvusSparkNativeImportWriter( new CaseInsensitiveStringMap(optionsMap.asJava) ) - private val s3Option = MilvusS3Option( - new CaseInsensitiveStringMap(optionsMap.asJava) - ) - def write(df: DataFrame): Try[Seq[Long]] = { val spark = df.sparkSession @@ -61,7 +57,7 @@ class MilvusSparkNativeImportWriter( private def writeToS3WithSpark(df: DataFrame, s3Path: String): Try[Unit] = { try { val fullS3Path = - s"s3a://${s3Option.s3BucketName}/${s3Option.s3RootPath}/${s3Path}" + s"s3a://${milvusOption.s3BucketName}/${milvusOption.s3RootPath}/${s3Path}" val spark = df.sparkSession configureSparkS3Settings(spark) @@ -84,11 +80,11 @@ class MilvusSparkNativeImportWriter( val conf = spark.sparkContext.hadoopConfiguration // Basic S3 connection settings - conf.set("fs.s3a.endpoint", s3Option.s3Endpoint) - conf.set("fs.s3a.access.key", s3Option.s3AccessKey) - conf.set("fs.s3a.secret.key", s3Option.s3SecretKey) - conf.set("fs.s3a.path.style.access", s3Option.s3PathStyleAccess.toString) - conf.set("fs.s3a.connection.ssl.enabled", s3Option.s3UseSSL.toString) + conf.set("fs.s3a.endpoint", milvusOption.s3Endpoint) + conf.set("fs.s3a.access.key", milvusOption.s3AccessKey) + conf.set("fs.s3a.secret.key", milvusOption.s3SecretKey) + conf.set("fs.s3a.path.style.access", milvusOption.s3PathStyleAccess.toString) + conf.set("fs.s3a.connection.ssl.enabled", milvusOption.s3UseSSL.toString) // Performance optimization settings // Multipart upload size: 128MB (134217728 bytes) - larger parts reduce overhead for big files @@ -113,9 +109,9 @@ class MilvusSparkNativeImportWriter( private def collectS3FilesAndImport(s3Path: String): Try[Seq[Long]] = { try { - val conf = s3Option.getConf() + val conf = milvusOption.getConf() val fullS3Path = - s"s3a://${s3Option.s3BucketName}/${s3Option.s3RootPath}/$s3Path" + s"s3a://${milvusOption.s3BucketName}/${milvusOption.s3RootPath}/$s3Path" val path = new Path(fullS3Path) val fs = path.getFileSystem(conf) @@ -146,7 +142,7 @@ class MilvusSparkNativeImportWriter( parquetFiles ++= collectParquetFiles(fs, fileStatus.getPath) } else if (fileStatus.getPath.getName.endsWith(".parquet")) { val s3Path = fileStatus.getPath.toString.replace( - s"s3a://${s3Option.s3BucketName}", + s"s3a://${milvusOption.s3BucketName}", "" ) parquetFiles += s3Path