diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index eb0d701004..1fae8d937e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -52,6 +52,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     }
   }
 
+  private def refreshCache(r: DataSourceV2Relation)(): Unit = {
+    session.sharedState.cacheManager.recacheByPlan(session, r)
+  }
+
+  private def invalidateCache(r: ResolvedTable)(): Unit = {
+    val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
+    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters,
         relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) =>
@@ -128,7 +137,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
 
     case RefreshTable(r: ResolvedTable) =>
-      RefreshTableExec(session, r.catalog, r.table, r.identifier) :: Nil
+      RefreshTableExec(r.catalog, r.identifier, invalidateCache(r)) :: Nil
 
     case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
       val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
@@ -172,9 +181,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          AppendDataExecV1(v1, writeOptions.asOptions, query, r) :: Nil
+          AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil
         case v2 =>
-          AppendDataExec(session, v2, r, writeOptions.asOptions, planLater(query)) :: Nil
+          AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
       }
 
     case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
@@ -186,15 +195,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }.toArray
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query, r) :: Nil
+          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions,
+            query, refreshCache(r)) :: Nil
         case v2 =>
-          OverwriteByExpressionExec(session, v2, r, filters,
-            writeOptions.asOptions, planLater(query)) :: Nil
+          OverwriteByExpressionExec(v2, filters,
+            writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
       }
 
     case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>
       OverwritePartitionsDynamicExec(
-        session, r.table.asWritable, r, writeOptions.asOptions, planLater(query)) :: Nil
+        r.table.asWritable, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
 
     case DeleteFromTable(relation, condition) =>
       relation match {
@@ -232,7 +242,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       throw new AnalysisException("Describing columns is not supported for v2 tables.")
 
     case DropTable(r: ResolvedTable, ifExists, purge) =>
-      DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil
+      DropTableExec(r.catalog, r.identifier, ifExists, purge, invalidateCache(r)) :: Nil
 
     case _: NoopDropTable =>
       LocalTableScanExec(Nil, Nil) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
index 068475fc56..f89b890967 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -17,27 +17,24 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 
 /**
  * Physical plan node for dropping a table.
  */
 case class DropTableExec(
-    session: SparkSession,
     catalog: TableCatalog,
-    table: Table,
     ident: Identifier,
     ifExists: Boolean,
-    purge: Boolean) extends V2CommandExec {
+    purge: Boolean,
+    invalidateCache: () => Unit) extends V2CommandExec {
 
   override def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
-      val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+      invalidateCache()
       catalog.dropTable(ident, purge)
     } else if (!ifExists) {
       throw new NoSuchTableException(ident)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
index 52836de5a9..994583c1e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
@@ -17,23 +17,20 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 
 case class RefreshTableExec(
-    session: SparkSession,
     catalog: TableCatalog,
-    table: Table,
-    ident: Identifier) extends V2CommandExec {
+    ident: Identifier,
+    invalidateCache: () => Unit) extends V2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
     catalog.invalidateTable(ident)
 
     // invalidate all caches referencing the given table
    // TODO(SPARK-33437): re-cache the table itself once we support caching a DSv2 table
-    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+    invalidateCache()
     Seq.empty
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index af7721588e..9d2cea9fba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -38,10 +38,10 @@ case class AppendDataExecV1(
     table: SupportsWrite,
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan,
-    v2Relation: DataSourceV2Relation) extends V1FallbackWriters {
+    refreshCache: () => Unit) extends V1FallbackWriters {
 
   override protected def run(): Seq[InternalRow] = {
-    writeWithV1(newWriteBuilder().buildForV1Write(), Some(v2Relation))
+    writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache)
   }
 }
 
@@ -61,7 +61,7 @@ case class OverwriteByExpressionExecV1(
     deleteWhere: Array[Filter],
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan,
-    v2Relation: DataSourceV2Relation) extends V1FallbackWriters {
+    refreshCache: () => Unit) extends V1FallbackWriters {
 
   private def isTruncate(filters: Array[Filter]): Boolean = {
     filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
@@ -70,10 +70,11 @@ case class OverwriteByExpressionExecV1(
   override protected def run(): Seq[InternalRow] = {
     newWriteBuilder() match {
       case builder: SupportsTruncate if isTruncate(deleteWhere) =>
-        writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), Some(v2Relation))
+        writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), refreshCache = refreshCache)
 
       case builder: SupportsOverwrite =>
-        writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(), Some(v2Relation))
+        writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(),
+          refreshCache = refreshCache)
 
       case _ =>
         throw new SparkException(s"Table does not support overwrite by expression: $table")
@@ -116,11 +117,11 @@ trait SupportsV1Write extends SparkPlan {
 
   protected def writeWithV1(
       relation: InsertableRelation,
-      v2Relation: Option[DataSourceV2Relation] = None): Seq[InternalRow] = {
+      refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
     val session = sqlContext.sparkSession
     // The `plan` is already optimized, we should not analyze and optimize it again.
     relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false)
-    v2Relation.foreach(r => session.sharedState.cacheManager.recacheByPlan(session, r))
+    refreshCache()
     Nil
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 1648134d0a..47aad2bcb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -213,15 +213,14 @@ case class AtomicReplaceTableAsSelectExec(
  * Rows in the output data set are appended.
  */
 case class AppendDataExec(
-    session: SparkSession,
     table: SupportsWrite,
-    relation: DataSourceV2Relation,
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+    query: SparkPlan,
+    refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {
 
   override protected def run(): Seq[InternalRow] = {
     val writtenRows = writeWithV2(newWriteBuilder().buildForBatch())
-    session.sharedState.cacheManager.recacheByPlan(session, relation)
+    refreshCache()
     writtenRows
   }
 }
@@ -237,12 +236,11 @@ case class AppendDataExec(
  * AlwaysTrue to delete all rows.
  */
 case class OverwriteByExpressionExec(
-    session: SparkSession,
     table: SupportsWrite,
-    relation: DataSourceV2Relation,
     deleteWhere: Array[Filter],
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+    query: SparkPlan,
+    refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {
 
   private def isTruncate(filters: Array[Filter]): Boolean = {
     filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
@@ -259,7 +257,7 @@ case class OverwriteByExpressionExec(
       case _ =>
         throw new SparkException(s"Table does not support overwrite by expression: $table")
     }
-    session.sharedState.cacheManager.recacheByPlan(session, relation)
+    refreshCache()
     writtenRows
   }
 }
@@ -275,11 +273,10 @@ case class OverwriteByExpressionExec(
  * are not modified.
  */
 case class OverwritePartitionsDynamicExec(
-    session: SparkSession,
     table: SupportsWrite,
-    relation: DataSourceV2Relation,
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+    query: SparkPlan,
+    refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {
 
   override protected def run(): Seq[InternalRow] = {
     val writtenRows = newWriteBuilder() match {
@@ -289,7 +286,7 @@ case class OverwritePartitionsDynamicExec(
       case _ =>
         throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
     }
-    session.sharedState.cacheManager.recacheByPlan(session, relation)
+    refreshCache()
     writtenRows
   }
 }
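
The recurring change above is mechanical: each DSv2 exec node stops carrying a SparkSession plus a DataSourceV2Relation/Table and instead accepts a () => Unit callback, which DataSourceV2Strategy builds by partially applying its new curried helpers refreshCache(r) and invalidateCache(r). The sketch below is not part of the patch; it is a minimal, self-contained illustration of that wiring under hypothetical stand-in names (Relation, CacheManager, AppendNode) rather than Spark's real classes, showing only how a curried method eta-expands into the callback a node stores and invokes after its work.

object CallbackWiringSketch {
  // Hypothetical stand-ins; not Spark's CacheManager/DataSourceV2Relation.
  final case class Relation(name: String)

  final class CacheManager {
    def recacheByPlan(r: Relation): Unit = println(s"re-caching plans that use ${r.name}")
  }

  private val cacheManager = new CacheManager

  // Curried helper in the style of DataSourceV2Strategy#refreshCache: applying
  // only the first parameter list captures `r` and leaves a `() => Unit` behind.
  private def refreshCache(r: Relation)(): Unit = cacheManager.recacheByPlan(r)

  // Exec-node stand-in: it sees only the callback, not a session or relation.
  final case class AppendNode(refreshCache: () => Unit) {
    def run(): Unit = {
      println("writing rows") // the actual batch write would happen here
      refreshCache()          // refresh dependent caches after a successful write
    }
  }

  def main(args: Array[String]): Unit = {
    val r = Relation("cat.db.tbl")
    // Strategy-side wiring: `refreshCache(r)` eta-expands to `() => Unit`.
    AppendNode(refreshCache(r)).run()
  }
}

Passing refreshCache(r) where a () => Unit is expected captures the relation on the strategy side, which is why the nodes in this patch can drop their SparkSession and relation/table fields while cache handling stays in one place.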