From 9e6882feca0800d5d4f9920886cb5dae73bbe1d4 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Fri, 25 Sep 2020 06:50:24 +0000
Subject: [PATCH] [SPARK-32885][SS] Add DataStreamReader.table API

### What changes were proposed in this pull request?
This PR adds a new `table` API to DataStreamReader, analogous to the `table` API in DataFrameReader.

### Why are the changes needed?
Users can use this API directly to obtain a streaming DataFrame on a table. Below is a simple example.

Application 1, for initializing and starting the streaming job:
```
val path = "/home/yuanjian.li/runtime/to_be_deleted"
val tblName = "my_table"

// Write some data to `my_table`
spark.range(3).write.format("parquet").option("path", path).saveAsTable(tblName)

// Read the table as a streaming source and write the result to the destination directory
val table = spark.readStream.table(tblName)
table.writeStream.format("parquet").option("checkpointLocation", "/home/yuanjian.li/runtime/to_be_deleted_ck").start("/home/yuanjian.li/runtime/to_be_deleted_2")
```

Application 2, for appending new data:
```
// Append new data into the path
spark.range(5).write.format("parquet").option("path", "/home/yuanjian.li/runtime/to_be_deleted").mode("append").save()
```

Check the result:
```
// The destination directory should contain all written data
spark.read.parquet("/home/yuanjian.li/runtime/to_be_deleted_2").show()
```

### Does this PR introduce _any_ user-facing change?
Yes, a new API is added.

### How was this patch tested?
New unit tests added and integration testing performed.

Closes #29756 from xuanyuanking/SPARK-32885.

Authored-by: Yuanjian Li
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/analysis/Analyzer.scala | 85 +++++--
 .../catalyst/analysis/CTESubstitution.scala | 2 +-
 .../sql/catalyst/analysis/ResolveHints.scala | 4 +-
 .../sql/catalyst/analysis/unresolved.scala | 11 +-
 .../sql/catalyst/catalog/interface.scala | 3 +-
 .../streaming/StreamingRelationV2.scala | 4 +-
 .../spark/sql/connector/catalog/V1Table.scala | 8 +
 .../spark/sql/execution/command/views.scala | 2 +-
 .../datasources/DataSourceStrategy.scala | 41 ++-
 .../streaming/MicroBatchExecution.scala | 2 +-
 .../continuous/ContinuousExecution.scala | 2 +-
 .../sql/execution/streaming/memory.scala | 2 +
 .../sql/streaming/DataStreamReader.scala | 21 +-
 .../sql-tests/results/explain-aqe.sql.out | 2 +-
 .../sql-tests/results/explain.sql.out | 2 +-
 .../connector/TableCapabilityCheckSuite.scala | 2 +
 .../test/DataStreamTableAPISuite.scala | 234 ++++++++++++++++++
 .../apache/spark/sql/hive/test/TestHive.scala | 2 +-
 18 files changed, 391 insertions(+), 38 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7d591eeea2..6e1f371b1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL import org.apache.spark.sql.connector.catalog._ @@ -846,9 
+847,9 @@ class Analyzer( */ object ResolveTempViews extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case u @ UnresolvedRelation(ident, _) => - lookupTempView(ident).getOrElse(u) - case i @ InsertIntoStatement(UnresolvedRelation(ident, _), _, _, _, _) => + case u @ UnresolvedRelation(ident, _, isStreaming) => + lookupTempView(ident, isStreaming).getOrElse(u) + case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _) => lookupTempView(ident) .map(view => i.copy(table = view)) .getOrElse(i) @@ -861,15 +862,22 @@ class Analyzer( lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u) } - def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = { + def lookupTempView( + identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = { // Permanent View can't refer to temp views, no need to lookup at all. if (isResolvingView) return None - identifier match { + val tmpView = identifier match { case Seq(part1) => v1SessionCatalog.lookupTempView(part1) case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2) case _ => None } + + if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) { + throw new AnalysisException(s"${identifier.quoted} is not a temp view of streaming " + + s"logical plan, please use batch API such as `DataFrameReader.table` to read it.") + } + tmpView } } @@ -895,10 +903,13 @@ class Analyzer( object ResolveTables extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp { case u: UnresolvedRelation => - lookupV2Relation(u.multipartIdentifier, u.options) - .map { rel => - val ident = rel.identifier.get - SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel) + lookupV2Relation(u.multipartIdentifier, u.options, u.isStreaming) + .map { relation => + val (catalog, ident) = relation match { + case ds: DataSourceV2Relation => (ds.catalog, ds.identifier.get) + case s: StreamingRelationV2 => (s.catalog, s.identifier.get) + } + SubqueryAlias(catalog.get.name +: ident.namespace :+ ident.name, relation) }.getOrElse(u) case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) => @@ -911,8 +922,9 @@ class Analyzer( .map(ResolvedTable(catalog.asTableCatalog, ident, _)) .getOrElse(u) - case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved => - lookupV2Relation(u.multipartIdentifier, u.options) + case i @ InsertIntoStatement(u @ UnresolvedRelation(_, _, false), _, _, _, _) + if i.query.resolved => + lookupV2Relation(u.multipartIdentifier, u.options, false) .map(v2Relation => i.copy(table = v2Relation)) .getOrElse(i) @@ -930,12 +942,18 @@ class Analyzer( */ private def lookupV2Relation( identifier: Seq[String], - options: CaseInsensitiveStringMap): Option[DataSourceV2Relation] = + options: CaseInsensitiveStringMap, + isStreaming: Boolean): Option[LogicalPlan] = expandRelationName(identifier) match { case NonSessionCatalogAndIdentifier(catalog, ident) => CatalogV2Util.loadTable(catalog, ident) match { case Some(table) => - Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options)) + if (isStreaming) { + Some(StreamingRelationV2(None, table.name, table, options, + table.schema.toAttributes, Some(catalog), Some(ident), None)) + } else { + Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options)) + } case None => None } case _ => None @@ -976,8 +994,8 @@ class Analyzer( def apply(plan: 
LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp { case i @ InsertIntoStatement(table, _, _, _, _) if i.query.resolved => val relation = table match { - case u: UnresolvedRelation => - lookupRelation(u.multipartIdentifier, u.options).getOrElse(u) + case u @ UnresolvedRelation(_, _, false) => + lookupRelation(u.multipartIdentifier, u.options, false).getOrElse(u) case other => other } @@ -988,7 +1006,8 @@ class Analyzer( } case u: UnresolvedRelation => - lookupRelation(u.multipartIdentifier, u.options).map(resolveViews).getOrElse(u) + lookupRelation(u.multipartIdentifier, u.options, u.isStreaming) + .map(resolveViews).getOrElse(u) case u @ UnresolvedTable(identifier) => lookupTableOrView(identifier).map { @@ -1020,16 +1039,40 @@ class Analyzer( // 3) If a v1 table is found, create a v1 relation. Otherwise, create a v2 relation. private def lookupRelation( identifier: Seq[String], - options: CaseInsensitiveStringMap): Option[LogicalPlan] = { + options: CaseInsensitiveStringMap, + isStreaming: Boolean): Option[LogicalPlan] = { expandRelationName(identifier) match { case SessionCatalogAndIdentifier(catalog, ident) => lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map { case v1Table: V1Table => - v1SessionCatalog.getRelation(v1Table.v1Table, options) + if (isStreaming) { + if (v1Table.v1Table.tableType == CatalogTableType.VIEW) { + throw new AnalysisException(s"${identifier.quoted} is a permanent view, " + + "which is not supported by streaming reading API such as " + + "`DataStreamReader.table` yet.") + } + SubqueryAlias( + catalog.name +: ident.asMultipartIdentifier, + UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true)) + } else { + v1SessionCatalog.getRelation(v1Table.v1Table, options) + } case table => - SubqueryAlias( - catalog.name +: ident.asMultipartIdentifier, - DataSourceV2Relation.create(table, Some(catalog), Some(ident), options)) + if (isStreaming) { + val v1Fallback = table match { + case withFallback: V2TableWithV1Fallback => + Some(UnresolvedCatalogRelation(withFallback.v1Table, isStreaming = true)) + case _ => None + } + SubqueryAlias( + catalog.name +: ident.asMultipartIdentifier, + StreamingRelationV2(None, table.name, table, options, table.schema.toAttributes, + Some(catalog), Some(ident), v1Fallback)) + } else { + SubqueryAlias( + catalog.name +: ident.asMultipartIdentifier, + DataSourceV2Relation.create(table, Some(catalog), Some(ident), options)) + } } val key = catalog.name +: ident.namespace :+ ident.name AnalysisContext.get.relationCache.get(key).map(_.transform { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index b177aa8dd0..8d3b04c202 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -171,7 +171,7 @@ object CTESubstitution extends Rule[LogicalPlan] { plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = plan resolveOperatorsUp { - case u @ UnresolvedRelation(Seq(table), _) => + case u @ UnresolvedRelation(Seq(table), _, _) => cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u) case other => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala 
index 1f0de78b69..c0a9414d61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -105,7 +105,7 @@ object ResolveHints { val newNode = CurrentOrigin.withOrigin(plan.origin) { plan match { - case ResolvedHint(u @ UnresolvedRelation(ident, _), hint) + case ResolvedHint(u @ UnresolvedRelation(ident, _, _), hint) if matchedIdentifierInHint(ident) => ResolvedHint(u, createHintInfo(hintName).merge(hint, hintErrorHandler)) @@ -113,7 +113,7 @@ object ResolveHints { if matchedIdentifierInHint(extractIdentifier(r)) => ResolvedHint(r, createHintInfo(hintName).merge(hint, hintErrorHandler)) - case UnresolvedRelation(ident, _) if matchedIdentifierInHint(ident) => + case UnresolvedRelation(ident, _, _) if matchedIdentifierInHint(ident) => ResolvedHint(plan, createHintInfo(hintName)) case r: SubqueryAlias if matchedIdentifierInHint(extractIdentifier(r)) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 62000ac0ef..49861f9172 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -45,7 +45,8 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str */ case class UnresolvedRelation( multipartIdentifier: Seq[String], - options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()) + options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(), + override val isStreaming: Boolean = false) extends LeafNode with NamedRelation { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -60,6 +61,14 @@ case class UnresolvedRelation( } object UnresolvedRelation { + def apply( + tableIdentifier: TableIdentifier, + extraOptions: CaseInsensitiveStringMap, + isStreaming: Boolean): UnresolvedRelation = { + UnresolvedRelation( + tableIdentifier.database.toSeq :+ tableIdentifier.table, extraOptions, isStreaming) + } + def apply(tableIdentifier: TableIdentifier): UnresolvedRelation = UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index db01999ab9..9c93691ca3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -643,7 +643,8 @@ object CatalogTypes { */ case class UnresolvedCatalogRelation( tableMeta: CatalogTable, - options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()) extends LeafNode { + options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(), + override val isStreaming: Boolean = false) extends LeafNode { assert(tableMeta.identifier.database.isDefined) override lazy val resolved: Boolean = false override def output: Seq[Attribute] = Nil diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala index 92c4926c3a..6a059025a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.streaming import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.connector.catalog.{Table, TableProvider} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableProvider} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -37,6 +37,8 @@ case class StreamingRelationV2( table: Table, extraOptions: CaseInsensitiveStringMap, output: Seq[Attribute], + catalog: Option[CatalogPlugin], + identifier: Option[Identifier], v1Relation: Option[LogicalPlan]) extends LeafNode with MultiInstanceRelation { override lazy val resolved = v1Relation.forall(_.resolved) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala index 70fc9689e6..9aed550ff9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala @@ -80,3 +80,11 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table { override def toString: String = s"V1Table($name)" } + +/** + * A V2 table with V1 fallback support. This is used to fallback to V1 table when the V2 one + * doesn't implement specific capabilities but V1 already has. + */ +private[sql] trait V2TableWithV1Fallback extends Table { + def v1Table: CatalogTable +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index cc2a4a6b3c..94f34a9b39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -174,7 +174,7 @@ case class CreateViewCommand( def verify(child: LogicalPlan) { child.collect { // Disallow creating permanent views based on temporary views. - case UnresolvedRelation(nameParts, _) if catalog.isTempView(nameParts) => + case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) => throw new AnalysisException(s"Not allowed to create a permanent view $name by " + s"referencing a temporary view ${nameParts.quoted}. 
" + "Please create a temp view instead by CREATE TEMP VIEW") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 1f8cfee308..86e8571927 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -37,8 +37,12 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.connector.catalog.SupportsRead +import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.streaming.StreamingRelation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy import org.apache.spark.sql.sources._ @@ -260,19 +264,48 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] }) } + private def getStreamingRelation( + table: CatalogTable, + extraOptions: CaseInsensitiveStringMap): StreamingRelation = { + val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table) + val dataSource = DataSource( + sparkSession, + className = table.provider.get, + userSpecifiedSchema = Some(table.schema), + options = dsOptions) + StreamingRelation(dataSource) + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options), _, _, _, _) + case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => i.copy(table = readDataSourceTable(tableMeta, options)) - case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _), _, _, _, _) => + case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _) => i.copy(table = DDLUtils.readHiveTable(tableMeta)) - case UnresolvedCatalogRelation(tableMeta, options) if DDLUtils.isDatasourceTable(tableMeta) => + case UnresolvedCatalogRelation(tableMeta, options, false) + if DDLUtils.isDatasourceTable(tableMeta) => readDataSourceTable(tableMeta, options) - case UnresolvedCatalogRelation(tableMeta, _) => + case UnresolvedCatalogRelation(tableMeta, _, false) => DDLUtils.readHiveTable(tableMeta) + + case UnresolvedCatalogRelation(tableMeta, extraOptions, true) => + getStreamingRelation(tableMeta, extraOptions) + + case s @ StreamingRelationV2( + _, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) => + import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ + val v1Relation = getStreamingRelation(tableMeta, extraOptions) + if (table.isInstanceOf[SupportsRead] + && table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)) { + s.copy(v1Relation = Some(v1Relation)) + } else { + // Fallback to V1 relation + v1Relation + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 5a91b24a08..aad212cc13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -90,7 +90,7 @@ class MicroBatchExecution( StreamingExecutionRelation(source, output)(sparkSession) }) - case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) => + case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, _, _, v1) => val dsStr = if (src.nonEmpty) s"[${src.get}]" else "" val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName) if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 12198f735c..6eb28d4c66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -65,7 +65,7 @@ class ContinuousExecution( var nextSourceId = 0 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ val _logicalPlan = analyzedPlan.transform { - case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _) => + case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _, _, _) => val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else "" if (!table.supports(TableCapability.CONTINUOUS_READ)) { throw new UnsupportedOperationException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index c6ba0da6ef..ee1cb127a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -83,6 +83,8 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa new MemoryStreamTable(this), CaseInsensitiveStringMap.empty(), attributes, + None, + None, None) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 93a48946fb..9bc4acd49a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider} @@ -231,7 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo Dataset.ofRows( sparkSession, StreamingRelationV2( - Some(provider), source, table, dsOptions, table.schema.toAttributes, v1Relation)) + Some(provider), source, table, dsOptions, + 
table.schema.toAttributes, None, None, v1Relation)) // fallback to v1 // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule. @@ -475,6 +477,23 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo format("parquet").load(path) } + /** + * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should + * support streaming mode. + * @param tableName The name of the table + * @since 3.1.0 + */ + def table(tableName: String): DataFrame = { + require(tableName != null, "The table name can't be null") + val identifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) + Dataset.ofRows( + sparkSession, + UnresolvedRelation( + identifier, + new CaseInsensitiveStringMap(extraOptions.toMap.asJava), + isStreaming = true)) + } + /** * Loads text files and returns a `DataFrame` whose schema starts with a string column named * "value", and followed by partitioned columns if there are any. diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 5a59ffa038..3a850160b4 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -693,7 +693,7 @@ Output: [] Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView (3) UnresolvedRelation -Arguments: [explain_temp1], [] +Arguments: [explain_temp1], [], false (4) Project Arguments: ['key, 'val] diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index f28c408407..6b3b71f85c 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -827,7 +827,7 @@ Output: [] Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView (3) UnresolvedRelation -Arguments: [explain_temp1], [] +Arguments: [explain_temp1], [], false (4) Project Arguments: ['key, 'val] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala index 1d016496df..2d75a35215 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala @@ -46,6 +46,8 @@ class TableCapabilityCheckSuite extends AnalysisSuite with SharedSparkSession { table, CaseInsensitiveStringMap.empty(), TableCapabilityCheckSuite.schema.toAttributes, + None, + None, v1Relation) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala new file mode 100644 index 0000000000..788452dace --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.test + +import java.util + +import scala.collection.JavaConverters._ + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableCatalog} +import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, Table, TableCapability, V2TableWithV1Fallback} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.sources.FakeScanBuilder +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter { + import testImplicits._ + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + before { + spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) + spark.conf.set("spark.sql.catalog.teststream", classOf[InMemoryStreamTableCatalog].getName) + } + + after { + spark.sessionState.catalogManager.reset() + spark.sessionState.conf.clear() + } + + test("table API with file source") { + Seq("parquet", "").foreach { source => + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> source) { + withTempDir { tempDir => + val tblName = "my_table" + val dir = tempDir.getAbsolutePath + withTable(tblName) { + spark.range(3).write.format("parquet").option("path", dir).saveAsTable(tblName) + + testStream(spark.readStream.table(tblName))( + ProcessAllAvailable(), + CheckAnswer(Row(0), Row(1), Row(2)) + ) + } + } + } + } + } + + test("read non-exist table") { + intercept[AnalysisException] { + spark.readStream.table("non_exist_table") + }.message.contains("Table not found") + } + + test("stream table API with temp view") { + val tblName = "my_table" + val stream = MemoryStream[Int] + withTable(tblName) { + stream.toDF().createOrReplaceTempView(tblName) + + testStream(spark.readStream.table(tblName)) ( + AddData(stream, 1, 2, 3), + CheckLastBatch(1, 2, 3), + AddData(stream, 4, 5), + CheckLastBatch(4, 5) + ) + } + } + + test("stream table API with non-streaming temp view") { + val tblName = "my_table" + withTable(tblName) { + spark.range(3).createOrReplaceTempView(tblName) + intercept[AnalysisException] { + spark.readStream.table(tblName) + }.message.contains("is not a temp view of streaming logical plan") + } + } + + test("read table without streaming capability support") { + val tableIdentifer = "testcat.table_name" + + spark.sql(s"CREATE TABLE $tableIdentifer (id bigint, data string) USING foo") + + intercept[AnalysisException] { 
+ spark.readStream.table(tableIdentifer) + }.message.contains("does not support either micro-batch or continuous scan") + } + + test("read table with custom catalog") { + val tblName = "teststream.table_name" + withTable(tblName) { + spark.sql(s"CREATE TABLE $tblName (data int) USING foo") + val stream = MemoryStream[Int] + val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + table.asInstanceOf[InMemoryStreamTable].setStream(stream) + + testStream(spark.readStream.table(tblName)) ( + AddData(stream, 1, 2, 3), + CheckLastBatch(1, 2, 3), + AddData(stream, 4, 5), + CheckLastBatch(4, 5) + ) + } + } + + test("read table with custom catalog & namespace") { + spark.sql("CREATE NAMESPACE teststream.ns") + + val tblName = "teststream.ns.table_name" + withTable(tblName) { + spark.sql(s"CREATE TABLE $tblName (data int) USING foo") + val stream = MemoryStream[Int] + val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array("ns"), "table_name")) + table.asInstanceOf[InMemoryStreamTable].setStream(stream) + + testStream(spark.readStream.table(tblName)) ( + AddData(stream, 1, 2, 3), + CheckLastBatch(1, 2, 3), + AddData(stream, 4, 5), + CheckLastBatch(4, 5) + ) + } + } + + test("fallback to V1 relation") { + val tblName = DataStreamTableAPISuite.V1FallbackTestTableName + spark.conf.set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, + classOf[InMemoryStreamTableCatalog].getName) + val v2Source = classOf[FakeV2Provider].getName + withTempDir { tempDir => + withTable(tblName) { + spark.sql(s"CREATE TABLE $tblName (data int) USING $v2Source") + + // Check the StreamingRelationV2 has been replaced by StreamingRelation + val plan = spark.readStream.option("path", tempDir.getCanonicalPath).table(tblName) + .queryExecution.analyzed.collectFirst { + case d: StreamingRelationV2 => d + } + assert(plan.isEmpty) + } + } + } +} + +object DataStreamTableAPISuite { + val V1FallbackTestTableName = "fallbackV1Test" +} + +class InMemoryStreamTable(override val name: String) extends Table with SupportsRead { + var stream: MemoryStream[Int] = _ + + def setStream(inputData: MemoryStream[Int]): Unit = stream = inputData + + override def schema(): StructType = stream.fullSchema() + + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava + } + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MemoryStreamScanBuilder(stream) + } +} + +class NonStreamV2Table(override val name: String) + extends Table with SupportsRead with V2TableWithV1Fallback { + override def schema(): StructType = StructType(Nil) + override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder + + override def v1Table: CatalogTable = { + CatalogTable( + identifier = + TableIdentifier(DataStreamTableAPISuite.V1FallbackTestTableName, Some("default")), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + owner = null, + schema = schema(), + provider = Some("parquet")) + } +} + + +class InMemoryStreamTableCatalog extends InMemoryTableCatalog { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override def createTable( + ident: Identifier, + schema: StructType, + 
partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + if (tables.containsKey(ident)) { + throw new TableAlreadyExistsException(ident) + } + + val table = if (ident.name() == DataStreamTableAPISuite.V1FallbackTestTableName) { + new NonStreamV2Table(s"$name.${ident.quoted}") + } else { + new InMemoryStreamTable(s"$name.${ident.quoted}") + } + tables.put(ident, table) + namespaces.putIfAbsent(ident.namespace.toList, Map()) + table + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index 497dda4e22..accfcb8d9d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -601,7 +601,7 @@ private[hive] class TestHiveQueryExecution( // Make sure any test tables referenced are loaded. val referencedTables = describedTables ++ - logical.collect { case UnresolvedRelation(ident, _) => ident.asTableIdentifier } + logical.collect { case UnresolvedRelation(ident, _, _) => ident.asTableIdentifier } val resolver = sparkSession.sessionState.conf.resolver val referencedTestTables = referencedTables.flatMap { tbl => val testTableOpt = sparkSession.testTables.keys.find(resolver(_, tbl.table))