Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.catalog.transactions.Transaction;
import org.apache.spark.sql.connector.catalog.transactions.TransactionInfo;

/**
 * A {@link CatalogPlugin} that supports transactions.
 * <p>
 * Catalogs that implement this interface opt in to transactional query execution. A catalog
 * implementing this interface is responsible for starting transactions.
 *
 * @since 4.2.0
 */
@Evolving
public interface TransactionalCatalogPlugin extends CatalogPlugin {

/**
* Begins a new transaction and returns a {@link Transaction} representing it.
* <p>
* The returned handle is driven by Spark: it is later committed or aborted and then
* closed (see {@link Transaction} for the exact lifecycle).
*
* @param info metadata about the transaction being started.
* @return a handle representing the newly started transaction.
*/
Transaction beginTransaction(TransactionInfo info);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.transactions;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin;

import java.io.Closeable;

/**
 * Represents a transaction.
 * <p>
 * Spark begins a transaction with {@link TransactionalCatalogPlugin#beginTransaction} and
 * executes read/write operations against the transaction's catalog. On success, Spark
 * calls {@link #commit()}; on failure, Spark calls {@link #abort()}. In both cases Spark
 * subsequently calls {@link #close()} to release resources.
 *
 * @since 4.2.0
 */
@Evolving
public interface Transaction extends Closeable {

/**
* Returns the catalog associated with this transaction. This catalog is responsible for tracking
* read/write operations that occur within the boundaries of a transaction. This allows
* connectors to perform conflict resolution at commit time.
*
* @return the transaction-scoped catalog that tracks this transaction's reads and writes.
*/
CatalogPlugin catalog();

/**
* Commits the transaction. All writes performed under it become visible to other readers.
* <p>
* The connector is responsible for detecting and resolving conflicting commits or throwing
* an exception if resolution is not possible.
* <p>
* This method will be called exactly once per transaction. Spark calls {@link #close()}
* immediately after this method returns.
*
* @throws IllegalStateException if the transaction has already been committed or aborted.
*/
void commit();

/**
* Aborts the transaction, discarding any staged changes.
* <p>
* This method must be idempotent. If the transaction has already been committed or aborted,
* invoking it must have no effect.
* <p>
* Spark calls {@link #close()} immediately after this method returns.
*/
void abort();

/**
* Releases any resources held by this transaction.
* <p>
* Spark always calls this method after {@link #commit()} or {@link #abort()}, regardless of
* whether those methods succeed or not.
* <p>
* This method must be idempotent. If the transaction has already been closed,
* invoking it must have no effect.
* <p>
* Note: this override narrows {@link Closeable#close()} by not declaring a checked
* {@code IOException}, so callers need not handle one.
*/
@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.transactions;

import org.apache.spark.annotation.Evolving;

/**
 * Metadata about a transaction.
 *
 * @since 4.2.0
 */
@Evolving
public interface TransactionInfo {
/**
* Returns a unique identifier for this transaction.
* <p>
* NOTE(review): the uniqueness scope (per catalog vs. globally) is not specified here —
* confirm with the implementing connector.
*
* @return a unique identifier for this transaction.
*/
String id();
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,30 @@ class Analyzer(
}
}

/**
 * Returns a copy of this analyzer that uses the given [[CatalogManager]] for all catalog
 * lookups while preserving all other configuration (extended rules, checks, resolver
 * extensions, etc.). Used by [[QueryExecution]] to create a per-query analyzer that performs
 * transaction-aware catalog resolution for transactional operations.
 */
def withCatalogManager(newCatalogManager: CatalogManager): Analyzer = {
val self = this
// Anonymous subclass: only the catalog manager changes; every extension point is forwarded
// to this instance so the copy behaves identically otherwise. The relation cache is shared
// so both analyzers see the same cached relations.
new Analyzer(newCatalogManager, sharedRelationCache) {
override val hintResolutionRules: Seq[Rule[LogicalPlan]] = self.hintResolutionRules
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = self.extendedResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = self.postHocResolutionRules
override val extendedCheckRules: Seq[LogicalPlan => Unit] = self.extendedCheckRules
override val singlePassResolverExtensions: Seq[ResolverExtension] =
self.singlePassResolverExtensions
override val singlePassMetadataResolverExtensions: Seq[ResolverExtension] =
self.singlePassMetadataResolverExtensions
override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] =
self.singlePassPostHocResolutionRules
override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => Unit] =
self.singlePassExtendedResolutionChecks
}
}

override def execute(plan: LogicalPlan): LogicalPlan = {
AnalysisContext.withNewAnalysisContext {
executeSameContext(plan)
Expand Down Expand Up @@ -437,7 +461,9 @@ class Analyzer(
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Keep Legacy Outputs", Once,
KeepLegacyOutputs)
KeepLegacyOutputs),
Batch("Unresolve Relations", Once,
new UnresolveTransactionRelations(catalogManager))
)

override def batches: Seq[Batch] = earlyBatches ++ Seq(
Expand Down Expand Up @@ -1005,9 +1031,10 @@ class Analyzer(
}
}

// Resolve V2TableReference nodes in a plan. V2TableReference is only created for temp views
// (via V2TableReference.createForTempView), so we only need to resolve it when returning
// the plan of temp views (in resolveViews and unwrapRelationPlan).
// Resolve V2TableReference nodes created for:
// 1. Temp views (via createForTempView).
// 2. Transaction references (via createForTransaction). These are resolved by a
// separate analysis batch in the transaction-aware analyzer instance.
private def resolveTableReferences(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsUp {
case r: V2TableReference => relationResolution.resolveReference(r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ class RelationResolution(
}

private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = {
// Skip cache when a transaction is active.
if (catalogManager.transaction.isDefined) {
return loadRelation(ref)
}

val key = toCacheKey(ref.catalog, ref.identifier)
relationCache.get(key) match {
case Some(cached) =>
Expand All @@ -403,9 +408,18 @@ class RelationResolution(
}

private def loadRelation(ref: V2TableReference): LogicalPlan = {
  // Look the catalog up by name through the catalog manager: while a transaction is
  // active this yields the transaction-aware catalog instance rather than `ref.catalog`.
  val tableCatalog = catalogManager.catalog(ref.catalog.name).asTableCatalog
  val loadedTable = tableCatalog.loadTable(ref.identifier)
  V2TableReferenceUtils.validateLoadedTable(loadedTable, ref)
  // Rebuild the relation against the freshly resolved catalog.
  DataSourceV2Relation(
    table = loadedTable,
    output = ref.output,
    catalog = Some(tableCatalog),
    identifier = Some(ref.identifier),
    options = ref.options)
}

private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TransactionalWrite}
import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
 * Analyzer rule that, while a transaction is active, rewrites resolved
 * [[DataSourceV2Relation]] nodes under transactional writes back into [[V2TableReference]]
 * placeholders whenever the relation was loaded by a catalog whose name matches the
 * transaction's catalog.
 *
 * Re-resolving those references through the transaction's catalog lets the connector
 * intercept [[TableCatalog#loadTable]] calls and track which tables a transaction reads.
 */
class UnresolveTransactionRelations(val catalogManager: CatalogManager)
  extends Rule[LogicalPlan] with LookupCatalog {

  override def apply(plan: LogicalPlan): LogicalPlan = {
    catalogManager.transaction match {
      case None => plan
      case Some(txn) =>
        allowInvokingTransformsInAnalyzer {
          plan.transform {
            case write: TransactionalWrite => unresolve(write, txn.catalog)
          }
        }
    }
  }

  // Swap every matching resolved relation under `plan` for a transaction placeholder.
  private def unresolve(plan: LogicalPlan, txnCatalog: CatalogPlugin): LogicalPlan = {
    plan.transform {
      case relation: DataSourceV2Relation if matchesCatalog(relation, txnCatalog) =>
        V2TableReference.createForTransaction(relation)
    }
  }

  // A relation qualifies when it carries an identifier and its catalog shares the
  // transaction catalog's name.
  private def matchesCatalog(
      relation: DataSourceV2Relation,
      txnCatalog: CatalogPlugin): Boolean = {
    relation.identifier.isDefined && relation.catalog.exists(_.name == txnCatalog.name)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.V2TableReference.Context
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TableInfo
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TemporaryViewContext
import org.apache.spark.sql.catalyst.analysis.V2TableReference.TransactionContext
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.Statistics
Expand All @@ -37,7 +38,7 @@ import org.apache.spark.sql.connector.catalog.V2TableUtil
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.SchemaValidationMode.ALLOW_NEW_TOP_LEVEL_FIELDS
import org.apache.spark.sql.util.SchemaValidationMode.{ALLOW_NEW_TOP_LEVEL_FIELDS, PROHIBIT_CHANGES}
import org.apache.spark.util.ArrayImplicits._

/**
Expand Down Expand Up @@ -84,11 +85,19 @@ private[sql] object V2TableReference {

sealed trait Context
case class TemporaryViewContext(viewName: Seq[String]) extends Context
/** Context for relations that are re-resolved through a transaction catalog. */
case object TransactionContext extends Context

/** Builds a [[V2TableReference]] placeholder for a relation captured inside a temp view. */
def createForTempView(relation: DataSourceV2Relation, viewName: Seq[String]): V2TableReference =
  create(relation, TemporaryViewContext(viewName))

// Produced by UnresolveTransactionRelations, which turns already-resolved relations back
// into placeholders so they re-resolve through the transaction catalog.
def createForTransaction(relation: DataSourceV2Relation): V2TableReference =
  create(relation, TransactionContext)

private def create(relation: DataSourceV2Relation, context: Context): V2TableReference = {
val ref = V2TableReference(
relation.catalog.get.asTableCatalog,
Expand All @@ -110,11 +119,32 @@ private[sql] object V2TableReferenceUtils extends SQLConfHelper {
ref.context match {
case ctx: TemporaryViewContext =>
validateLoadedTableInTempView(table, ref, ctx)
case TransactionContext =>
validateLoadedTableInTransaction(table, ref)
case ctx =>
throw SparkException.internalError(s"Unknown table ref context: ${ctx.getClass.getName}")
}
}

private def validateLoadedTableInTransaction(table: Table, ref: V2TableReference): Unit = {
  // Schema evolution is prohibited for pre-analyzed dataframes that are later used in
  // transactional writes: the entire plan was built against the original schema, so any
  // schema change would make the plan structurally invalid. This is in line with the
  // semantics of SPARK-54444.
  val columnErrors = V2TableUtil.validateCapturedColumns(
    table = table,
    originCols = ref.info.columns,
    mode = PROHIBIT_CHANGES)
  if (columnErrors.nonEmpty) {
    throw QueryCompilationErrors.columnsChangedAfterAnalysis(ref.name, columnErrors)
  }

  // Metadata columns must also be unchanged relative to what the plan captured.
  val metadataErrors = V2TableUtil.validateCapturedMetadataColumns(table, ref.info.metadataColumns)
  if (metadataErrors.nonEmpty) {
    throw QueryCompilationErrors.metadataColumnsChangedAfterAnalysis(ref.name, metadataErrors)
  }
}

private def validateLoadedTableInTempView(
table: Table,
ref: V2TableReference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ case class InsertIntoStatement(
byName: Boolean = false,
replaceCriteriaOpt: Option[InsertReplaceCriteria] = None,
withSchemaEvolution: Boolean = false)
extends UnaryParsedStatement {
// Extends TransactionalWrite so that QueryExecution can detect a potential transaction on the
// unresolved logical plan before analysis runs. InsertIntoStatement is shared between V1 and V2
// inserts, but the LookupCatalog.TransactionalWrite extractor only matches when the target
// catalog implements TransactionalCatalogPlugin, so V1 inserts are never assigned a transaction.
extends UnaryParsedStatement with TransactionalWrite {

require(overwrite || !ifPartitionNotExists,
"IF NOT EXISTS is only valid in INSERT OVERWRITE")
Expand Down
Loading