[SPARK-15051][SQL] Create a TypedColumn alias

## What changes were proposed in this pull request?

Currently, when we create an alias for a TypedColumn built from a user-defined Aggregator (for example, `agg(aggSum.toColumn as "a")`), Spark uses the alias method inherited from `Column` (`as`). That method returns a plain `Column` wrapping a `TypedAggregateExpression`, which is unresolved because its input deserializer is not yet defined. The aggregate function (`agg`) later injects the input deserializer back into the `TypedAggregateExpression`, but only for columns that are still `TypedColumn`s. In the case above the expression sits inside a plain `Column`, so it remains unresolved, which causes the problem reported in [SPARK-15051](https://issues.apache.org/jira/browse/SPARK-15051?jql=project%20%3D%20SPARK).
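
For illustration, here is a minimal sketch of the failing pattern, assuming a local SparkSession; the `aggSum` aggregator below is hypothetical and only stands in for any user-defined `Aggregator`:

```scala
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

val spark = SparkSession.builder().master("local").appName("spark-15051-sketch").getOrCreate()
import spark.implicits._

// Hypothetical aggregator standing in for the "aggSum" mentioned above:
// it sums the first (integer) column of each input row.
val aggSum = new Aggregator[Row, Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, row: Row): Long = b + row.getInt(0)
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(r: Long): Long = r
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

val df = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")

// Before this patch, `as` produced a plain Column wrapping an unresolved
// TypedAggregateExpression, so the aggregation below failed to analyze.
df.agg(aggSum.toColumn as "a")
```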

This PR proposes adding an alias method to `TypedColumn` that returns a `TypedColumn`, following a code path similar to `Column`'s alias method.
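
Roughly, the effect of the new override, building on the hypothetical sketch above (this is illustrative usage, not code from the patch):

```scala
import org.apache.spark.sql.TypedColumn

// With the override in place, aliasing keeps the TypedColumn type, so agg()
// can still inject the input deserializer and the query analyzes.
val aliased: TypedColumn[Row, Long] = aggSum.toColumn.name("a")
df.agg(aliased).show()                  // sums column "i": 1 + 2 + 3 = 6
df.agg(aggSum.toColumn as "a").show()   // same result, since the alias routes through name()
```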

Spark's built-in aggregate functions, such as `max`, already work with aliases, for example:

```scala
val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
checkAnswer(df1.agg(max("j") as "b"), Row(3) :: Nil)
```

Thanks for the comments.

## How was this patch tested?


Added test cases in DatasetAggregatorSuite.scala.
Ran the SQL-related queries against this patch.

Author: Kevin Yu <qyu@us.ibm.com>

Closes #12893 from kevinyu98/spark-15051.
Kevin Yu authored on 2016-05-07 11:13:48 +08:00; committed by Wenchen Fan
commit 607a27a0d1 (parent a21a3bbe69)
2 changed files with 21 additions and 6 deletions

sql/core/src/main/scala/org/apache/spark/sql/Column.scala

```diff
@@ -68,6 +68,18 @@ class TypedColumn[-T, U](
     }
     new TypedColumn[T, U](newExpr, encoder)
   }
+
+  /**
+   * Gives the TypedColumn a name (alias).
+   * If the current TypedColumn has metadata associated with it, this metadata will be propagated
+   * to the new column.
+   *
+   * @group expr_ops
+   * @since 2.0.0
+   */
+  override def name(alias: String): TypedColumn[T, U] =
+    new TypedColumn[T, U](super.name(alias).expr, encoder)
 }
 
 /**
@@ -910,12 +922,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    * @group expr_ops
    * @since 1.3.0
    */
-  def as(alias: Symbol): Column = withExpr {
-    expr match {
-      case ne: NamedExpression => Alias(expr, alias.name)(explicitMetadata = Some(ne.metadata))
-      case other => Alias(other, alias.name)()
-    }
-  }
+  def as(alias: Symbol): Column = name(alias.name)
 
   /**
    * Gives the column an alias with metadata.
```
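
Note that the second hunk makes `as(alias: Symbol)` delegate to `name`, so the new `TypedColumn.name` override also takes effect when a symbol alias is used.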

sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala

```diff
@@ -232,4 +232,12 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
       "a" -> Seq(1, 2)
     )
   }
+
+  test("spark-15051 alias of aggregator in DataFrame/Dataset[Row]") {
+    val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
+    checkAnswer(df1.agg(RowAgg.toColumn as "b"), Row(6) :: Nil)
+    val df2 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
+    checkAnswer(df2.agg(RowAgg.toColumn as "b").select("b"), Row(6) :: Nil)
+  }
 }
```