diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index 83f62c6eb..4afbb1c03 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -108,6 +108,15 @@ trait FlintJobExecutor { } }""".stripMargin + // Explicitly set a 1s refresh interval on the OpenSearch result index so newly written + // results become searchable quickly for ad-hoc queries. + val resultIndexSettings = + """{ + "index": { + "refresh_interval": "1s" + } + }""".stripMargin + // Define the data schema val schema = StructType( Seq( @@ -199,7 +208,7 @@ trait FlintJobExecutor { if (osClient.doesIndexExist(resultIndex)) { writeData(resultData, resultIndex, refreshPolicy) } else { - createResultIndex(osClient, resultIndex, resultIndexMapping) + createResultIndex(osClient, resultIndex, resultIndexMapping, resultIndexSettings) writeData(resultData, resultIndex, refreshPolicy) } } @@ -375,7 +384,7 @@ trait FlintJobExecutor { case e: IllegalStateException if e.getCause != null && e.getCause.getMessage.contains("index_not_found_exception") => - createResultIndex(osClient, resultIndex, resultIndexMapping) + createResultIndex(osClient, resultIndex, resultIndexMapping, resultIndexSettings) case e: InterruptedException => val error = s"Interrupted by the main thread: ${e.getMessage}" Thread.currentThread().interrupt() // Preserve the interrupt status @@ -391,10 +400,11 @@ trait FlintJobExecutor { def createResultIndex( osClient: OSClient, resultIndex: String, - mapping: String): Either[String, Unit] = { + mapping: String, + settings: String): Either[String, Unit] = { try { logInfo(s"create $resultIndex") - osClient.createIndex(resultIndex, mapping) + osClient.createIndex(resultIndex, mapping, settings) logInfo(s"create $resultIndex successfully") Right(()) } catch { diff --git 
a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index 422cfc947..75f2053c4 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -61,16 +61,18 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { * the name of the index * @param mapping * the mapping of the index + * @param settings + * the index settings as a JSON string * @return * use Either for representing success or failure. A Right value indicates success, while a * Left value indicates an error. */ - def createIndex(osIndexName: String, mapping: String): Unit = { + def createIndex(osIndexName: String, mapping: String, settings: String): Unit = { logInfo(s"create $osIndexName") using(flintClient.createClient()) { client => val request = new CreateIndexRequest(osIndexName) - request.mapping(mapping, XContentType.JSON) + request.mapping(mapping, XContentType.JSON).settings(settings, XContentType.JSON) try { client.createIndex(request, RequestOptions.DEFAULT)