[SPARK-32885][SS] Add DataStreamReader.table API

### What changes were proposed in this pull request?
This PR adds a new `table` API to `DataStreamReader`, analogous to the existing `table` API in `DataFrameReader`.
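As a quick illustration (the table name below is a placeholder), the new call mirrors the existing batch reader:

```
// Batch read of a table (existing API) vs. streaming read (added by this PR).
val batchDf = spark.read.table("my_table")        // DataFrameReader.table
val streamDf = spark.readStream.table("my_table") // new DataStreamReader.table
```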

### Why are the changes needed?
Users can use this API directly to obtain a streaming DataFrame from a table. Below is a simple example:

Application 1 for initializing and starting the streaming job:

```
val path = "/home/yuanjian.li/runtime/to_be_deleted"
val tblName = "my_table"

// Write some data to `my_table`
spark.range(3).write.format("parquet").option("path", path).saveAsTable(tblName)

// Read the table as a streaming source, write result to destination directory
val table = spark.readStream.table(tblName)
table.writeStream.format("parquet").option("checkpointLocation", "/home/yuanjian.li/runtime/to_be_deleted_ck").start("/home/yuanjian.li/runtime/to_be_deleted_2")
```

Application 2 for appending new data:

```
// Append new data into the path
spark.range(5).write.format("parquet").option("path", "/home/yuanjian.li/runtime/to_be_deleted").mode("append").save()
```

Check result:
```
// The destination directory should contain all written data
spark.read.parquet("/home/yuanjian.li/runtime/to_be_deleted_2").show()
```
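With the two writes above (`range(3)` initially, then `range(5)` appended), the destination should eventually contain both batches. A rough sketch of the expected ids, assuming both batches have been processed:

```
// Expected ids (row order may vary):
// 0, 1, 2        from the initial write
// 0, 1, 2, 3, 4  from the appended data
```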

### Does this PR introduce _any_ user-facing change?
Yes, a new `DataStreamReader.table` API is added.
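For reference, the user-facing signature introduced by this PR (see the `DataStreamReader` change below):

```
// New method on DataStreamReader, @since 3.1.0
def table(tableName: String): DataFrame
```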

### How was this patch tested?
New unit tests added (`DataStreamTableAPISuite`), along with integration testing.

Closes #29756 from xuanyuanking/SPARK-32885.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 9e6882feca (parent f2fc966674), committed by Wenchen Fan on 2020-09-25 06:50:24 +00:00.
18 changed files with 391 additions and 38 deletions.

```
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.connector.catalog._
@@ -846,9 +847,9 @@ class Analyzer(
    */
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case u @ UnresolvedRelation(ident, _) =>
-        lookupTempView(ident).getOrElse(u)
-      case i @ InsertIntoStatement(UnresolvedRelation(ident, _), _, _, _, _) =>
+      case u @ UnresolvedRelation(ident, _, isStreaming) =>
+        lookupTempView(ident, isStreaming).getOrElse(u)
+      case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _) =>
         lookupTempView(ident)
           .map(view => i.copy(table = view))
           .getOrElse(i)
@@ -861,15 +862,22 @@
       lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u)
     }

-    def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
+    def lookupTempView(
+        identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = {
       // Permanent View can't refer to temp views, no need to lookup at all.
       if (isResolvingView) return None

-      identifier match {
+      val tmpView = identifier match {
         case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
         case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
         case _ => None
       }
+      if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
+        throw new AnalysisException(s"${identifier.quoted} is not a temp view of streaming " +
+          s"logical plan, please use batch API such as `DataFrameReader.table` to read it.")
+      }
+      tmpView
     }
   }
@@ -895,10 +903,13 @@
   object ResolveTables extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case u: UnresolvedRelation =>
-        lookupV2Relation(u.multipartIdentifier, u.options)
-          .map { rel =>
-            val ident = rel.identifier.get
-            SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
+        lookupV2Relation(u.multipartIdentifier, u.options, u.isStreaming)
+          .map { relation =>
+            val (catalog, ident) = relation match {
+              case ds: DataSourceV2Relation => (ds.catalog, ds.identifier.get)
+              case s: StreamingRelationV2 => (s.catalog, s.identifier.get)
+            }
+            SubqueryAlias(catalog.get.name +: ident.namespace :+ ident.name, relation)
           }.getOrElse(u)

       case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
@@ -911,8 +922,9 @@
           .map(ResolvedTable(catalog.asTableCatalog, ident, _))
           .getOrElse(u)

-      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
-        lookupV2Relation(u.multipartIdentifier, u.options)
+      case i @ InsertIntoStatement(u @ UnresolvedRelation(_, _, false), _, _, _, _)
+          if i.query.resolved =>
+        lookupV2Relation(u.multipartIdentifier, u.options, false)
           .map(v2Relation => i.copy(table = v2Relation))
           .getOrElse(i)
@@ -930,12 +942,18 @@
      */
     private def lookupV2Relation(
         identifier: Seq[String],
-        options: CaseInsensitiveStringMap): Option[DataSourceV2Relation] =
+        options: CaseInsensitiveStringMap,
+        isStreaming: Boolean): Option[LogicalPlan] =
       expandRelationName(identifier) match {
         case NonSessionCatalogAndIdentifier(catalog, ident) =>
           CatalogV2Util.loadTable(catalog, ident) match {
             case Some(table) =>
-              Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+              if (isStreaming) {
+                Some(StreamingRelationV2(None, table.name, table, options,
+                  table.schema.toAttributes, Some(catalog), Some(ident), None))
+              } else {
+                Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+              }
             case None => None
           }
         case _ => None
@@ -976,8 +994,8 @@
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case i @ InsertIntoStatement(table, _, _, _, _) if i.query.resolved =>
         val relation = table match {
-          case u: UnresolvedRelation =>
-            lookupRelation(u.multipartIdentifier, u.options).getOrElse(u)
+          case u @ UnresolvedRelation(_, _, false) =>
+            lookupRelation(u.multipartIdentifier, u.options, false).getOrElse(u)
           case other => other
         }
@@ -988,7 +1006,8 @@
         }

       case u: UnresolvedRelation =>
-        lookupRelation(u.multipartIdentifier, u.options).map(resolveViews).getOrElse(u)
+        lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
+          .map(resolveViews).getOrElse(u)

       case u @ UnresolvedTable(identifier) =>
         lookupTableOrView(identifier).map {
@@ -1020,16 +1039,40 @@
     // 3) If a v1 table is found, create a v1 relation. Otherwise, create a v2 relation.
     private def lookupRelation(
         identifier: Seq[String],
-        options: CaseInsensitiveStringMap): Option[LogicalPlan] = {
+        options: CaseInsensitiveStringMap,
+        isStreaming: Boolean): Option[LogicalPlan] = {
       expandRelationName(identifier) match {
         case SessionCatalogAndIdentifier(catalog, ident) =>
           lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
             case v1Table: V1Table =>
-              v1SessionCatalog.getRelation(v1Table.v1Table, options)
+              if (isStreaming) {
+                if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
+                  throw new AnalysisException(s"${identifier.quoted} is a permanent view, " +
+                    "which is not supported by streaming reading API such as " +
+                    "`DataStreamReader.table` yet.")
+                }
+                SubqueryAlias(
+                  catalog.name +: ident.asMultipartIdentifier,
+                  UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
+              } else {
+                v1SessionCatalog.getRelation(v1Table.v1Table, options)
+              }
             case table =>
-              SubqueryAlias(
-                catalog.name +: ident.asMultipartIdentifier,
-                DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+              if (isStreaming) {
+                val v1Fallback = table match {
+                  case withFallback: V2TableWithV1Fallback =>
+                    Some(UnresolvedCatalogRelation(withFallback.v1Table, isStreaming = true))
+                  case _ => None
+                }
+                SubqueryAlias(
+                  catalog.name +: ident.asMultipartIdentifier,
+                  StreamingRelationV2(None, table.name, table, options, table.schema.toAttributes,
+                    Some(catalog), Some(ident), v1Fallback))
+              } else {
+                SubqueryAlias(
+                  catalog.name +: ident.asMultipartIdentifier,
+                  DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+              }
           }
           val key = catalog.name +: ident.namespace :+ ident.name
           AnalysisContext.get.relationCache.get(key).map(_.transform {
```

```
@@ -171,7 +171,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan =
     plan resolveOperatorsUp {
-      case u @ UnresolvedRelation(Seq(table), _) =>
+      case u @ UnresolvedRelation(Seq(table), _, _) =>
         cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u)
       case other =>
```

```
@@ -105,7 +105,7 @@ object ResolveHints {
         val newNode = CurrentOrigin.withOrigin(plan.origin) {
           plan match {
-            case ResolvedHint(u @ UnresolvedRelation(ident, _), hint)
+            case ResolvedHint(u @ UnresolvedRelation(ident, _, _), hint)
                 if matchedIdentifierInHint(ident) =>
               ResolvedHint(u, createHintInfo(hintName).merge(hint, hintErrorHandler))
@@ -113,7 +113,7 @@
                 if matchedIdentifierInHint(extractIdentifier(r)) =>
               ResolvedHint(r, createHintInfo(hintName).merge(hint, hintErrorHandler))

-            case UnresolvedRelation(ident, _) if matchedIdentifierInHint(ident) =>
+            case UnresolvedRelation(ident, _, _) if matchedIdentifierInHint(ident) =>
               ResolvedHint(plan, createHintInfo(hintName))

             case r: SubqueryAlias if matchedIdentifierInHint(extractIdentifier(r)) =>
```

```
@@ -45,7 +45,8 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
  */
 case class UnresolvedRelation(
     multipartIdentifier: Seq[String],
-    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
+    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
+    override val isStreaming: Boolean = false)
   extends LeafNode with NamedRelation {

   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -60,6 +61,14 @@ case class UnresolvedRelation(
 }

 object UnresolvedRelation {
+  def apply(
+      tableIdentifier: TableIdentifier,
+      extraOptions: CaseInsensitiveStringMap,
+      isStreaming: Boolean): UnresolvedRelation = {
+    UnresolvedRelation(
+      tableIdentifier.database.toSeq :+ tableIdentifier.table, extraOptions, isStreaming)
+  }
+
   def apply(tableIdentifier: TableIdentifier): UnresolvedRelation =
     UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table)
 }
```

```
@@ -643,7 +643,8 @@ object CatalogTypes {
  */
 case class UnresolvedCatalogRelation(
     tableMeta: CatalogTable,
-    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()) extends LeafNode {
+    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
+    override val isStreaming: Boolean = false) extends LeafNode {
   assert(tableMeta.identifier.database.isDefined)
   override lazy val resolved: Boolean = false
   override def output: Seq[Attribute] = Nil
```

```
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.streaming
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -37,6 +37,8 @@ case class StreamingRelationV2(
     table: Table,
     extraOptions: CaseInsensitiveStringMap,
     output: Seq[Attribute],
+    catalog: Option[CatalogPlugin],
+    identifier: Option[Identifier],
     v1Relation: Option[LogicalPlan])
   extends LeafNode with MultiInstanceRelation {
   override lazy val resolved = v1Relation.forall(_.resolved)
```

```
@@ -80,3 +80,11 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
   override def toString: String = s"V1Table($name)"
 }
+
+/**
+ * A V2 table with V1 fallback support. This is used to fallback to V1 table when the V2 one
+ * doesn't implement specific capabilities but V1 already has.
+ */
+private[sql] trait V2TableWithV1Fallback extends Table {
+  def v1Table: CatalogTable
+}
```

```
@@ -174,7 +174,7 @@ case class CreateViewCommand(
     def verify(child: LogicalPlan) {
       child.collect {
         // Disallow creating permanent views based on temporary views.
-        case UnresolvedRelation(nameParts, _) if catalog.isTempView(nameParts) =>
+        case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) =>
           throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
             s"referencing a temporary view ${nameParts.quoted}. " +
             "Please create a temp view instead by CREATE TEMP VIEW")
```

```
@@ -37,8 +37,12 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.connector.catalog.SupportsRead
+import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.sources._
@@ -260,19 +264,48 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     })
   }

+  private def getStreamingRelation(
+      table: CatalogTable,
+      extraOptions: CaseInsensitiveStringMap): StreamingRelation = {
+    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
+    val dataSource = DataSource(
+      sparkSession,
+      className = table.provider.get,
+      userSpecifiedSchema = Some(table.schema),
+      options = dsOptions)
+    StreamingRelation(dataSource)
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options), _, _, _, _)
+    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _, _)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta, options))

-    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _), _, _, _, _) =>
+    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))

-    case UnresolvedCatalogRelation(tableMeta, options) if DDLUtils.isDatasourceTable(tableMeta) =>
+    case UnresolvedCatalogRelation(tableMeta, options, false)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta, options)

-    case UnresolvedCatalogRelation(tableMeta, _) =>
+    case UnresolvedCatalogRelation(tableMeta, _, false) =>
       DDLUtils.readHiveTable(tableMeta)
+
+    case UnresolvedCatalogRelation(tableMeta, extraOptions, true) =>
+      getStreamingRelation(tableMeta, extraOptions)
+
+    case s @ StreamingRelationV2(
+        _, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
+      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+      val v1Relation = getStreamingRelation(tableMeta, extraOptions)
+      if (table.isInstanceOf[SupportsRead]
+          && table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)) {
+        s.copy(v1Relation = Some(v1Relation))
+      } else {
+        // Fallback to V1 relation
+        v1Relation
+      }
   }
 }
```

```
@@ -90,7 +90,7 @@ class MicroBatchExecution(
           StreamingExecutionRelation(source, output)(sparkSession)
         })

-      case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) =>
+      case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, _, _, v1) =>
         val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
         val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
         if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
```

```
@@ -65,7 +65,7 @@ class ContinuousExecution(
     var nextSourceId = 0
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
-      case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _) =>
+      case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _, _, _) =>
         val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else ""
         if (!table.supports(TableCapability.CONTINUOUS_READ)) {
           throw new UnsupportedOperationException(
```

```
@@ -83,6 +83,8 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
       new MemoryStreamTable(this),
       CaseInsensitiveStringMap.empty(),
       attributes,
+      None,
+      None,
       None)
   }
```

```
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
@@ -231,7 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
         Dataset.ofRows(
           sparkSession,
           StreamingRelationV2(
-            Some(provider), source, table, dsOptions, table.schema.toAttributes, v1Relation))
+            Some(provider), source, table, dsOptions,
+            table.schema.toAttributes, None, None, v1Relation))

       // fallback to v1
       // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
@@ -475,6 +477,23 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     format("parquet").load(path)
   }

+  /**
+   * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
+   * support streaming mode.
+   * @param tableName The name of the table
+   * @since 3.1.0
+   */
+  def table(tableName: String): DataFrame = {
+    require(tableName != null, "The table name can't be null")
+    val identifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+    Dataset.ofRows(
+      sparkSession,
+      UnresolvedRelation(
+        identifier,
+        new CaseInsensitiveStringMap(extraOptions.toMap.asJava),
+        isStreaming = true))
+  }
+
   /**
    * Loads text files and returns a `DataFrame` whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any.
```

```
@@ -693,7 +693,7 @@ Output: []
 Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView

 (3) UnresolvedRelation
-Arguments: [explain_temp1], []
+Arguments: [explain_temp1], [], false

 (4) Project
 Arguments: ['key, 'val]
```

```
@@ -827,7 +827,7 @@ Output: []
 Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView

 (3) UnresolvedRelation
-Arguments: [explain_temp1], []
+Arguments: [explain_temp1], [], false

 (4) Project
 Arguments: ['key, 'val]
```

```
@@ -46,6 +46,8 @@ class TableCapabilityCheckSuite extends AnalysisSuite with SharedSparkSession {
       table,
       CaseInsensitiveStringMap.empty(),
       TableCapabilityCheckSuite.schema.toAttributes,
+      None,
+      None,
       v1Relation)
   }
```

```
@@ -0,0 +1,234 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.streaming.test

import java.util

import scala.collection.JavaConverters._

import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.streaming.sources.FakeScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
  import testImplicits._
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  before {
    spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
    spark.conf.set("spark.sql.catalog.teststream", classOf[InMemoryStreamTableCatalog].getName)
  }

  after {
    spark.sessionState.catalogManager.reset()
    spark.sessionState.conf.clear()
  }

  test("table API with file source") {
    Seq("parquet", "").foreach { source =>
      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> source) {
        withTempDir { tempDir =>
          val tblName = "my_table"
          val dir = tempDir.getAbsolutePath
          withTable(tblName) {
            spark.range(3).write.format("parquet").option("path", dir).saveAsTable(tblName)

            testStream(spark.readStream.table(tblName))(
              ProcessAllAvailable(),
              CheckAnswer(Row(0), Row(1), Row(2))
            )
          }
        }
      }
    }
  }

  test("read non-exist table") {
    intercept[AnalysisException] {
      spark.readStream.table("non_exist_table")
    }.message.contains("Table not found")
  }

  test("stream table API with temp view") {
    val tblName = "my_table"
    val stream = MemoryStream[Int]
    withTable(tblName) {
      stream.toDF().createOrReplaceTempView(tblName)

      testStream(spark.readStream.table(tblName)) (
        AddData(stream, 1, 2, 3),
        CheckLastBatch(1, 2, 3),
        AddData(stream, 4, 5),
        CheckLastBatch(4, 5)
      )
    }
  }

  test("stream table API with non-streaming temp view") {
    val tblName = "my_table"
    withTable(tblName) {
      spark.range(3).createOrReplaceTempView(tblName)
      intercept[AnalysisException] {
        spark.readStream.table(tblName)
      }.message.contains("is not a temp view of streaming logical plan")
    }
  }

  test("read table without streaming capability support") {
    val tableIdentifer = "testcat.table_name"

    spark.sql(s"CREATE TABLE $tableIdentifer (id bigint, data string) USING foo")

    intercept[AnalysisException] {
      spark.readStream.table(tableIdentifer)
    }.message.contains("does not support either micro-batch or continuous scan")
  }

  test("read table with custom catalog") {
    val tblName = "teststream.table_name"
    withTable(tblName) {
      spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
      val stream = MemoryStream[Int]
      val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
      val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
      table.asInstanceOf[InMemoryStreamTable].setStream(stream)

      testStream(spark.readStream.table(tblName)) (
        AddData(stream, 1, 2, 3),
        CheckLastBatch(1, 2, 3),
        AddData(stream, 4, 5),
        CheckLastBatch(4, 5)
      )
    }
  }

  test("read table with custom catalog & namespace") {
    spark.sql("CREATE NAMESPACE teststream.ns")

    val tblName = "teststream.ns.table_name"
    withTable(tblName) {
      spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
      val stream = MemoryStream[Int]
      val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
      val table = testCatalog.loadTable(Identifier.of(Array("ns"), "table_name"))
      table.asInstanceOf[InMemoryStreamTable].setStream(stream)

      testStream(spark.readStream.table(tblName)) (
        AddData(stream, 1, 2, 3),
        CheckLastBatch(1, 2, 3),
        AddData(stream, 4, 5),
        CheckLastBatch(4, 5)
      )
    }
  }

  test("fallback to V1 relation") {
    val tblName = DataStreamTableAPISuite.V1FallbackTestTableName
    spark.conf.set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
      classOf[InMemoryStreamTableCatalog].getName)
    val v2Source = classOf[FakeV2Provider].getName
    withTempDir { tempDir =>
      withTable(tblName) {
        spark.sql(s"CREATE TABLE $tblName (data int) USING $v2Source")

        // Check the StreamingRelationV2 has been replaced by StreamingRelation
        val plan = spark.readStream.option("path", tempDir.getCanonicalPath).table(tblName)
          .queryExecution.analyzed.collectFirst {
            case d: StreamingRelationV2 => d
          }
        assert(plan.isEmpty)
      }
    }
  }
}

object DataStreamTableAPISuite {
  val V1FallbackTestTableName = "fallbackV1Test"
}

class InMemoryStreamTable(override val name: String) extends Table with SupportsRead {
  var stream: MemoryStream[Int] = _

  def setStream(inputData: MemoryStream[Int]): Unit = stream = inputData

  override def schema(): StructType = stream.fullSchema()

  override def capabilities(): util.Set[TableCapability] = {
    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
  }

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    new MemoryStreamScanBuilder(stream)
  }
}

class NonStreamV2Table(override val name: String)
    extends Table with SupportsRead with V2TableWithV1Fallback {
  override def schema(): StructType = StructType(Nil)
  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder

  override def v1Table: CatalogTable = {
    CatalogTable(
      identifier =
        TableIdentifier(DataStreamTableAPISuite.V1FallbackTestTableName, Some("default")),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      owner = null,
      schema = schema(),
      provider = Some("parquet"))
  }
}

class InMemoryStreamTableCatalog extends InMemoryTableCatalog {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    if (tables.containsKey(ident)) {
      throw new TableAlreadyExistsException(ident)
    }
    val table = if (ident.name() == DataStreamTableAPISuite.V1FallbackTestTableName) {
      new NonStreamV2Table(s"$name.${ident.quoted}")
    } else {
      new InMemoryStreamTable(s"$name.${ident.quoted}")
    }
    tables.put(ident, table)
    namespaces.putIfAbsent(ident.namespace.toList, Map())
    table
  }
}
```

```
@@ -601,7 +601,7 @@ private[hive] class TestHiveQueryExecution(
     // Make sure any test tables referenced are loaded.
     val referencedTables =
       describedTables ++
-        logical.collect { case UnresolvedRelation(ident, _) => ident.asTableIdentifier }
+        logical.collect { case UnresolvedRelation(ident, _, _) => ident.asTableIdentifier }
     val resolver = sparkSession.sessionState.conf.resolver
     val referencedTestTables = referencedTables.flatMap { tbl =>
       val testTableOpt = sparkSession.testTables.keys.find(resolver(_, tbl.table))
```