[SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2

### What changes were proposed in this pull request?

This adds support for metadata columns to DataSourceV2. If a source implements `SupportsMetadataColumns`, it must also implement `SupportsPushDownRequiredColumns` so that those columns can be projected; a connector-side sketch is included below.

The analyzer is updated to resolve metadata columns from `LogicalPlan.metadataOutput`, and a new rule adds metadata columns to the output of `DataSourceV2Relation` when they are referenced.
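
As a rough connector-side sketch (not part of this diff; the class names are illustrative, and the pattern mirrors the in-memory test table below), a source could expose a `_partition` metadata column like this:

```scala
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, Table}
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical metadata column exposing the partition a row was stored in.
object PartitionColumn extends MetadataColumn {
  override def name: String = "_partition"
  override def dataType: DataType = StringType
  override def comment: String = "Partition key used to store the row"
}

// Hypothetical table advertising the column. Its ScanBuilder must also implement
// SupportsPushDownRequiredColumns and accept "_partition" in the pruned schema.
abstract class ExampleTable extends Table with SupportsMetadataColumns {
  override def metadataColumns: Array[MetadataColumn] = Array(PartitionColumn)
}
```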

### Why are the changes needed?

This is the solution discussed for exposing additional data in the Kafka source. It is also needed for a generic `MERGE INTO` plan.

### Does this PR introduce any user-facing change?

Yes. Users can project additional columns from sources that implement the new API. This also updates `DescribeTableExec` to show metadata columns in `DESCRIBE TABLE EXTENDED` output.
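
For example (based on the in-memory test catalog exercised by this PR's tests; the `testcat.ns1.ns2.table` name is an assumption):

```scala
// Project a metadata column exposed by the source alongside regular data columns.
spark.sql("SELECT id, data, _partition FROM testcat.ns1.ns2.table").show()

// DESCRIBE TABLE EXTENDED now includes a "# Metadata Columns" section.
spark.sql("DESCRIBE TABLE EXTENDED testcat.ns1.ns2.table").show(truncate = false)
```

Note that `*` expansion does not include metadata columns, so `SELECT *` behavior is unchanged.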

### How was this patch tested?

This PR includes new unit tests covering metadata column projection, name conflicts with data columns, and `*` expansion.

Closes #28027 from rdblue/add-dsv2-metadata-columns.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>

@ -0,0 +1,58 @@
package org.apache.spark.sql.connector.catalog;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataType;
/**
* Interface for a metadata column.
* <p>
* A metadata column can expose additional metadata about a row. For example, rows from Kafka can
* use metadata columns to expose a message's topic, partition number, and offset.
* <p>
* A metadata column could also be the result of a transform applied to a value in the row. For
* example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In
* this case, {@link #transform()} should return a non-null {@link Transform} that produced the
* metadata column's values.
*/
@Evolving
public interface MetadataColumn {
/**
* The name of this metadata column.
*
* @return a String name
*/
String name();
/**
* The data type of values in this metadata column.
*
* @return a {@link DataType}
*/
DataType dataType();
/**
* @return whether values produced by this metadata column may be null
*/
default boolean isNullable() {
return true;
}
/**
* Documentation for this metadata column, or null.
*
* @return a documentation String
*/
default String comment() {
return null;
}
/**
* The {@link Transform} used to produce this metadata column from data rows, or null.
*
* @return a {@link Transform} used to produce the column's values, or null if there isn't one
*/
default Transform transform() {
return null;
}
}
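
A hypothetical illustration of the `transform()` case described in the Javadoc above (column and table names are illustrative; this assumes the `Expressions.bucket` helper for building a bucket transform):

```scala
import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hypothetical metadata column whose values come from bucket(id, 16).
object BucketColumn extends MetadataColumn {
  override def name: String = "_bucket"
  override def dataType: DataType = IntegerType
  override def comment: String = "Bucket number produced by bucket(id, 16)"
  // Non-null transform, as described above, so Spark knows how the values were derived.
  override def transform: Transform = Expressions.bucket(16, "id")
}
```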

@ -0,0 +1,37 @@
package org.apache.spark.sql.connector.catalog;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* An interface for exposing data columns for a table that are not in the table schema. For example,
* a file source could expose a "file" column that contains the path of the file that contained each
* row.
* <p>
* The columns returned by {@link #metadataColumns()} may be passed as {@link StructField} in
* requested projections. Sources that implement this interface and column projection using
* {@link SupportsPushDownRequiredColumns} must accept metadata fields passed to
* {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
* <p>
* If a table column and a metadata column have the same name, the metadata column will never be
requested. It is recommended that Table implementations reject data column names that conflict
* with metadata column names.
*/
@Evolving
public interface SupportsMetadataColumns extends Table {
/**
* Metadata columns that are supported by this {@link Table}.
* <p>
* The columns returned by this method may be passed as {@link StructField} in requested
* projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
* <p>
* If a table column and a metadata column have the same name, the metadata column will never be
* requested and is ignored. It is recommended that Table implementations reject data column names
* that conflict with metadata column names.
*
* @return an array of {@link MetadataColumn}
*/
MetadataColumn[] metadataColumns();
}
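
A minimal sketch of the scan-builder side of this contract (class names are illustrative; compare the `InMemoryScanBuilder` change later in this diff): the builder accepts metadata fields in the pruned schema instead of rejecting them as unknown columns.

```scala
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical scan builder for a table that exposes metadata columns.
class ExampleScanBuilder(tableSchema: StructType, metadataSchema: StructType)
    extends ScanBuilder with SupportsPushDownRequiredColumns {

  private var requestedSchema: StructType = tableSchema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    // The requested schema may mix data fields and metadata fields; keep both.
    val known = (tableSchema ++ metadataSchema).map(f => f.name -> f).toMap
    requestedSchema = StructType(requiredSchema.flatMap(f => known.get(f.name)))
  }

  override def build(): Scan = new Scan {
    // Report the pruned schema, including any requested metadata fields.
    override def readSchema(): StructType = requestedSchema
  }
}
```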

@ -221,6 +221,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
AddMetadataColumns ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
@ -916,6 +917,29 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
/**
* Adds metadata columns to output for child relations when nodes are missing resolved attributes.
*
* References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
* but the relation's output does not include the metadata columns until the relation is replaced
* using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
* relation's output, the analyzer will detect that nothing produces the columns.
*
* This rule only adds metadata columns when a node is resolved but is missing input from its
* children. This ensures that metadata columns are not added to the plan unless they are used.
* Checking only resolved nodes also guarantees that * expansion has already happened, so
* metadata columns are never accidentally selected by *.
*/
object AddMetadataColumns extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty =>
node resolveOperatorsUp {
case rel: DataSourceV2Relation =>
rel.withMetadataColumns()
}
}
}
/**
* Resolve table relations with concrete relations from v2 catalog.
*

@ -33,6 +33,9 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {
/** Metadata fields that can be projected from this node */
def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming)
@ -86,7 +89,8 @@ abstract class LogicalPlan
}
}
private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
private[this] lazy val childAttributes =
AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput))
private[this] lazy val outputAttributes = AttributeSeq(output)

@ -886,6 +886,12 @@ case class SubqueryAlias(
val qualifierList = identifier.qualifier :+ alias
child.output.map(_.withQualifier(qualifierList))
}
override def metadataOutput: Seq[Attribute] = {
val qualifierList = identifier.qualifier :+ alias
child.metadataOutput.map(_.withQualifier(qualifierList))
}
override def doCanonicalize(): LogicalPlan = child.canonicalized
}

@ -21,7 +21,9 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
object DataSourceV2Implicits {
@ -78,6 +80,18 @@ object DataSourceV2Implicits {
def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
}
implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
def asStruct: StructType = {
val fields = metadata.map { metaCol =>
val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable)
Option(metaCol.comment).map(field.withComment).getOrElse(field)
}
StructType(fields)
}
def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
}
implicit class OptionsHelper(options: Map[String, String]) {
def asOptions: CaseInsensitiveStringMap = {
new CaseInsensitiveStringMap(options.asJava)

@ -21,10 +21,11 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
@ -48,6 +49,21 @@ case class DataSourceV2Relation(
import DataSourceV2Implicits._
override lazy val metadataOutput: Seq[AttributeReference] = table match {
case hasMeta: SupportsMetadataColumns =>
val resolve = SQLConf.get.resolver
val outputNames = outputSet.map(_.name)
def isOutputColumn(col: MetadataColumn): Boolean = {
outputNames.exists(name => resolve(col.name, name))
}
// filter out metadata columns that have names conflicting with output columns. if the table
// has a column "line" and the table can produce a metadata column called "line", then the
// data column should be returned, not the metadata column.
hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
case _ =>
Nil
}
override def name: String = table.name()
override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
@ -78,6 +94,14 @@ case class DataSourceV2Relation(
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
def withMetadataColumns(): DataSourceV2Relation = {
if (metadataOutput.nonEmpty) {
DataSourceV2Relation(table, output ++ metadataOutput, catalog, identifier, options)
} else {
this
}
}
}
/**

@ -27,6 +27,7 @@ import scala.collection.mutable
import org.scalatest.Assertions._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, IdentityTransform, MonthsTransform, Transform, YearsTransform}
@ -34,8 +35,9 @@ import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
import org.apache.spark.sql.types.{DataType, DateType, StructType, TimestampType}
import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
/**
* A simple in-memory table. Rows are stored as a buffered group produced by each output task.
@ -45,7 +47,24 @@ class InMemoryTable(
val schema: StructType,
override val partitioning: Array[Transform],
override val properties: util.Map[String, String])
extends Table with SupportsRead with SupportsWrite with SupportsDelete {
extends Table with SupportsRead with SupportsWrite with SupportsDelete
with SupportsMetadataColumns {
private object PartitionKeyColumn extends MetadataColumn {
override def name: String = "_partition"
override def dataType: DataType = StringType
override def comment: String = "Partition key used to store the row"
}
private object IndexColumn extends MetadataColumn {
override def name: String = "index"
override def dataType: DataType = StringType
override def comment: String = "Metadata column used to conflict with a data column"
}
// purposely exposes a metadata column that conflicts with a data column in some tests
override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)
private val allowUnsupportedTransforms =
properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
@ -146,7 +165,7 @@ class InMemoryTable(
val key = getKey(row)
dataMap += dataMap.get(key)
.map(key -> _.withRow(row))
.getOrElse(key -> new BufferedRows().withRow(row))
.getOrElse(key -> new BufferedRows(key.toArray.mkString("/")).withRow(row))
})
this
}
@ -160,17 +179,38 @@ class InMemoryTable(
TableCapability.TRUNCATE).asJava
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
() => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]))
new InMemoryScanBuilder(schema)
}
class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch {
class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
with SupportsPushDownRequiredColumns {
private var schema: StructType = tableSchema
override def build: Scan =
new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema)
override def pruneColumns(requiredSchema: StructType): Unit = {
// if metadata columns are projected, return the table schema and metadata columns
val hasMetadataColumns = requiredSchema.map(_.name).exists(metadataColumnNames.contains)
if (hasMetadataColumns) {
schema = StructType(tableSchema ++ metadataColumnNames
.flatMap(name => metadataColumns.find(_.name == name))
.map(col => StructField(col.name, col.dataType, col.isNullable)))
}
}
}
class InMemoryBatchScan(data: Array[InputPartition], schema: StructType) extends Scan with Batch {
override def readSchema(): StructType = schema
override def toBatch: Batch = this
override def planInputPartitions(): Array[InputPartition] = data
override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
override def createReaderFactory(): PartitionReaderFactory = {
val metadataColumns = schema.map(_.name).filter(metadataColumnNames.contains)
new BufferedRowsReaderFactory(metadataColumns)
}
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
@ -340,7 +380,8 @@ object InMemoryTable {
}
}
class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
class BufferedRows(
val key: String = "") extends WriterCommitMessage with InputPartition with Serializable {
val rows = new mutable.ArrayBuffer[InternalRow]()
def withRow(row: InternalRow): BufferedRows = {
@ -349,13 +390,24 @@ class BufferedRows extends WriterCommitMessage with InputPartition with Serializ
}
}
private object BufferedRowsReaderFactory extends PartitionReaderFactory {
private class BufferedRowsReaderFactory(
metadataColumns: Seq[String]) extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
new BufferedRowsReader(partition.asInstanceOf[BufferedRows])
new BufferedRowsReader(partition.asInstanceOf[BufferedRows], metadataColumns)
}
}
private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] {
private class BufferedRowsReader(
partition: BufferedRows,
metadataColumns: Seq[String]) extends PartitionReader[InternalRow] {
private def addMetadata(row: InternalRow): InternalRow = {
val metadataRow = new GenericInternalRow(metadataColumns.map {
case "index" => index
case "_partition" => UTF8String.fromString(partition.key)
}.toArray)
new JoinedRow(row, metadataRow)
}
private var index: Int = -1
override def next(): Boolean = {
@ -363,7 +415,7 @@ private class BufferedRowsReader(partition: BufferedRows) extends PartitionReade
index < partition.rows.length
}
override def get(): InternalRow = partition.rows(index)
override def get(): InternalRow = addMetadata(partition.rows(index))
override def close(): Unit = {}
}

@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table, TableCatalog}
import org.apache.spark.sql.types.StructType
case class DescribeTableExec(
@ -41,6 +41,7 @@ case class DescribeTableExec(
addPartitioning(rows)
if (isExtended) {
addMetadataColumns(rows)
addTableDetails(rows)
}
rows.toSeq
@ -72,6 +73,19 @@ case class DescribeTableExec(
}
}
private def addMetadataColumns(rows: ArrayBuffer[InternalRow]): Unit = table match {
case hasMeta: SupportsMetadataColumns if hasMeta.metadataColumns.nonEmpty =>
rows += emptyRow()
rows += toCatalystRow("# Metadata Columns", "", "")
rows ++= hasMeta.metadataColumns.map { column =>
toCatalystRow(
column.name,
column.dataType.simpleString,
Option(column.comment()).getOrElse(""))
}
case _ =>
}
private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
rows += emptyRow()
rows += toCatalystRow("# Partitioning", "", "")

@ -96,13 +96,11 @@ object PushDownUtils extends PredicateHelper {
val exprs = projects ++ filters
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
val neededOutput = relation.output.filter(requiredColumns.contains)
if (neededOutput != relation.output) {
r.pruneColumns(neededOutput.toStructType)
val scan = r.build()
// always project, in case the relation's output has been updated and doesn't match
// the underlying table schema
scan -> toOutputAttrs(scan.readSchema(), relation)
} else {
r.build() -> relation.output
}
case _ => scanBuilder.build() -> relation.output
}

@ -139,6 +139,10 @@ class DataSourceV2SQLSuite
Array("# Partitioning", "", ""),
Array("Part 0", "id", ""),
Array("", "", ""),
Array("# Metadata Columns", "", ""),
Array("index", "string", "Metadata column used to conflict with a data column"),
Array("_partition", "string", "Partition key used to store the row"),
Array("", "", ""),
Array("# Detailed Table Information", "", ""),
Array("Name", "testcat.table_name", ""),
Array("Comment", "this is a test table", ""),
@ -2470,6 +2474,45 @@ class DataSourceV2SQLSuite
}
}
test("SPARK-31255: Project a metadata column") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
"PARTITIONED BY (bucket(4, id), id)")
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
checkAnswer(
spark.sql(s"SELECT id, data, _partition FROM $t1"),
Seq(Row(1, "a", "3/1"), Row(2, "b", "2/2"), Row(3, "c", "2/3")))
}
}
test("SPARK-31255: Projects data column when metadata column has the same name") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (index bigint, data string) USING $v2Format " +
"PARTITIONED BY (bucket(4, index), index)")
sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
checkAnswer(
spark.sql(s"SELECT index, data, _partition FROM $t1"),
Seq(Row(3, "c", "2/3"), Row(2, "b", "2/2"), Row(1, "a", "3/1")))
}
}
test("SPARK-31255: * expansion does not include metadata columns") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
"PARTITIONED BY (bucket(4, id), id)")
sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
checkAnswer(
spark.sql(s"SELECT * FROM $t1"),
Seq(Row(3, "c"), Row(2, "b"), Row(1, "a")))
}
}
private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")