[SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame

## What changes were proposed in this pull request?
We allow users to specify hints (currently only "broadcast" is supported) in SQL and DataFrame. However, while SQL has a standard hint format (/*+ ... */), DataFrame doesn't have one and sometimes users are confused that they can't find how to apply a broadcast hint. This ticket adds a generic hint function on DataFrame that allows using the same hint on DataFrames as well as SQL.

As an example, after this patch, the following will apply a broadcast hint on a DataFrame using the new hint function:

```
df1.join(df2.hint("broadcast"))
```

## How was this patch tested?
Added a test case in DataFrameJoinSuite.

Author: Reynold Xin <rxin@databricks.com>

Closes #17839 from rxin/SPARK-20576.
This commit is contained in:
Reynold Xin 2017-05-03 09:22:25 -07:00
parent 27f543b15f
commit 527fc5d0c9
3 changed files with 40 additions and 2 deletions

View file

@ -86,7 +86,13 @@ object ResolveHints {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
applyBroadcastHint(h.child, h.parameters.toSet)
if (h.parameters.isEmpty) {
// If there is no table alias specified, turn the entire subtree into a BroadcastHint.
BroadcastHint(h.child)
} else {
// Otherwise, find within the subtree query plans that should be broadcasted.
applyBroadcastHint(h.child, h.parameters.toSet)
}
}
}

View file

@ -1160,6 +1160,22 @@ class Dataset[T] private[sql](
*/
def apply(colName: String): Column = col(colName)
/**
* Specifies some hint on the current Dataset. As an example, the following code specifies
* that one of the plan can be broadcasted:
*
* {{{
* df1.join(df2.hint("broadcast"))
* }}}
*
* @group basic
* @since 2.2.0
*/
@scala.annotation.varargs
def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
Hint(name, parameters, logicalPlan)
}
/**
* Selects column based on the column name and return it as a [[Column]].
*

View file

@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
}
test("broadcast join hint") {
test("broadcast join hint using broadcast function") {
val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
@ -174,6 +174,22 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
}
}
test("broadcast join hint using Dataset.hint") {
// make sure a giant join is not broadcastable
val plan1 =
spark.range(10e10.toLong)
.join(spark.range(10e10.toLong), "id")
.queryExecution.executedPlan
assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size == 0)
// now with a hint it should be broadcasted
val plan2 =
spark.range(10e10.toLong)
.join(spark.range(10e10.toLong).hint("broadcast"), "id")
.queryExecution.executedPlan
assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
}
test("join - outer join conversion") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")