Skip to content

Commit ed9f4af

Browse files
Schema evolution
1 parent 6e2085d commit ed9f4af

File tree

3 files changed

+110
-4
lines changed

3 files changed

+110
-4
lines changed

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ abstract class InMemoryBaseTable(
7575

7676
override def columns(): Array[Column] = tableColumns
7777

/**
 * Overwrites the column metadata subsequently returned by `columns()`.
 * Visible only within the test catalog package so transactional wrappers
 * can propagate an evolved schema back to the delegate table.
 */
private[catalog] def updateColumns(newColumns: Array[Column]): Unit =
  tableColumns = newColumns
7882
override def version(): String = tableVersion.toString
7983

8084
def setVersion(version: String): Unit = {

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/txns.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ class TxnTable(val delegate: InMemoryRowLevelOperationTable)
7171
delegate.properties,
7272
delegate.constraints) {
7373

74-
// TODO: Revise schema evolution.
75-
alterTableWithData(delegate.data, schema)
74+
withData(delegate.data)
7675

76+
// Keep initial version to detect any changes during the transaction.
7777
private val initialVersion: String = version()
7878

7979
// A tracker of filters used in each scan.
@@ -86,8 +86,8 @@ class TxnTable(val delegate: InMemoryRowLevelOperationTable)
8686
def commit(): Unit = {
8787
if (version() != initialVersion) {
8888
delegate.dataMap.clear()
89-
// TODO: Revise schema evolution.
90-
delegate.alterTableWithData(data, delegate.schema)
89+
delegate.alterTableWithData(data, schema)
90+
delegate.updateColumns(columns()) // Evolve schema if needed.
9191
delegate.replacedPartitions = replacedPartitions
9292
delegate.lastWriteInfo = lastWriteInfo
9393
delegate.lastWriteLog = lastWriteLog
@@ -124,6 +124,10 @@ class TxnTableCatalog(delegate: InMemoryRowLevelOperationTableCatalog) extends T
124124
}
125125

126126
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
127+
// TODO: This evicts the staged TxnTable, losing any in-flight DML changes. The correct
128+
// approach is to apply only the schema change to the existing TxnTable so that the ongoing
129+
// DML can observe the new schema and reconcile at commit time. Concurrent DDL + DML is not
130+
// supported in this test catalog for now.
127131
val newDelegateTable = delegate.alterTable(ident, changes: _*)
128132
tables.remove(ident) // Load again.
129133
newDelegateTable

sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,4 +399,102 @@ class AppendDataTransactionSuite extends RowLevelOperationSuiteBase {
399399
Row(1, 100, "hr"),
400400
Row(2, 200, "software")))
401401
}
402+
403+
test("SQL INSERT WITH SCHEMA EVOLUTION adds new column with transactional checks") {
  // Target table starts without the `active` column.
  createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
    """{ "pk": 1, "salary": 100, "dep": "hr" }
      |{ "pk": 2, "salary": 200, "dep": "software" }
      |""".stripMargin)

  // Source table carries an extra BOOLEAN column that must be evolved into the target.
  sql(
    s"""CREATE TABLE $sourceNameAsString
       |(pk INT NOT NULL, salary INT, dep STRING, active BOOLEAN)""".stripMargin)
  sql(s"INSERT INTO $sourceNameAsString VALUES (3, 300, 'hr', true), (4, 400, 'software', false)")

  val evolvingInsert =
    s"INSERT WITH SCHEMA EVOLUTION INTO $tableNameAsString SELECT * FROM $sourceNameAsString"
  val (txn, txnTables) = executeTransaction {
    sql(evolvingInsert)
  }

  assert(txn.currentState === Committed)
  assert(txn.isClosed)

  // The evolved column must be visible on the committed delegate table's schema.
  assert(table.schema.fieldNames.toSeq === Seq("pk", "salary", "dep", "active"))

  val expectedRows = Seq(
    Row(1, 100, "hr", null), // pre-existing rows read the new column as null
    Row(2, 200, "software", null),
    Row(3, 300, "hr", true), // newly inserted rows carry explicit values
    Row(4, 400, "software", false))
  checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), expectedRows)
}
432+
433+
// Run the overwrite scenario under both static and dynamic partition overwrite modes.
for (isDynamic <- Seq(false, true))
  test(s"SQL INSERT OVERWRITE WITH SCHEMA EVOLUTION adds new column with transactional checks " +
    s"isDynamic: $isDynamic") {
    // Target table: three rows across two `dep` partitions, no `active` column yet.
    createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
      """{ "pk": 1, "salary": 100, "dep": "hr" }
        |{ "pk": 2, "salary": 200, "dep": "software" }
        |{ "pk": 3, "salary": 300, "dep": "hr" }
        |""".stripMargin)

    // Source table carries the extra BOOLEAN column to be evolved into the target.
    sql(
      s"""CREATE TABLE $sourceNameAsString
         |(pk INT NOT NULL, salary INT, dep STRING, active BOOLEAN)""".stripMargin)
    sql(s"INSERT INTO $sourceNameAsString VALUES (11, 999, 'hr', true), (12, 888, 'hr', false)")

    // Dynamic mode overwrites partitions discovered from the data; static mode targets
    // the 'hr' partition explicitly (so `dep` is supplied by the PARTITION clause).
    val insertOverwrite = if (isDynamic) {
      s"""INSERT WITH SCHEMA EVOLUTION OVERWRITE TABLE $tableNameAsString
         |SELECT * FROM $sourceNameAsString
         |""".stripMargin
    } else {
      s"""INSERT WITH SCHEMA EVOLUTION OVERWRITE TABLE $tableNameAsString
         |PARTITION (dep = 'hr')
         |SELECT pk, salary, active FROM $sourceNameAsString
         |""".stripMargin
    }

    val confValue = if (isDynamic) PartitionOverwriteMode.DYNAMIC else PartitionOverwriteMode.STATIC
    val (txn, _) = withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> confValue.toString) {
      executeTransaction { sql(insertOverwrite) }
    }

    assert(txn.currentState === Committed)
    assert(txn.isClosed)

    // Pin the exact evolved field sequence (new column appended last), consistent with the
    // plain INSERT WITH SCHEMA EVOLUTION test above; a bare `contains("active")` check would
    // miss column-ordering or duplicate-column bugs in the evolution path.
    assert(table.schema.fieldNames.toSeq === Seq("pk", "salary", "dep", "active"))

    checkAnswer(
      sql(s"SELECT * FROM $tableNameAsString"),
      Seq(
        Row(2, 200, "software", null), // different partition: untouched, new column is null
        Row(11, 999, "hr", true), // 'hr' partition fully overwritten by the source rows
        Row(12, 888, "hr", false)))
  }
476+
477+
test("SQL INSERT WITH SCHEMA EVOLUTION analysis failure aborts transaction") {
  createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
    """{ "pk": 1, "salary": 100, "dep": "hr" }
      |{ "pk": 2, "salary": 200, "dep": "software" }
      |""".stripMargin)

  sql(
    s"""CREATE TABLE $sourceNameAsString
       |(pk INT NOT NULL, salary INT, dep STRING, active BOOLEAN)""".stripMargin)

  // Referencing a column that does not exist in the source must fail analysis.
  val badInsert =
    s"""INSERT WITH SCHEMA EVOLUTION INTO $tableNameAsString
       |SELECT nonexistent_col FROM $sourceNameAsString
       |""".stripMargin
  val failure = intercept[AnalysisException] {
    sql(badInsert)
  }

  assert(failure.getMessage.contains("nonexistent_col"))
  // The failed statement must have aborted and closed its transaction...
  assert(catalog.lastTransaction.currentState === Aborted)
  assert(catalog.lastTransaction.isClosed)
  // ...leaving the target table's schema unchanged.
  assert(table.schema.fieldNames.toSeq === Seq("pk", "salary", "dep"))
}
402500
}

0 commit comments

Comments
 (0)