Skip to content

Commit 41dc2d2

Browse files
author
fangbo.0511
committed
Add columns back fill for spark 3.4
1 parent b4fe441 commit 41dc2d2

File tree

13 files changed

+420
-103
lines changed

13 files changed

+420
-103
lines changed

lance-spark-3.4_2.12/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@
3131
<artifactId>lance-spark-base_${scala.compat.version}</artifactId>
3232
<version>${lance-spark.version}</version>
3333
</dependency>
34+
<dependency>
35+
<groupId>org.antlr</groupId>
36+
<artifactId>antlr4</artifactId>
37+
<version>${antlr4.version}</version>
38+
<scope>provided</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.antlr</groupId>
42+
<artifactId>antlr4-runtime</artifactId>
43+
<version>${antlr4.version}</version>
44+
<scope>provided</scope>
45+
</dependency>
3446
<dependency>
3547
<groupId>com.lancedb</groupId>
3648
<artifactId>lance-spark-base_${scala.compat.version}</artifactId>
@@ -56,6 +68,22 @@
5668
</testResource>
5769
</testResources>
5870
<plugins>
71+
<plugin>
72+
<groupId>org.antlr</groupId>
73+
<artifactId>antlr4-maven-plugin</artifactId>
74+
<executions>
75+
<execution>
76+
<goals>
77+
<goal>antlr4</goal>
78+
</goals>
79+
</execution>
80+
</executions>
81+
<configuration>
82+
<visitor>true</visitor>
83+
<listener>true</listener>
84+
<sourceDirectory>../lance-spark-base_2.12/src/main/antlr4</sourceDirectory>
85+
</configuration>
86+
</plugin>
5987
<plugin>
6088
<groupId>org.codehaus.mojo</groupId>
6189
<artifactId>build-helper-maven-plugin</artifactId>
@@ -71,6 +99,8 @@
7199
<sources>
72100
<source>../lance-spark-base_2.12/src/main/java</source>
73101
<source>src/main/java</source>
102+
<source>src/main/scala</source>
103+
<source>${project.build.directory}/generated-sources/antlr4</source>
74104
</sources>
75105
</configuration>
76106
</execution>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance.spark.extensions
15+
16+
import org.apache.spark.sql.SparkSessionExtensions
17+
import org.apache.spark.sql.catalyst.parser.extensions.LanceSparkSqlExtensionsParser
18+
import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
19+
20+
/**
 * Wires Lance-specific SQL support into a Spark session.
 *
 * Registered via `spark.sql.extensions`; Spark invokes [[apply]] once per
 * session builder with the extensions registry.
 */
class LanceSparkSessionExtensions extends (SparkSessionExtensions => Unit) {

  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Wrap the session's existing parser so statements the Lance grammar does
    // not recognize still reach the original parser.
    extensions.injectParser((_, delegateParser) => new LanceSparkSqlExtensionsParser(delegateParser))

    // Register the planner strategy that turns Lance logical plans into physical ones.
    extensions.injectPlannerStrategy(session => ExtendedDataSourceV2Strategy(session))
  }
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.spark.sql.catalyst.parser.extensions

import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

import scala.util.control.NonFatal
25+
/**
 * SQL parser that adds the Lance extensions grammar on top of a delegate parser.
 *
 * Every parse method delegates unchanged except [[parsePlan]], which retries the
 * statement against the Lance ANTLR grammar when the delegate fails to parse it.
 */
class LanceSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface {

  private lazy val astBuilder = new LanceSqlExtensionsAstBuilder(delegate)

  /**
   * Parse a string to a DataType.
   */
  override def parseDataType(sqlText: String): DataType = {
    delegate.parseDataType(sqlText)
  }

  /**
   * Parse a string to a raw DataType without CHAR/VARCHAR replacement.
   * Not needed by the Lance grammar, so deliberately unsupported.
   */
  def parseRawDataType(sqlText: String): DataType = throw new UnsupportedOperationException()

  /**
   * Parse a string to an Expression.
   */
  override def parseExpression(sqlText: String): Expression = {
    delegate.parseExpression(sqlText)
  }

  /**
   * Parse a string to a TableIdentifier.
   */
  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
    delegate.parseTableIdentifier(sqlText)
  }

  /**
   * Parse a string to a FunctionIdentifier.
   */
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
    delegate.parseFunctionIdentifier(sqlText)
  }

  /**
   * Parse a string to a multi-part identifier.
   */
  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
    delegate.parseMultipartIdentifier(sqlText)
  }

  /**
   * Creates StructType for a given SQL string, which is a comma separated list of field
   * definitions which will preserve the correct Hive metadata.
   */
  override def parseTableSchema(sqlText: String): StructType = {
    delegate.parseTableSchema(sqlText)
  }

  /**
   * Parse a string to a LogicalPlan.
   *
   * The delegate is tried first; the Lance extensions grammar is consulted only
   * when the delegate fails, so ordinary Spark SQL keeps its native behavior.
   * Note: this means a syntax error in plain Spark SQL resurfaces as a Lance
   * grammar error rather than the delegate's original message.
   */
  override def parsePlan(sqlText: String): LogicalPlan = {
    try {
      delegate.parsePlan(sqlText)
    } catch {
      // NonFatal (instead of a bare Exception match) lets fatal throwables such
      // as InterruptedException propagate instead of triggering a silent retry.
      case NonFatal(_) => parse(sqlText)
    }
  }

  // Queries never use the extensions grammar, so delegate directly.
  // NOTE(review): this forwards to delegate.parsePlan rather than
  // delegate.parseQuery — presumably intentional for Spark 3.4; confirm.
  override def parseQuery(sqlText: String): LogicalPlan = {
    delegate.parsePlan(sqlText)
  }

  /** Parse `command` with the generated Lance extensions lexer/parser. */
  protected def parse(command: String): LogicalPlan = {
    // Upper-case the character stream so grammar keywords match case-insensitively;
    // token text (getText) still returns the original characters.
    val lexer =
      new LanceSqlExtensionsLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new LanceSqlExtensionsParser(tokenStream)
    parser.removeErrorListeners()
    // NOTE(review): no BailErrorStrategy is installed, yet the catch below expects
    // ParseCancellationException; with the default error strategy ANTLR recovers
    // in place instead of throwing — verify errors are actually surfaced.

    try {
      // First attempt the faster SLL prediction mode, sufficient for most input.
      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
      astBuilder.visit(parser.singleStatement()).asInstanceOf[LogicalPlan]
    } catch {
      case _: ParseCancellationException =>
        // SLL was inconclusive: rewind the stream and reparse with full LL mode.
        tokenStream.seek(0) // rewind input stream
        parser.reset()

        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
        astBuilder.visit(parser.singleStatement()).asInstanceOf[LogicalPlan]
    }
  }
}
117+
118+
/**
 * A CharStream wrapper that upper-cases characters as the lexer looks at them
 * (see [[LA]]), making keyword matching case-insensitive while leaving the
 * underlying text untouched — [[getText]] still returns the original characters.
 *
 * Copied from Apache Spark to avoid a dependency on Spark internals.
 */
class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
  // consume() mutates the stream position, so call it with parentheses
  // per Scala convention for side-effecting 0-arity methods.
  override def consume(): Unit = wrapped.consume()

  override def getSourceName(): String = wrapped.getSourceName

  override def index(): Int = wrapped.index

  override def mark(): Int = wrapped.mark

  override def release(marker: Int): Unit = wrapped.release(marker)

  override def seek(where: Int): Unit = wrapped.seek(where)

  override def size(): Int = wrapped.size

  override def getText(interval: Interval): String = wrapped.getText(interval)

  // scalastyle:off
  /** Upper-cased lookahead; 0 and EOF sentinels pass through unchanged. */
  override def LA(i: Int): Int = {
    val la = wrapped.LA(i)
    if (la == 0 || la == IntStream.EOF) la
    else Character.toUpperCase(la)
  }
  // scalastyle:on
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.spark.sql.catalyst.parser.extensions
15+
16+
import org.apache.spark.sql.catalyst.analysis.{UnresolvedIdentifier, UnresolvedRelation}
17+
import org.apache.spark.sql.catalyst.parser.ParserInterface
18+
import org.apache.spark.sql.catalyst.plans.logical.{AddColumnsBackfill, LogicalPlan}
19+
20+
import scala.jdk.CollectionConverters._
21+
22+
/**
 * Converts ANTLR parse trees produced by the Lance extensions grammar into
 * Catalyst logical plans.
 */
class LanceSqlExtensionsAstBuilder(delegate: ParserInterface)
  extends LanceSqlExtensionsBaseVisitor[AnyRef] {

  /** Unwrap the single top-level statement and cast it to a LogicalPlan. */
  override def visitSingleStatement(ctx: LanceSqlExtensionsParser.SingleStatementContext)
    : LogicalPlan = visit(ctx.statement).asInstanceOf[LogicalPlan]

  /** Build an AddColumnsBackfill command node from its parse-tree context. */
  override def visitAddColumnsBackfill(ctx: LanceSqlExtensionsParser.AddColumnsBackfillContext)
    : AddColumnsBackfill = {
    val sourceRelation = UnresolvedRelation(Seq(ctx.identifier().getText))
    val targetTable = UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier()))
    AddColumnsBackfill(targetTable, visitColumnList(ctx.columnList()), sourceRelation)
  }

  /** Collect the parts of a dot-separated multi-part identifier. */
  override def visitMultipartIdentifier(ctx: LanceSqlExtensionsParser.MultipartIdentifierContext)
    : Seq[String] = ctx.parts.asScala.map(part => part.getText).toSeq

  /**
   * Visit identifier list.
   */
  override def visitColumnList(ctx: LanceSqlExtensionsParser.ColumnListContext): Seq[String] =
    ctx.columns.asScala.map(column => column.getText).toSeq
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance.spark.update;

/**
 * Runs the shared column-backfill test suite against this Spark build.
 * Intentionally empty: every test method is inherited unchanged.
 */
public class AddColumnsBackfillTest extends BaseAddColumnsBackfillTest {
  // All test methods are inherited from BaseAddColumnsBackfillTest
}

lance-spark-3.4_2.13/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@
3131
<artifactId>lance-spark-base_${scala.compat.version}</artifactId>
3232
<version>${lance-spark.version}</version>
3333
</dependency>
34+
<dependency>
35+
<groupId>org.antlr</groupId>
36+
<artifactId>antlr4</artifactId>
37+
<version>${antlr4.version}</version>
38+
<scope>provided</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.antlr</groupId>
42+
<artifactId>antlr4-runtime</artifactId>
43+
<version>${antlr4.version}</version>
44+
<scope>provided</scope>
45+
</dependency>
3446
<dependency>
3547
<groupId>com.lancedb</groupId>
3648
<artifactId>lance-spark-base_${scala.compat.version}</artifactId>
@@ -61,6 +73,22 @@
6173
</testResource>
6274
</testResources>
6375
<plugins>
76+
<plugin>
77+
<groupId>org.antlr</groupId>
78+
<artifactId>antlr4-maven-plugin</artifactId>
79+
<executions>
80+
<execution>
81+
<goals>
82+
<goal>antlr4</goal>
83+
</goals>
84+
</execution>
85+
</executions>
86+
<configuration>
87+
<visitor>true</visitor>
88+
<listener>true</listener>
89+
<sourceDirectory>../lance-spark-base_2.12/src/main/antlr4</sourceDirectory>
90+
</configuration>
91+
</plugin>
6492
<plugin>
6593
<groupId>org.codehaus.mojo</groupId>
6694
<artifactId>build-helper-maven-plugin</artifactId>
@@ -76,6 +104,8 @@
76104
<sources>
77105
<source>../lance-spark-base_2.12/src/main/java</source>
78106
<source>../lance-spark-3.4_2.12/src/main/java</source>
107+
<source>../lance-spark-3.4_2.12/src/main/scala</source>
108+
<source>${project.build.directory}/generated-sources/antlr4</source>
79109
</sources>
80110
</configuration>
81111
</execution>

lance-spark-3.5_2.12/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@
8181
<configuration>
8282
<visitor>true</visitor>
8383
<listener>true</listener>
84-
<sourceDirectory>src/main/antlr4</sourceDirectory>
84+
<sourceDirectory>../lance-spark-base_2.12/src/main/antlr4</sourceDirectory>
8585
</configuration>
8686
</plugin>
8787
<plugin>

0 commit comments

Comments
 (0)