File tree 1 file changed +11
-0
lines changed
spark-sql-application/src/main/scala/org/apache/spark/sql
1 file changed +11
-0
lines changed Original file line number Diff line number Diff line change @@ -17,6 +17,7 @@ import org.opensearch.flint.common.scheduler.model.LangType
17
17
import org .opensearch .flint .core .metrics .{MetricConstants , MetricsSparkListener , MetricsUtil }
18
18
import org .opensearch .flint .spark .FlintSpark
19
19
20
+ import org .apache .spark .SparkUpgradeException
20
21
import org .apache .spark .internal .Logging
21
22
import org .apache .spark .sql .flint .config .FlintSparkConf
22
23
import org .apache .spark .sql .util .ShuffleCleaner
@@ -153,6 +154,11 @@ case class JobOperator(
153
154
}
154
155
})
155
156
} catch {
157
+ case e : SparkUpgradeException =>
158
+ incrementCounter(MetricConstants .RESULT_WRITER_FAILED_METRIC )
159
+ throwableHandler.recordThrowable(
160
+ s " Failed to write to result. Cause=' ${sanitizeSparkUpgradeErrorMessage(e.getMessage)}' " ,
161
+ e)
156
162
case t : Throwable =>
157
163
incrementCounter(MetricConstants .RESULT_WRITER_FAILED_METRIC )
158
164
throwableHandler.recordThrowable(
@@ -320,6 +326,11 @@ case class JobOperator(
320
326
metricName
321
327
}
322
328
329
+ private def sanitizeSparkUpgradeErrorMessage (errorMessage : String ): String = {
330
+ val pattern = " You may get a different result due to the upgrading to Spark[^\\ n]*:\\ n" .r
331
+ pattern.replaceAllIn(errorMessage, " " ).trim
332
+ }
333
+
323
334
private def incrementCounter (metricName : String ): Unit = {
324
335
MetricsUtil .incrementCounter(resolveMetricName(metricName))
325
336
}
You can’t perform that action at this time.
0 commit comments