Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding fast refresh setting during index creation #1074

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -108,6 +108,15 @@ trait FlintJobExecutor {
}
}""".stripMargin

// Fast refresh index setting for OpenSearch index. Eliminates index refresh as a source
// of latency for ad-hoc queries.
val resultIndexSettings =
"""{
"index": {
"refresh_interval": "1s"
}
}""".stripMargin

// Define the data schema
val schema = StructType(
Seq(
@@ -199,7 +208,7 @@ trait FlintJobExecutor {
if (osClient.doesIndexExist(resultIndex)) {
writeData(resultData, resultIndex, refreshPolicy)
} else {
createResultIndex(osClient, resultIndex, resultIndexMapping)
createResultIndex(osClient, resultIndex, resultIndexMapping, resultIndexSettings)
writeData(resultData, resultIndex, refreshPolicy)
}
}
@@ -375,7 +384,7 @@ trait FlintJobExecutor {
case e: IllegalStateException
if e.getCause != null &&
e.getCause.getMessage.contains("index_not_found_exception") =>
createResultIndex(osClient, resultIndex, resultIndexMapping)
createResultIndex(osClient, resultIndex, resultIndexMapping, resultIndexSettings)
case e: InterruptedException =>
val error = s"Interrupted by the main thread: ${e.getMessage}"
Thread.currentThread().interrupt() // Preserve the interrupt status
@@ -391,10 +400,11 @@ trait FlintJobExecutor {
def createResultIndex(
osClient: OSClient,
resultIndex: String,
mapping: String): Either[String, Unit] = {
mapping: String,
settings: String): Either[String, Unit] = {
try {
logInfo(s"create $resultIndex")
osClient.createIndex(resultIndex, mapping)
osClient.createIndex(resultIndex, mapping, settings)
logInfo(s"create $resultIndex successfully")
Right(())
} catch {
Original file line number Diff line number Diff line change
@@ -61,16 +61,18 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
* the name of the index
* @param mapping
* the mapping of the index
* @param settings
* * the index settings as a JSON string
* @return
* use Either for representing success or failure. A Right value indicates success, while a
* Left value indicates an error.
*/
def createIndex(osIndexName: String, mapping: String): Unit = {
def createIndex(osIndexName: String, mapping: String, settings: String): Unit = {
logInfo(s"create $osIndexName")

using(flintClient.createClient()) { client =>
val request = new CreateIndexRequest(osIndexName)
request.mapping(mapping, XContentType.JSON)
request.mapping(mapping, XContentType.JSON).settings(settings, XContentType.JSON)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would happen if settings passed in is None?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The short answer: a similar thing that would happen if the mapping passed in is None.

This approach closely models after how OS index mappings are passed in during index creation. There is only one caller to OSClient.createIndex : FlintJobExecutor.scala. Similar to index mapping, a fixed index setting string is declared at the top of this file as an effective const. This value is then passed into the OS.createIndex call. As such, the index settings inherits this guarantee from the index mapping approach that the passed in value will not be None.


try {
client.createIndex(request, RequestOptions.DEFAULT)