Commit 961d5a3

Update FlintSpark to use progress trackers
Signed-off-by: Simeon Widdis <[email protected]>
1 parent 3ae335b · commit 961d5a3

12 files changed: +85 -42 lines
flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala (+11 -2)

@@ -41,7 +41,9 @@ case class FlintMetadata(
      */
    latestLogEntry: Option[FlintMetadataLogEntry] = None,
    /** Optional Flint index settings. TODO: move elsewhere? */
-   indexSettings: Option[String]) {
+   indexSettings: Option[String],
+   /** Information on the current progress of the index job */
+   currentProgress: Option[Map[String, Double]]) {
 
  require(version != null, "version is required")
  require(name != null, "name is required")
@@ -69,6 +71,7 @@ object FlintMetadata {
    private var latestId: Option[String] = None
    private var latestLogEntry: Option[FlintMetadataLogEntry] = None
    private var indexSettings: Option[String] = None
+   private var currentProgress: Option[Map[String, Double]] = None
 
    def version(version: FlintVersion): this.type = {
      this.version = version
@@ -131,6 +134,11 @@ object FlintMetadata {
      this
    }
 
+   def currentProgress(currentProgress: Map[String, Double]): this.type = {
+     this.currentProgress = Some(currentProgress)
+     this
+   }
+
    // Build method to create the FlintMetadata instance
    def build(): FlintMetadata = {
      FlintMetadata(
@@ -144,7 +152,8 @@ object FlintMetadata {
        schema = schema,
        indexSettings = indexSettings,
        latestId = latestId,
-       latestLogEntry = latestLogEntry)
+       latestLogEntry = latestLogEntry,
+       currentProgress = currentProgress)
    }
  }
}
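
The new currentProgress field is optional, so existing call sites keep compiling, and consumers read it like any other Option. A minimal sketch of reading the field (any names beyond those in the diff are illustrative only):

    // Sketch: summarize progress from some FlintMetadata instance `metadata`.
    // "bytes_read" is the key written by Progress.asMap() (see next file).
    val progressSummary: String = metadata.currentProgress match {
      case Some(p) => s"bytes read so far: ${p.getOrElse("bytes_read", 0.0)}"
      case None => "no progress recorded for this index"
    }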

flint-core/src/main/scala/org/opensearch/flint/core/metrics/ProgressListener.scala (+11 -1)

@@ -7,7 +7,17 @@ package org.opensearch.flint.core.metrics
 
 import org.apache.spark.scheduler.{SparkListenerStageSubmitted, SparkListenerTaskEnd}
 
-case class Progress(bytesCompleted: Long, estimatedBytesTotal: Double, estimatedBytesPerSecond: Double) {}
+case class Progress(
+    bytesCompleted: Long,
+    estimatedBytesTotal: Double,
+    estimatedBytesPerSecond: Double) {
+  def asMap(): Map[String, Double] = {
+    Map(
+      ("bytes_read", bytesCompleted.asInstanceOf[Double]),
+      ("est_bytes_total", estimatedBytesTotal),
+      ("est_bytes_per_second", estimatedBytesPerSecond))
+  }
+}
 
 /**
  * Collect and emit metrics by listening spark events
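
A quick sketch of what the new asMap helper produces; the values below are illustrative, only the field and key names come from the diff:

    // Hypothetical snapshot of a running refresh.
    val snapshot = Progress(
      bytesCompleted = 1024L,
      estimatedBytesTotal = 4096.0,
      estimatedBytesPerSecond = 256.0)

    // Map("bytes_read" -> 1024.0, "est_bytes_total" -> 4096.0, "est_bytes_per_second" -> 256.0)
    val flattened: Map[String, Double] = snapshot.asMap()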

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala (+7 -2)

@@ -556,11 +556,16 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
      .transientLog(latest => {
        val currentTime = System.currentTimeMillis()
        val updatedLatest = latest
-         .copy(state = REFRESHING, createTime = currentTime, lastRefreshStartTime = currentTime, progress = indexRefresh.progress())
+         .copy(state = REFRESHING, createTime = currentTime, lastRefreshStartTime = currentTime)
        flintMetadataCacheWriter
          .updateMetadataCache(
            indexName,
-           index.metadata.copy(latestLogEntry = Some(updatedLatest)))
+           index.metadata.copy(
+             latestLogEntry = Some(updatedLatest),
+             currentProgress = indexRefresh.progress() match {
+               case Some(p) => Some(p.asMap())
+               case None => None
+             }))
        updatedLatest
      })
      .finalLog(latest => {
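
The Some/None match above is behaviorally identical to Option.map; a sketch of the equivalent shorter form, using the same Progress type from the metrics package:

    import org.opensearch.flint.core.metrics.Progress

    // Equivalent to the match expression in the hunk above.
    def toProgressMap(progress: Option[Progress]): Option[Map[String, Double]] =
      progress.map(_.asMap())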

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala → flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/ExportedFlintMetadata.scala (renamed, +13 -8)

@@ -9,15 +9,17 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
+import org.opensearch.flint.core.metrics.Progress
 import org.opensearch.flint.spark.FlintSparkIndexOptions
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
 
 /**
- * Flint metadata cache defines metadata required to store in read cache for frontend user to
- * access.
+ * Select fields that are exported as part of the mapping `_meta` object under `properties`.
+ * Useful for providing front-end features that need specific data without needing to go through
+ * the full async query system.
  */
-case class FlintMetadataCache(
+case class ExportedFlintMetadata(
    metadataCacheVersion: String,
    /** Refresh interval for Flint index with auto refresh. Unit: seconds */
    refreshInterval: Option[Int],
@@ -26,7 +28,9 @@ case class FlintMetadataCache(
    /** Source query for MV */
    sourceQuery: Option[String],
    /** Timestamp when Flint index is last refreshed. Unit: milliseconds */
-   lastRefreshTime: Option[Long]) {
+   lastRefreshTime: Option[Long],
+   /** Information on the current progress of the index job */
+   currentProgress: Option[Map[String, Double]]) {
 
  /**
   * Convert FlintMetadataCache to a map. Skips a field if its value is not defined.
@@ -47,11 +51,11 @@
    }
  }
 
-object FlintMetadataCache {
+object ExportedFlintMetadata {
 
  val metadataCacheVersion = "1.0"
 
- def apply(metadata: FlintMetadata): FlintMetadataCache = {
+ def apply(metadata: FlintMetadata): ExportedFlintMetadata = {
    val indexOptions = FlintSparkIndexOptions(
      metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
    val refreshInterval = if (indexOptions.autoRefresh()) {
@@ -76,11 +80,12 @@ object FlintMetadataCache {
      case timestamp => Some(timestamp)
    }
    }
-   FlintMetadataCache(
+   ExportedFlintMetadata(
      metadataCacheVersion,
      refreshInterval,
      sourceTables,
      sourceQuery,
-     lastRefreshTime)
+     lastRefreshTime,
+     None)
  }
}
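
Note that apply always passes None for currentProgress; the live value is filled in by FlintSpark at refresh start (see the FlintSpark.scala hunk above). Since toMap skips undefined fields, the exported `_meta` block only carries progress once a refresh has populated it. A standalone sketch of that skip-if-undefined pattern, with illustrative field names rather than the class's actual keys:

    // Options convert to empty/singleton collections under ++, so None fields vanish.
    def optionalFields(
        lastRefreshTime: Option[Long],
        currentProgress: Option[Map[String, Double]]): Map[String, Any] =
      Map.empty[String, Any] ++
        lastRefreshTime.map("lastRefreshTime" -> _) ++
        currentProgress.map("currentProgress" -> _)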

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriter.scala (+1 -1)

@@ -8,7 +8,7 @@ package org.opensearch.flint.spark.metadatacache
 import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
 
 /**
- * Writes {@link FlintMetadataCache} to a storage of choice. This is different from {@link
+ * Writes {@link ExportedFlintMetadata} to a storage of choice. This is different from {@link
  * FlintIndexMetadataService} which persists the full index metadata to a storage for single
  * source of truth.
  */

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala (+3 -2)

@@ -21,7 +21,8 @@ import org.opensearch.flint.core.storage.OpenSearchClientUtils
 import org.apache.spark.internal.Logging
 
 /**
- * Writes {@link FlintMetadataCache} to index mappings `_meta` field for frontend user to access.
+ * Writes {@link ExportedFlintMetadata} to index mappings `_meta` field for frontend user to
+ * access.
  */
 class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
   extends FlintMetadataCacheWriter
@@ -34,7 +35,7 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
    try {
      client = OpenSearchClientUtils.createClient(options)
      val indexMapping = getIndexMapping(client, osIndexName)
-     val metadataCacheProperties = FlintMetadataCache(metadata).toMap.asJava
+     val metadataCacheProperties = ExportedFlintMetadata(metadata).toMap.asJava
      mergeMetadataCacheProperties(indexMapping, metadataCacheProperties)
      val serialized = buildJson(builder => {
        builder.field("_meta", indexMapping.get("_meta"))

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala (+17 -14)

@@ -6,10 +6,12 @@
 package org.opensearch.flint.spark.refresh
 
 import java.util.Collections
-import org.opensearch.flint.core.metrics.{MetricsSparkListener, ProgressListener, WithSparkListeners}
+
+import org.opensearch.flint.core.metrics.{MetricsSparkListener, Progress, ProgressListener, WithSparkListeners}
 import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
-import org.opensearch.flint.spark.FlintSparkIndex.{StreamingRefresh, quotedTableName}
+import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
+
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
 import org.apache.spark.sql.flint.config.FlintSparkConf
@@ -29,6 +31,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
    with FlintSparkValidationHelper {
 
  override def refreshMode: RefreshMode = AUTO
+ val progressTracker: ProgressListener = ProgressListener()
 
  override def validate(spark: SparkSession): Unit = {
    // Incremental refresh cannot enabled at the same time
@@ -66,18 +69,16 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
      // Flint index has specialized logic and capability for incremental refresh
      case refresh: StreamingRefresh =>
        logInfo("Start refreshing index in streaming style")
-       val jobContext = WithSparkListeners(spark, List(MetricsSparkListener(), ProgressListener()))
-
-       val job = jobContext.run(() =>
-         refresh
-           .buildStream(spark)
-           .writeStream
-           .queryName(indexName)
-           .format(FLINT_DATASOURCE)
-           .options(flintSparkConf.properties)
-           .addSinkOptions(options, flintSparkConf)
-           .start(indexName))
-       val progress = jobContext.listeners(1).asInstanceOf[ProgressListener]
+       val job =
+         WithSparkListeners(spark, List(MetricsSparkListener(), progressTracker)).run(() =>
+           refresh
+             .buildStream(spark)
+             .writeStream
+             .queryName(indexName)
+             .format(FLINT_DATASOURCE)
+             .options(flintSparkConf.properties)
+             .addSinkOptions(options, flintSparkConf)
+             .start(indexName))
        Some(job.id.toString)
 
      // Otherwise, fall back to foreachBatch + batch refresh
@@ -99,6 +100,8 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
    }
  }
 
+ override def progress(): Option[Progress] = Some(progressTracker.currentProgress())
+
  // Using Scala implicit class to avoid breaking method chaining of Spark data frame fluent API
  private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) {
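
Holding progressTracker as a field, rather than fishing it back out of jobContext.listeners(1) as the removed code did, is what lets progress() read the live listener after run() returns. For reference, a minimal sketch of the register-run-unregister pattern that a helper like WithSparkListeners wraps, written against the plain Spark API (this is an assumption about the helper's behavior, not its actual implementation):

    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.sql.SparkSession

    // Attach listeners for the duration of a block, then detach them.
    def withListeners[T](spark: SparkSession, listeners: Seq[SparkListener])(body: => T): T = {
      listeners.foreach(spark.sparkContext.addSparkListener)
      try body
      finally listeners.foreach(spark.sparkContext.removeSparkListener)
    }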

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala (+5 -4)

@@ -5,12 +5,13 @@
 
 package org.opensearch.flint.spark.refresh
 
+import org.opensearch.flint.core.metrics.Progress
 import org.opensearch.flint.spark.FlintSparkIndex
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.RefreshMode
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.flint.config.FlintSparkConf
-import org.opensearch.flint.core.metrics.Progress
 
 /**
  * Flint Spark index refresh that sync index data with source in style defined by concrete
@@ -53,9 +54,9 @@
  /**
   * For refresh types supporting progress recording, return the progress.
   *
-  * Either always or never set depending on the specific implementation. If the job hasn't started yet, the progress
-  * should be filled with zeroes, not None. This is necessary for the front-end to determine whether to render a
-  * progress bar at all.
+  * Either always or never set depending on the specific implementation. If the job hasn't
+  * started yet, the progress should be Some(zeroes), not None. This is necessary for the
+  * front-end to determine whether to render a progress bar at all.
   */
  def progress(): Option[Progress]
}
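
Per that contract, an implementation that supports tracking but has not started yet should report zeroes rather than None; a sketch of what such a value looks like:

    // Zero-filled progress for a tracked job that has not started (per the scaladoc above).
    val notStarted: Option[Progress] =
      Some(Progress(bytesCompleted = 0L, estimatedBytesTotal = 0.0, estimatedBytesPerSecond = 0.0))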

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala (+3)

@@ -5,6 +5,7 @@
 
 package org.opensearch.flint.spark.refresh
 
+import org.opensearch.flint.core.metrics.Progress
 import org.opensearch.flint.spark.FlintSparkIndex
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{FULL, RefreshMode}
 
@@ -47,4 +48,6 @@ class FullIndexRefresh(
      .save(indexName)
    None
  }
+
+ override def progress(): Option[Progress] = None
}

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala (+3)

@@ -5,6 +5,7 @@
 
 package org.opensearch.flint.spark.refresh
 
+import org.opensearch.flint.core.metrics.Progress
 import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.refresh.util.RefreshMetricsAspect
@@ -62,4 +63,6 @@ class IncrementalIndexRefresh(val indexName: String, index: FlintSparkIndex)
      None
    }
  }
+
+ override def progress(): Option[Progress] = None
}

flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala (+8 -8)

@@ -46,8 +46,8 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
      .deserialize(content)
      .copy(latestLogEntry = Some(flintMetadataLogEntry))
 
-   val metadataCache = FlintMetadataCache(metadata)
-   metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
+   val metadataCache = ExportedFlintMetadata(metadata)
+   metadataCache.metadataCacheVersion shouldBe ExportedFlintMetadata.metadataCacheVersion
    metadataCache.refreshInterval.get shouldBe 600
    metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table")
    metadataCache.lastRefreshTime.get shouldBe 1234567890123L
@@ -75,8 +75,8 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
      .deserialize(content)
      .copy(latestLogEntry = Some(flintMetadataLogEntry))
 
-   val metadataCache = FlintMetadataCache(metadata)
-   metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
+   val metadataCache = ExportedFlintMetadata(metadata)
+   metadataCache.metadataCacheVersion shouldBe ExportedFlintMetadata.metadataCacheVersion
    metadataCache.refreshInterval.get shouldBe 600
    metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table")
    metadataCache.lastRefreshTime.get shouldBe 1234567890123L
@@ -112,8 +112,8 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
      .deserialize(content)
      .copy(latestLogEntry = Some(flintMetadataLogEntry))
 
-   val metadataCache = FlintMetadataCache(metadata)
-   metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
+   val metadataCache = ExportedFlintMetadata(metadata)
+   metadataCache.metadataCacheVersion shouldBe ExportedFlintMetadata.metadataCacheVersion
    metadataCache.refreshInterval.get shouldBe 600
    metadataCache.sourceTables shouldBe Array(
      "spark_catalog.default.test_table",
@@ -144,8 +144,8 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
      .deserialize(content)
      .copy(latestLogEntry = Some(flintMetadataLogEntry.copy(lastRefreshCompleteTime = 0L)))
 
-   val metadataCache = FlintMetadataCache(metadata)
-   metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
+   val metadataCache = ExportedFlintMetadata(metadata)
+   metadataCache.metadataCacheVersion shouldBe ExportedFlintMetadata.metadataCacheVersion
    metadataCache.refreshInterval shouldBe empty
    metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table")
    metadataCache.sourceQuery shouldBe empty
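
These hunks only exercise the renamed type; none of the tests asserts the new field yet. A natural follow-up assertion (not part of this commit) would check that apply leaves progress empty:

    // apply passes None for currentProgress, so exported metadata starts without progress.
    metadataCache.currentProgress shouldBe empty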

spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala (+3)

@@ -7,13 +7,16 @@ package org.apache.spark.sql
 
 import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
+
 import scala.concurrent.{ExecutionContext, Future, TimeoutException}
 import scala.concurrent.duration.{Duration, MINUTES}
 import scala.util.{Failure, Success, Try}
+
 import org.opensearch.flint.common.model.FlintStatement
 import org.opensearch.flint.common.scheduler.model.LangType
 import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener, MetricsUtil, ProgressListener}
 import org.opensearch.flint.spark.FlintSpark
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.util.ShuffleCleaner
