This PR introduces several major changes:
1. Replacing `Expression.prettyString` with `Expression.sql`
The `prettyString` method is mostly an internal, developer faced facility for debugging purposes, and shouldn't be exposed to users.
1. Using SQL-like representation as column names for selected fields that are not named expression (back-ticks and double quotes should be removed)
Before, we were using `prettyString` as column names when possible, and sometimes the result column names can be weird. Here are several examples:
Expression | `prettyString` | `sql` | Note
------------------ | -------------- | ---------- | ---------------
`a && b` | `a && b` | `a AND b` |
`a.getField("f")` | `a[f]` | `a.f` | `a` is a struct
1. Adding trait `NonSQLExpression` extending from `Expression` for expressions that don't have a SQL representation (e.g. Scala UDF/UDAF and Java/Scala object expressions used for encoders)
`NonSQLExpression.sql` may return an arbitrary user facing string representation of the expression.
Author: Cheng Lian <lian@databricks.com>
Closes#10757 from liancheng/spark-12799.simplify-expression-string-methods.
```scala
// case 1: missing sort columns are resolvable if join is true
sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c")
// case 2: missing sort columns are not resolvable if join is false. Thus, issue an error message in this case
sql("SELECT explode(a) AS val FROM data order by val, c")
```
When sort columns are not in `Generate`, we can resolve them when `join` is equal to `true`. Still trying to add more test cases for the other `UnaryNode` types.
Could you review the changes? davies cloud-fan Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#11198 from gatorsmile/missingInSort.
Conversion of outer joins, if the predicates in filter conditions can restrict the result sets so that all null-supplying rows are eliminated.
- `full outer` -> `inner` if both sides have such predicates
- `left outer` -> `inner` if the right side has such predicates
- `right outer` -> `inner` if the left side has such predicates
- `full outer` -> `left outer` if only the left side has such predicates
- `full outer` -> `right outer` if only the right side has such predicates
If applicable, this can greatly improve the performance, since outer join is much slower than inner join, full outer join is much slower than left/right outer join.
The original PR is https://github.com/apache/spark/pull/10542
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10567 from gatorsmile/outerJoinEliminationByFilterCond.
This PR adds support for rewriting constraints if there are aliases in the query plan. For e.g., if there is a query of form `SELECT a, a AS b`, any constraints on `a` now also apply to `b`.
JIRA: https://issues.apache.org/jira/browse/SPARK-13091
cc marmbrus
Author: Sameer Agarwal <sameer@databricks.com>
Closes#11144 from sameeragarwal/alias.
JIRA: https://issues.apache.org/jira/browse/SPARK-13384
## What changes were proposed in this pull request?
When we de-duplicate attributes in Analyzer, we create new attributes. However, we don't keep original qualifiers. Some plans will be failed to analysed. We should keep original qualifiers in new attributes.
## How was the this patch tested?
Unit test is added.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#11261 from viirya/keep-attr-qualifiers.
`rand` and `randn` functions with a `seed` argument are commonly used. Based on the common sense, the results of `rand` and `randn` should be deterministic if the `seed` parameter value is provided. For example, in MS SQL Server, it also has a function `rand`. Regarding the parameter `seed`, the description is like: ```Seed is an integer expression (tinyint, smallint, or int) that gives the seed value. If seed is not specified, the SQL Server Database Engine assigns a seed value at random. For a specified seed value, the result returned is always the same.```
Update: the current implementation is unable to generate deterministic results when the partitions are not fixed. This PR documents this issue in the function descriptions.
jkbradley hit an issue and provided an example in the following JIRA: https://issues.apache.org/jira/browse/SPARK-13333
Author: gatorsmile <gatorsmile@gmail.com>
Closes#11232 from gatorsmile/randSeed.
Currently, the columns in projects of Expand that are not used by Aggregate are not pruned, this PR fix that.
Author: Davies Liu <davies@databricks.com>
Closes#11225 from davies/fix_pruning_expand.
Add `LazilyGenerateOrdering` to support generated ordering for `RangePartitioner` of `Exchange` instead of `InterpretedOrdering`.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#10894 from ueshin/issues/SPARK-12976.
Using GroupingSets will generate a wrong result when Aggregate Functions containing GroupBy columns.
This PR is to fix it. Since the code changes are very small. Maybe we also can merge it to 1.6
For example, the following query returns a wrong result:
```scala
sql("select course, sum(earnings) as sum from courseSales group by course, earnings" +
" grouping sets((), (course), (course, earnings))" +
" order by course, sum").show()
```
Before the fix, the results are like
```
[null,null]
[Java,null]
[Java,20000.0]
[Java,30000.0]
[dotNET,null]
[dotNET,5000.0]
[dotNET,10000.0]
[dotNET,48000.0]
```
After the fix, the results become correct:
```
[null,113000.0]
[Java,20000.0]
[Java,30000.0]
[Java,50000.0]
[dotNET,5000.0]
[dotNET,10000.0]
[dotNET,48000.0]
[dotNET,63000.0]
```
UPDATE: This PR also deprecated the external column: GROUPING__ID.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#11100 from gatorsmile/groupingSets.
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:
- If a limit is on top of a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If a limit is on top of an `OUTER JOIN` then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` join, we will only push limits when at most one of the inputs is already limited: if one input is limited we will push a smaller limit on top of it and if neither input is limited then we will limit the input which is estimated to be larger.
These optimizations were proposed previously by gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because at that time Spark's physical `Limit` operator would trigger a full shuffle to perform global limits so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of gatorsmile's patches, with changes and simplifications due to partition-local-limiting.
When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. In order to handle this, this patch adds a `maxRows` method to `SparkPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children if the children's maxRows are greater than the limit's maxRows. This idea is carried over from #10451; see that patch for additional discussion.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11121 from JoshRosen/limit-pushdown-2.
The java `Calendar` object is expensive to create. I have a sub query like this `SELECT a, b, c FROM table UV WHERE (datediff(UV.visitDate, '1997-01-01')>=0 AND datediff(UV.visitDate, '2015-01-01')<=0))`
The table stores `visitDate` as String type and has 3 billion records. A `Calendar` object is created every time `DateTimeUtils.stringToDate` is called. By reusing the `Calendar` object, I saw about 20 seconds performance improvement for this stage.
Author: Carson Wang <carson.wang@intel.com>
Closes#11090 from carsonwang/SPARK-13185.
The current implementation of ResolveSortReferences can only push one missing attributes into it's child, it failed to analyze TPCDS Q98, because of there are two missing attributes in that (one from Window, another from Aggregate).
Author: Davies Liu <davies@databricks.com>
Closes#11153 from davies/resolve_sort.
JIRA: https://issues.apache.org/jira/browse/SPARK-13277
There is an ANTLR warning during compilation:
warning(200): org/apache/spark/sql/catalyst/parser/SparkSqlParser.g:938:7:
Decision can match input such as "KW_USING Identifier" using multiple alternatives: 2, 3
As a result, alternative(s) 3 were disabled for that input
This patch is to fix it.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#11168 from viirya/fix-parser-using.
The parser currently parses the following strings without a hitch:
* Table Identifier:
* `a.b.c` should fail, but results in the following table identifier `a.b`
* `table!#` should fail, but results in the following table identifier `table`
* Expression
* `1+2 r+e` should fail, but results in the following expression `1 + 2`
This PR fixes this by adding terminated rules for both expression parsing and table identifier parsing.
cc cloud-fan (we discussed this in https://github.com/apache/spark/pull/10649) jayadevanmurali (this causes your PR https://github.com/apache/spark/pull/11051 to fail)
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#11159 from hvanhovell/SPARK-13276.
Grouping() returns a column is aggregated or not, grouping_id() returns the aggregation levels.
grouping()/grouping_id() could be used with window function, but does not work in having/sort clause, will be fixed by another PR.
The GROUPING__ID/grouping_id() in Hive is wrong (according to docs), we also did it wrongly, this PR change that to match the behavior in most databases (also the docs of Hive).
Author: Davies Liu <davies@databricks.com>
Closes#10677 from davies/grouping.
Some analysis rules generate aliases or auxiliary attribute references with the same name but different expression IDs. For example, `ResolveAggregateFunctions` introduces `havingCondition` and `aggOrder`, and `DistinctAggregationRewriter` introduces `gid`.
This is OK for normal query execution since these attribute references get expression IDs. However, it's troublesome when converting resolved query plans back to SQL query strings since expression IDs are erased.
Here's an example Spark 1.6.0 snippet for illustration:
```scala
sqlContext.range(10).select('id as 'a, 'id as 'b).registerTempTable("t")
sqlContext.sql("SELECT SUM(a) FROM t GROUP BY a, b ORDER BY COUNT(a), COUNT(b)").explain(true)
```
The above code produces the following resolved plan:
```
== Analyzed Logical Plan ==
_c0: bigint
Project [_c0#101L]
+- Sort [aggOrder#102L ASC,aggOrder#103L ASC], true
+- Aggregate [a#47L,b#48L], [(sum(a#47L),mode=Complete,isDistinct=false) AS _c0#101L,(count(a#47L),mode=Complete,isDistinct=false) AS aggOrder#102L,(count(b#48L),mode=Complete,isDistinct=false) AS aggOrder#103L]
+- Subquery t
+- Project [id#46L AS a#47L,id#46L AS b#48L]
+- LogicalRDD [id#46L], MapPartitionsRDD[44] at range at <console>:26
```
Here we can see that both aggregate expressions in `ORDER BY` are extracted into an `Aggregate` operator, and both of them are named `aggOrder` with different expression IDs.
The solution is to automatically add the expression IDs into the attribute name for the Alias and AttributeReferences that are generated by Analyzer in SQL Generation.
In this PR, it also resolves another issue. Users could use the same name as the internally generated names. The duplicate names should not cause name ambiguity. When resolving the column, Catalyst should not pick the column that is internally generated.
Could you review the solution? marmbrus liancheng
I did not set the newly added flag for all the alias and attribute reference generated by Analyzers. Please let me know if I should do it? Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#11050 from gatorsmile/namingConflicts.
This PR improve the lookup of BytesToBytesMap by:
1. Generate code for calculate the hash code of grouping keys.
2. Do not use MemoryLocation, fetch the baseObject and offset for key and value directly (remove the indirection).
Author: Davies Liu <davies@databricks.com>
Closes#11010 from davies/gen_map.
Adds the benchmark results as comments.
The codegen version is slower than the interpreted version for `simple` case becasue of 3 reasons:
1. codegen version use a more complex hash algorithm than interpreted version, i.e. `Murmur3_x86_32.hashInt` vs [simple multiplication and addition](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala#L153).
2. codegen version will write the hash value to a row first and then read it out. I tried to create a `GenerateHasher` that can generate code to return hash value directly and got about 60% speed up for the `simple` case, does it worth?
3. the row in `simple` case only has one int field, so the runtime reflection may be removed because of branch prediction, which makes the interpreted version faster.
The `array` case is also slow for similar reasons, e.g. array elements are of same type, so interpreted version can probably get rid of runtime reflection by branch prediction.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10917 from cloud-fan/hash-benchmark.
nullability should only be considered as an optimization rather than part of the type system, so instead of failing analysis for mismatch nullability, we should pass analysis and add runtime null check.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#11035 from cloud-fan/ignore-nullability.
This patch changes the implementation of the physical `Limit` operator so that it relies on the `Exchange` operator to perform data movement rather than directly using `ShuffledRDD`. In addition to improving efficiency, this lays the necessary groundwork for further optimization of limit, such as limit pushdown or whole-stage codegen.
At a high-level, this replaces the old physical `Limit` operator with two new operators, `LocalLimit` and `GlobalLimit`. `LocalLimit` performs per-partition limits, while `GlobalLimit` applies the final limit to a single partition; `GlobalLimit`'s declares that its `requiredInputDistribution` is `SinglePartition`, which will cause the planner to use an `Exchange` to perform the appropriate shuffles. Thus, a logical `Limit` appearing in the middle of a query plan will be expanded into `LocalLimit -> Exchange to one partition -> GlobalLimit`.
In the old code, calling `someDataFrame.limit(100).collect()` or `someDataFrame.take(100)` would actually skip the shuffle and use a fast-path which used `executeTake()` in order to avoid computing all partitions in case only a small number of rows were requested. This patch preserves this optimization by treating logical `Limit` operators specially when they appear as the terminal operator in a query plan: if a `Limit` is the final operator, then we will plan a special `CollectLimit` physical operator which implements the old `take()`-based logic.
In order to be able to match on operators only at the root of the query plan, this patch introduces a special `ReturnAnswer` logical operator which functions similar to `BroadcastHint`: this dummy operator is inserted at the root of the optimized logical plan before invoking the physical planner, allowing the planner to pattern-match on it.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7334 from JoshRosen/remove-copy-in-limit.
Trivial search-and-replace to eliminate deprecation warnings in Scala 2.11.
Also works with 2.10
Author: Jakob Odersky <jakob@odersky.com>
Closes#11085 from jodersky/SPARK-13171.
https://issues.apache.org/jira/browse/SPARK-12939
Now we will catch `ObjectOperator` in `Analyzer` and resolve the `fromRowExpression/deserializer` inside it. Also update the `MapGroups` and `CoGroup` to pass in `dataAttributes`, so that we can correctly resolve value deserializer(the `child.output` contains both groupking key and values, which may mess things up if they have same-name attribtues). End-to-end tests are added.
follow-ups:
* remove encoders from typed aggregate expression.
* completely remove resolve/bind in `ExpressionEncoder`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10852 from cloud-fan/bug.
This patch incorporates review feedback from #11069, which is already merged.
Author: Andrew Or <andrew@databricks.com>
Closes#11080 from andrewor14/catalog-follow-ups.
Spark SQL should collapse adjacent `Repartition` operators and only keep the last one.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11064 from JoshRosen/collapse-repartition.
This is a small addendum to #10762 to make the code more robust again future changes.
Author: Reynold Xin <rxin@databricks.com>
Closes#11070 from rxin/SPARK-12828-natural-join.
This is a step towards consolidating `SQLContext` and `HiveContext`.
This patch extends the existing Catalog API added in #10982 to include methods for handling table partitions. In particular, a partition is identified by `PartitionSpec`, which is just a `Map[String, String]`. The Catalog is still not used by anything yet, but its API is now more or less complete and an implementation is fully tested.
About 200 lines are test code.
Author: Andrew Or <andrew@databricks.com>
Closes#11069 from andrewor14/catalog.
The ```SparkSqlLexer``` currently swallows characters which have not been defined in the grammar. This causes problems with SQL commands, such as: ```add jar file:///tmp/ab/TestUDTF.jar```. In this example the `````` is swallowed.
This PR adds an extra Lexer rule to handle such input, and makes a tiny modification to the ```ASTNode```.
cc davies liancheng
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#11052 from hvanhovell/SPARK-13157.
Based on the semantics of a query, we can derive a number of data constraints on output of each (logical or physical) operator. For instance, if a filter defines `‘a > 10`, we know that the output data of this filter satisfies 2 constraints:
1. `‘a > 10`
2. `isNotNull(‘a)`
This PR proposes a possible way of keeping track of these constraints and propagating them in the logical plan, which can then help us build more advanced optimizations (such as pruning redundant filters, optimizing joins, among others). We define constraints as a set of (implicitly conjunctive) expressions. For e.g., if a filter operator has constraints = `Set(‘a > 10, ‘b < 100)`, it’s implied that the outputs satisfy both individual constraints (i.e., `‘a > 10` AND `‘b < 100`).
Design Document: https://docs.google.com/a/databricks.com/document/d/1WQRgDurUBV9Y6CWOBS75PQIqJwT-6WftVa18xzm7nCo/edit?usp=sharing
Author: Sameer Agarwal <sameer@databricks.com>
Closes#10844 from sameeragarwal/constraints.
1. try to avoid the suffix (unique id)
2. remove the comment if there is no code generated.
3. re-arrange the order of functions
4. trop the new line for inlined blocks.
Author: Davies Liu <davies@databricks.com>
Closes#11032 from davies/better_suffix.
when we generate map, we first randomly pick a length, then create a seq of key value pair with the expected length, and finally call `toMap`. However, `toMap` will remove all duplicated keys, which makes the actual map size much less than we expected.
This PR fixes this problem by put keys in a set first, to guarantee we have enough keys to build a map with expected length.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10930 from cloud-fan/random-generator.
The example will throw error like
<console>:20: error: not found: value StructType
Need to add this line:
import org.apache.spark.sql.types._
Author: Kevin (Sangwoo) Kim <sangwookim.me@gmail.com>
Closes#10141 from swkimme/patch-1.
As benchmarked and discussed here: https://github.com/apache/spark/pull/10786/files#r50038294, benefits from codegen, the declarative aggregate function could be much faster than imperative one.
Author: Davies Liu <davies@databricks.com>
Closes#10960 from davies/stddev.
Jira:
https://issues.apache.org/jira/browse/SPARK-13056
Create a map like
{ "a": "somestring", "b": null}
Query like
SELECT col["b"] FROM t1;
NPE would be thrown.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#10964 from adrian-wang/npewriter.
This pull request creates an internal catalog API. The creation of this API is the first step towards consolidating SQLContext and HiveContext. I envision we will have two different implementations in Spark 2.0: (1) a simple in-memory implementation, and (2) an implementation based on the current HiveClient (ClientWrapper).
I took a look at what Hive's internal metastore interface/implementation, and then created this API based on it. I believe this is the minimal set needed in order to achieve all the needed functionality.
Author: Reynold Xin <rxin@databricks.com>
Closes#10982 from rxin/SPARK-13078.
This includes: float, boolean, short, decimal and calendar interval.
Decimal is mapped to long or byte array depending on the size and calendar
interval is mapped to a struct of int and long.
The only remaining type is map. The schema mapping is straightforward but
we might want to revisit how we deal with this in the rest of the execution
engine.
Author: Nong Li <nong@databricks.com>
Closes#10961 from nongli/spark-13043.
JIRA: https://issues.apache.org/jira/browse/SPARK-12705
**Scope:**
This PR is a general fix for sorting reference resolution when the child's `outputSet` does not have the order-by attributes (called, *missing attributes*):
- UnaryNode support is limited to `Project`, `Window`, `Aggregate`, `Distinct`, `Filter`, `RepartitionByExpression`.
- We will not try to resolve the missing references inside a subquery, unless the outputSet of this subquery contains it.
**General Reference Resolution Rules:**
- Jump over the nodes with the following types: `Distinct`, `Filter`, `RepartitionByExpression`. Do not need to add missing attributes. The reason is their `outputSet` is decided by their `inputSet`, which is the `outputSet` of their children.
- Group-by expressions in `Aggregate`: missing order-by attributes are not allowed to be added into group-by expressions since it will change the query result. Thus, in RDBMS, it is not allowed.
- Aggregate expressions in `Aggregate`: if the group-by expressions in `Aggregate` contains the missing attributes but aggregate expressions do not have it, just add them into the aggregate expressions. This can resolve the analysisExceptions thrown by the three TCPDS queries.
- `Project` and `Window` are special. We just need to add the missing attributes to their `projectList`.
**Implementation:**
1. Traverse the whole tree in a pre-order manner to find all the resolvable missing order-by attributes.
2. Traverse the whole tree in a post-order manner to add the found missing order-by attributes to the node if their `inputSet` contains the attributes.
3. If the origins of the missing order-by attributes are different nodes, each pass only resolves the missing attributes that are from the same node.
**Risk:**
Low. This rule will be trigger iff ```!s.resolved && child.resolved``` is true. Thus, very few cases are affected.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10678 from gatorsmile/sortWindows.
JIRA: https://issues.apache.org/jira/browse/SPARK-12989
In the rule `ExtractWindowExpressions`, we simply replace alias by the corresponding attribute. However, this will cause an issue exposed by the following case:
```scala
val data = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")
.withColumn("Data", struct("A", "B", "C"))
.drop("A")
.drop("B")
.drop("C")
val winSpec = Window.partitionBy("Data.A", "Data.B").orderBy($"num".desc)
data.select($"*", max("num").over(winSpec) as "max").explain(true)
```
In this case, both `Data.A` and `Data.B` are `alias` in `WindowSpecDefinition`. If we replace these alias expression by their alias names, we are unable to know what they are since they will not be put in `missingExpr` too.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10963 from gatorsmile/seletStarAfterColDrop.
The current implementation is sub-optimal:
* If an expression is always nullable, e.g. `Unhex`, we can still remove null check for children if they are not nullable.
* If an expression has some non-nullable children, we can still remove null check for these children and keep null check for others.
This PR improves this by making the null check elimination more fine-grained.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10987 from cloud-fan/null-check.
JIRA: https://issues.apache.org/jira/browse/SPARK-12689
DDLParser processes three commands: createTable, describeTable and refreshTable.
This patch migrates the three commands to newly absorbed parser.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10723 from viirya/migrate-ddl-describe.
Make sure we throw better error messages when Parquet schema merging fails.
Author: Cheng Lian <lian@databricks.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#10979 from viirya/schema-merging-failure-message.
In jdk1.7 TimeZone.getTimeZone() is synchronized, so use an instance variable to hold an GMT TimeZone object instead of instantiate it every time.
Author: wangyang <wangyang@haizhi.com>
Closes#10994 from wangyang1992/datetimeUtil.
This patch changes Spark's build to make Scala 2.11 the default Scala version. To be clear, this does not mean that Spark will stop supporting Scala 2.10: users will still be able to compile Spark for Scala 2.10 by following the instructions on the "Building Spark" page; however, it does mean that Scala 2.11 will be the default Scala version used by our CI builds (including pull request builds).
The Scala 2.11 compiler is faster than 2.10, so I think we'll be able to look forward to a slight speedup in our CI builds (it looks like it's about 2X faster for the Maven compile-only builds, for instance).
After this patch is merged, I'll update Jenkins to add new compile-only jobs to ensure that Scala 2.10 compilation doesn't break.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10608 from JoshRosen/SPARK-6363.