Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 143 additions & 137 deletions src/main/scala/MilvusOption.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,94 @@ case class MilvusOption(
fieldIDs: String = "",
extraColumns: Seq[String] = Seq.empty,
options: Map[String, String] = Map.empty,
vectorSearchConfig: Option[VectorSearchConfig] = None
)
vectorSearchConfig: Option[VectorSearchConfig] = None,
// S3/FileSystem configuration (merged from MilvusS3Option)
readerType: String = "",
s3FileSystemType: String = "",
s3BucketName: String = "",
s3RootPath: String = "",
s3Endpoint: String = "",
s3AccessKey: String = "",
s3SecretKey: String = "",
s3UseSSL: Boolean = false,
s3PathStyleAccess: Boolean = true,
s3MaxConnections: Int = 32,
s3PreloadPoolSize: Int = 4
) extends Serializable {

def notEmpty(str: String): Boolean = str != null && str.trim.nonEmpty

def getConf(): Configuration = {
val conf = new Configuration()
if (notEmpty(s3FileSystemType)) {
// Basic S3 configuration
conf.set("fs.s3a.endpoint", s3Endpoint)
conf.set("fs.s3a.path.style.access", s3PathStyleAccess.toString)
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set(
"fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
)
conf.set("fs.s3a.access.key", s3AccessKey)
conf.set("fs.s3a.secret.key", s3SecretKey)
conf.set("fs.s3a.connection.ssl.enabled", s3UseSSL.toString)

// Performance optimization settings
conf.set("fs.s3a.block.size", "134217728") // 128MB
conf.set("fs.s3a.threads.max", s3MaxConnections.toString)
conf.set("fs.s3a.threads.core", (s3MaxConnections / 2).toString)
conf.set("fs.s3a.connection.maximum", (s3MaxConnections + 32).toString)
conf.set("fs.s3a.connection.timeout", "30000")
conf.set("fs.s3a.socket.timeout", "30000")
conf.set("fs.s3a.retry.limit", "3")
}
conf
}

def getFileSystem(path: Path): FileSystem = {
if (notEmpty(s3FileSystemType)) {
val conf = getConf()
val fileSystem = new S3AFileSystem()
try {
fileSystem.initialize(
new URI(
s"s3a://${s3BucketName}/"
),
conf
)
fileSystem
} catch {
case e: Exception =>
// Close the filesystem if initialization failed
try {
fileSystem.close()
} catch {
case _: Exception => // Ignore close errors
}
throw new RuntimeException(
s"Failed to initialize S3 FileSystem for bucket $s3BucketName: ${e.getMessage}",
e
)
}
} else {
val conf = getConf()
path.getFileSystem(conf)
}
}

def getFilePath(path: String): Path = {
if (notEmpty(s3FileSystemType)) {
if (path.startsWith("s3a://")) {
new Path(path)
} else {
val finalPath = s"s3a://${s3BucketName}/${s3RootPath}/${path}"
new Path(new URI(finalPath))
}
} else {
new Path(path)
}
}
}

object MilvusOption {
// Constants for map keys
Expand Down Expand Up @@ -80,35 +166,37 @@ object MilvusOption {
val VectorSearchVectorColumn = "vector.search.column"
val VectorSearchIdColumn = "vector.search.idColumn"

// s3 config (legacy, for V1 binlog)
val S3FileSystemTypeName = Constants.S3FileSystemTypeName
val S3Endpoint = Constants.S3Endpoint
val S3BucketName = Constants.S3BucketName
val S3RootPath = Constants.S3RootPath
val S3AccessKey = Constants.S3AccessKey
val S3SecretKey = Constants.S3SecretKey
val S3UseSSL = Constants.S3UseSSL
val S3PathStyleAccess = Constants.S3PathStyleAccess
// S3 configuration (for V1 binlog via Hadoop S3A FileSystem)
val S3FileSystemTypeName = "s3.fs" // default: s3a://
val S3Endpoint = "s3.endpoint"
val S3BucketName = "s3.bucket"
val S3RootPath = "s3.rootPath"
val S3AccessKey = "s3.user"
val S3SecretKey = "s3.password"
val S3UseSSL = "s3.useSSL"
val S3PathStyleAccess = "s3.pathStyleAccess"
val S3MaxConnections = "s3.maxConnections"
val S3PreloadPoolSize = "s3.preloadPoolSize"

// FFI (Storage V2) filesystem property keys
val FsAddress = Constants.FsAddress
val FsBucketName = Constants.FsBucketName
val FsAccessKeyId = Constants.FsAccessKeyId
val FsAccessKeyValue = Constants.FsAccessKeyValue
val FsRootPath = Constants.FsRootPath
val FsStorageType = Constants.FsStorageType
val FsCloudProvider = Constants.FsCloudProvider
val FsIamEndpoint = Constants.FsIamEndpoint
val FsLogLevel = Constants.FsLogLevel
val FsRegion = Constants.FsRegion
val FsUseSSL = Constants.FsUseSSL
val FsSslCaCert = Constants.FsSslCaCert
val FsUseIam = Constants.FsUseIam
val FsUseVirtualHost = Constants.FsUseVirtualHost
val FsRequestTimeoutMs = Constants.FsRequestTimeoutMs
val FsGcpNativeWithoutAuth = Constants.FsGcpNativeWithoutAuth
val FsGcpCredentialJson = Constants.FsGcpCredentialJson
val FsUseCustomPartUpload = Constants.FsUseCustomPartUpload
// FFI filesystem configuration (for Storage V2 via native milvus-storage library)
val FsAddress = "fs.address"
val FsBucketName = "fs.bucket_name"
val FsAccessKeyId = "fs.access_key_id"
val FsAccessKeyValue = "fs.access_key_value"
val FsRootPath = "fs.root_path"
val FsStorageType = "fs.storage_type"
val FsCloudProvider = "fs.cloud_provider"
val FsIamEndpoint = "fs.iam_endpoint"
val FsLogLevel = "fs.log_level"
val FsRegion = "fs.region"
val FsUseSSL = "fs.use_ssl"
val FsSslCaCert = "fs.ssl_ca_cert"
val FsUseIam = "fs.use_iam"
val FsUseVirtualHost = "fs.use_virtual_host"
val FsRequestTimeoutMs = "fs.request_timeout_ms"
val FsGcpNativeWithoutAuth = "fs.gcp_native_without_auth"
val FsGcpCredentialJson = "fs.gcp_credential_json"
val FsUseCustomPartUpload = "fs.use_custom_part_upload"

// Create MilvusOption from a map
def apply(options: CaseInsensitiveStringMap): MilvusOption = {
Expand Down Expand Up @@ -147,6 +235,19 @@ object MilvusOption {
// Parse vector search configuration
val vectorSearchConfig = parseVectorSearchConfig(options)

// Parse S3/FileSystem configuration (merged from MilvusS3Option)
val readerType = options.getOrDefault(ReaderType, "")
val s3FileSystemType = options.getOrDefault(S3FileSystemTypeName, "")
val s3BucketName = options.getOrDefault(S3BucketName, "a-bucket")
val s3RootPath = options.getOrDefault(S3RootPath, "files")
val s3Endpoint = options.getOrDefault(S3Endpoint, "localhost:9000")
val s3AccessKey = options.getOrDefault(S3AccessKey, "minioadmin")
val s3SecretKey = options.getOrDefault(S3SecretKey, "minioadmin")
val s3UseSSL = options.getOrDefault(S3UseSSL, "false").toBoolean
val s3PathStyleAccess = options.getOrDefault(S3PathStyleAccess, "true").toBoolean
val s3MaxConnections = options.getOrDefault(S3MaxConnections, "32").toInt
val s3PreloadPoolSize = options.getOrDefault(S3PreloadPoolSize, "4").toInt

MilvusOption(
uri,
token,
Expand All @@ -168,7 +269,18 @@ object MilvusOption {
fieldIDs,
extraColumns,
optionsMap,
vectorSearchConfig
vectorSearchConfig,
readerType,
s3FileSystemType,
s3BucketName,
s3RootPath,
s3Endpoint,
s3AccessKey,
s3SecretKey,
s3UseSSL,
s3PathStyleAccess,
s3MaxConnections,
s3PreloadPoolSize
)
}

Expand Down Expand Up @@ -231,109 +343,3 @@ object MilvusOption {
}
}

case class MilvusS3Option(
readerType: String,
s3FileSystemType: String,
s3BucketName: String,
s3RootPath: String,
s3Endpoint: String,
s3AccessKey: String,
s3SecretKey: String,
s3UseSSL: Boolean,
s3PathStyleAccess: Boolean,
milvusPKType: String,
s3MaxConnections: Int,
s3PreloadPoolSize: Int
) extends Serializable {
def notEmpty(str: String): Boolean = str != null && str.trim.nonEmpty

def getConf(): Configuration = {
val conf = new Configuration()
if (notEmpty(s3FileSystemType)) {
// Basic S3 configuration
conf.set("fs.s3a.endpoint", s3Endpoint)
conf.set("fs.s3a.path.style.access", s3PathStyleAccess.toString)
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set(
"fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
)
conf.set("fs.s3a.access.key", s3AccessKey)
conf.set("fs.s3a.secret.key", s3SecretKey)
conf.set("fs.s3a.connection.ssl.enabled", s3UseSSL.toString)

// Performance optimization settings
conf.set("fs.s3a.block.size", "134217728") // 128MB
conf.set("fs.s3a.threads.max", s3MaxConnections.toString)
conf.set("fs.s3a.threads.core", (s3MaxConnections / 2).toString)
conf.set("fs.s3a.connection.maximum", (s3MaxConnections + 32).toString)
conf.set("fs.s3a.connection.timeout", "30000")
conf.set("fs.s3a.socket.timeout", "30000")
conf.set("fs.s3a.retry.limit", "3")
}
conf
}

def getFileSystem(path: Path): FileSystem = {
if (notEmpty(s3FileSystemType)) {
val conf = getConf()
val fileSystem = new S3AFileSystem()
try {
fileSystem.initialize(
new URI(
s"s3a://${s3BucketName}/"
),
conf
)
fileSystem
} catch {
case e: Exception =>
// Close the filesystem if initialization failed
try {
fileSystem.close()
} catch {
case _: Exception => // Ignore close errors
}
throw new RuntimeException(
s"Failed to initialize S3 FileSystem for bucket $s3BucketName: ${e.getMessage}",
e
)
}
} else {
val conf = getConf()
path.getFileSystem(conf)
}
}

def getFilePath(path: String): Path = {
if (notEmpty(s3FileSystemType)) {
if (path.startsWith("s3a://")) {
new Path(path)
} else {
val finalPath = s"s3a://${s3BucketName}/${s3RootPath}/${path}"
new Path(new URI(finalPath))
}
} else {
new Path(path)
}
}
}

object MilvusS3Option {
def apply(options: CaseInsensitiveStringMap): MilvusS3Option = {
new MilvusS3Option(
options.get(Constants.LogReaderTypeParamName),
options.get(Constants.S3FileSystemTypeName),
options.getOrDefault(Constants.S3BucketName, "a-bucket"),
options.getOrDefault(Constants.S3RootPath, "files"),
options.getOrDefault(Constants.S3Endpoint, "localhost:9000"),
options.getOrDefault(Constants.S3AccessKey, "minioadmin"),
options.getOrDefault(Constants.S3SecretKey, "minioadmin"),
options.getOrDefault(Constants.S3UseSSL, "false").toBoolean,
options.getOrDefault(Constants.S3PathStyleAccess, "true").toBoolean,
options.getOrDefault(MilvusOption.MilvusCollectionPKType, ""),
options.getOrDefault(Constants.S3MaxConnections, "32").toInt,
options.getOrDefault(Constants.S3PreloadPoolSize, "4").toInt
)
}
}
32 changes: 0 additions & 32 deletions src/main/scala/binlog/LogObject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,38 +40,6 @@ object Constants {
val LogReaderTypeDelete = "delete"
val LogReaderFieldIDs = "reader.field.ids"

// s3 config
val S3FileSystemTypeName = "s3.fs" // default: s3a://
val S3Endpoint = "s3.endpoint"
val S3BucketName = "s3.bucket"
val S3RootPath = "s3.rootPath"
val S3AccessKey = "s3.user"
val S3SecretKey = "s3.password"
val S3UseSSL = "s3.useSSL"
val S3PathStyleAccess = "s3.pathStyleAccess"
val S3MaxConnections = "s3.maxConnections"
val S3PreloadPoolSize = "s3.preloadPoolSize"

// FFI (Storage V2) filesystem property keys
val FsAddress = "fs.address"
val FsBucketName = "fs.bucket_name"
val FsAccessKeyId = "fs.access_key_id"
val FsAccessKeyValue = "fs.access_key_value"
val FsRootPath = "fs.root_path"
val FsStorageType = "fs.storage_type"
val FsCloudProvider = "fs.cloud_provider"
val FsIamEndpoint = "fs.iam_endpoint"
val FsLogLevel = "fs.log_level"
val FsRegion = "fs.region"
val FsUseSSL = "fs.use_ssl"
val FsSslCaCert = "fs.ssl_ca_cert"
val FsUseIam = "fs.use_iam"
val FsUseVirtualHost = "fs.use_virtual_host"
val FsRequestTimeoutMs = "fs.request_timeout_ms"
val FsGcpNativeWithoutAuth = "fs.gcp_native_without_auth"
val FsGcpCredentialJson = "fs.gcp_credential_json"
val FsUseCustomPartUpload = "fs.use_custom_part_upload"

val TimestampFieldID = "1"

def readMagicNumber(buffer: ByteBuffer) = {
Expand Down
Loading