This is the initial work for whole stage codegen, it support Projection/Filter/Range, we will continue work on this to support more physical operators.
A micro benchmark show that a query with range, filter and projection could be 3X faster then before.
It's turned on by default. For a tree that have at least two chained plans, a WholeStageCodegen will be inserted into it, for example, the following plan
```
Limit 10
+- Project [(id#5L + 1) AS (id + 1)#6L]
+- Filter ((id#5L & 1) = 1)
+- Range 0, 1, 4, 10, [id#5L]
```
will be translated into
```
Limit 10
+- WholeStageCodegen
+- Project [(id#1L + 1) AS (id + 1)#2L]
+- Filter ((id#1L & 1) = 1)
+- Range 0, 1, 4, 10, [id#1L]
```
Here is the call graph to generate Java source for A and B (A support codegen, but B does not):
```
* WholeStageCodegen Plan A FakeInput Plan B
* =========================================================================
*
* -> execute()
* |
* doExecute() --------> produce()
* |
* doProduce() -------> produce()
* |
* doProduce() ---> execute()
* |
* consume()
* doConsume() ------------|
* |
* doConsume() <----- consume()
```
A SparkPlan that support codegen need to implement doProduce() and doConsume():
```
def doProduce(ctx: CodegenContext): (RDD[InternalRow], String)
def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String
```
Author: Davies Liu <davies@databricks.com>
Closes#10735 from davies/whole2.
We iterate the bytes to calculate hashCode before, but now we have `Murmur3_x86_32.hashUnsafeBytes` that don't require the bytes to be word algned, we should use that instead.
A simple benchmark shows it's about 3 X faster, benchmark code: https://gist.github.com/cloud-fan/fa77713ccebf0823b2ab#file-arrayhashbenchmark-scala
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10784 from cloud-fan/array-hashcode.
In this PR the new CatalystQl parser stack reaches grammar parity with the old Parser-Combinator based SQL Parser. This PR also replaces all uses of the old Parser, and removes it from the code base.
Although the existing Hive and SQL parser dialects were mostly the same, some kinks had to be worked out:
- The SQL Parser allowed syntax like ```APPROXIMATE(0.01) COUNT(DISTINCT a)```. In order to make this work we needed to hardcode approximate operators in the parser, or we would have to create an approximate expression. ```APPROXIMATE_COUNT_DISTINCT(a, 0.01)``` would also do the job and is much easier to maintain. So, this PR **removes** this keyword.
- The old SQL Parser supports ```LIMIT``` clauses in nested queries. This is **not supported** anymore. See https://github.com/apache/spark/pull/10689 for the rationale for this.
- Hive has a charset name char set literal combination it supports, for instance the following expression ```_ISO-8859-1 0x4341464562616265``` would yield this string: ```CAFEbabe```. Hive will only allow charset names to start with an underscore. This is quite annoying in spark because as soon as you use a tuple names will start with an underscore. In this PR we **remove** this feature from the parser. It would be quite easy to implement such a feature as an Expression later on.
- Hive and the SQL Parser treat decimal literals differently. Hive will turn any decimal into a ```Double``` whereas the SQL Parser would convert a non-scientific decimal into a ```BigDecimal```, and would turn a scientific decimal into a Double. We follow Hive's behavior here. The new parser supports a big decimal literal, for instance: ```81923801.42BD```, which can be used when a big decimal is needed.
cc rxin viirya marmbrus yhuai cloud-fan
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10745 from hvanhovell/SPARK-12575-2.
We made it a `NamedExpression` to workaroud some hacky cases long time ago, and now seems it's safe to remove it.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10765 from cloud-fan/minor.
The goal of this PR is to eliminate unnecessary translations when there are back-to-back `MapPartitions` operations. In order to achieve this I also made the following simplifications:
- Operators no longer have hold encoders, instead they have only the expressions that they need. The benefits here are twofold: the expressions are visible to transformations so go through the normal resolution/binding process. now that they are visible we can change them on a case by case basis.
- Operators no longer have type parameters. Since the engine is responsible for its own type checking, having the types visible to the complier was an unnecessary complication. We still leverage the scala compiler in the companion factory when constructing a new operator, but after this the types are discarded.
Deferred to a follow up PR:
- Remove as much of the resolution/binding from Dataset/GroupedDataset as possible. We should still eagerly check resolution and throw an error though in the case of mismatches for an `as` operation.
- Eliminate serializations in more cases by adding more cases to `EliminateSerialization`
Author: Michael Armbrust <michael@databricks.com>
Closes#10747 from marmbrus/encoderExpressions.
The generated code for CaseWhen uses a control variable "got" to make sure we do not evaluate more branches once a branch is true. Changing that to generate just simple "if / else" would be slightly more efficient.
This closes#10737.
Author: Reynold Xin <rxin@databricks.com>
Closes#10755 from rxin/SPARK-12771.
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is same between shuffle and bucketed data source, which enables us to only shuffle one side when join a bucketed table and a normal one.
This PR also fixes the tests that are broken by the new hash behaviour in shuffle.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10703 from cloud-fan/use-hash-expr-in-shuffle.
This pull request rewrites CaseWhen expression to break the single, monolithic "branches" field into a sequence of tuples (Seq[(condition, value)]) and an explicit optional elseValue field.
Prior to this pull request, each even position in "branches" represents the condition for each branch, and each odd position represents the value for each branch. The use of them have been pretty confusing with a lot sliding windows or grouped(2) calls.
Author: Reynold Xin <rxin@databricks.com>
Closes#10734 from rxin/simplify-case.
Fix the style violation (space before , and :).
This PR is a followup for #10643 and rework of #10685 .
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10732 from sarutak/SPARK-12692-followup-sql.
This patch removes CaseKeyWhen expression and replaces it with a factory method that generates the equivalent CaseWhen. This reduces the amount of code we'd need to maintain in the future for both code generation and optimizer.
Note that we introduced CaseKeyWhen to avoid duplicate evaluations of the key. This is no longer a problem because we now have common subexpression elimination.
Author: Reynold Xin <rxin@databricks.com>
Closes#10722 from rxin/SPARK-12768.
This pull request does a few small things:
1. Separated if simplification from BooleanSimplification and created a new rule SimplifyConditionals. In the future we can also simplify other conditional expressions here.
2. Added unit test for SimplifyConditionals.
3. Renamed SimplifyCaseConversionExpressionsSuite to SimplifyStringCaseConversionSuite
Author: Reynold Xin <rxin@databricks.com>
Closes#10716 from rxin/SPARK-12762.
Fix the style violation (space before , and :).
This PR is a followup for #10643.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#10718 from sarutak/SPARK-12692-followup-sql.
Scala syntax allows binary case classes to be used as infix operator in pattern matching. This PR makes use of this syntax sugar to make `BooleanSimplification` more readable.
Author: Cheng Lian <lian@databricks.com>
Closes#10445 from liancheng/boolean-simplification-simplification.
The PR allows us to use the new SQL parser to parse SQL expressions such as: ```1 + sin(x*x)```
We enable this functionality in this PR, but we will not start using this actively yet. This will be done as soon as we have reached grammar parity with the existing parser stack.
cc rxin
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10649 from hvanhovell/SPARK-12576.
Turn import ordering violations into build errors, plus a few adjustments
to account for how the checker behaves. I'm a little on the fence about
whether the existing code is right, but it's easier to appease the checker
than to discuss what's the more correct order here.
Plus a few fixes to imports that cropped in since my recent cleanups.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#10612 from vanzin/SPARK-3873-enable.
This PR tries to enable Spark SQL to convert resolved logical plans back to SQL query strings. For now, the major use case is to canonicalize Spark SQL native view support. The major entry point is `SQLBuilder.toSQL`, which returns an `Option[String]` if the logical plan is recognized.
The current version is still in WIP status, and is quite limited. Known limitations include:
1. The logical plan must be analyzed but not optimized
The optimizer erases `Subquery` operators, which contain necessary scope information for SQL generation. Future versions should be able to recover erased scope information by inserting subqueries when necessary.
1. The logical plan must be created using HiveQL query string
Query plans generated by composing arbitrary DataFrame API combinations are not supported yet. Operators within these query plans need to be rearranged into a canonical form that is more suitable for direct SQL generation. For example, the following query plan
```
Filter (a#1 < 10)
+- MetastoreRelation default, src, None
```
need to be canonicalized into the following form before SQL generation:
```
Project [a#1, b#2, c#3]
+- Filter (a#1 < 10)
+- MetastoreRelation default, src, None
```
Otherwise, the SQL generation process will have to handle a large number of special cases.
1. Only a fraction of expressions and basic logical plan operators are supported in this PR
Currently, 95.7% (1720 out of 1798) query plans in `HiveCompatibilitySuite` can be successfully converted to SQL query strings.
Known unsupported components are:
- Expressions
- Part of math expressions
- Part of string expressions (buggy?)
- Null expressions
- Calendar interval literal
- Part of date time expressions
- Complex type creators
- Special `NOT` expressions, e.g. `NOT LIKE` and `NOT IN`
- Logical plan operators/patterns
- Cube, rollup, and grouping set
- Script transformation
- Generator
- Distinct aggregation patterns that fit `DistinctAggregationRewriter` analysis rule
- Window functions
Support for window functions, generators, and cubes etc. will be added in follow-up PRs.
This PR leverages `HiveCompatibilitySuite` for testing SQL generation in a "round-trip" manner:
* For all select queries, we try to convert it back to SQL
* If the query plan is convertible, we parse the generated SQL into a new logical plan
* Run the new logical plan instead of the original one
If the query plan is inconvertible, the test case simply falls back to the original logic.
TODO
- [x] Fix failed test cases
- [x] Support for more basic expressions and logical plan operators (e.g. distinct aggregation etc.)
- [x] Comments and documentation
Author: Cheng Lian <lian@databricks.com>
Closes#10541 from liancheng/sql-generation.
JIRA: https://issues.apache.org/jira/browse/SPARK-12687
Some queries such as `(select 1 as a) union (select 2 as a)` can't work. This patch fixes it.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#10660 from viirya/fix-union.
Use multi-line string literals for ExpressionDescription with ``// scalastyle:off line.size.limit`` and ``// scalastyle:on line.size.limit``
The policy is here, as describe at https://github.com/apache/spark/pull/10488
Let's use multi-line string literals. If we have to have a line with more than 100 characters, let's use ``// scalastyle:off line.size.limit`` and ``// scalastyle:on line.size.limit`` to just bypass the line number requirement.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#10524 from kiszk/SPARK-12580.
To avoid to have a huge Java source (over 64K loc), that can't be compiled.
cc hvanhovell
Author: Davies Liu <davies@databricks.com>
Closes#10624 from davies/split_ident.
This PR moves a major part of the new SQL parser to Catalyst. This is a prelude to start using this parser for all of our SQL parsing. The following key changes have been made:
The ANTLR Parser & Supporting classes have been moved to the Catalyst project. They are now part of the ```org.apache.spark.sql.catalyst.parser``` package. These classes contained quite a bit of code that was originally from the Hive project, I have added aknowledgements whenever this applied. All Hive dependencies have been factored out. I have also taken this chance to clean-up the ```ASTNode``` class, and to improve the error handling.
The HiveQl object that provides the functionality to convert an AST into a LogicalPlan has been refactored into three different classes, one for every SQL sub-project:
- ```CatalystQl```: This implements Query and Expression parsing functionality.
- ```SparkQl```: This is a subclass of CatalystQL and provides SQL/Core only functionality such as Explain and Describe.
- ```HiveQl```: This is a subclass of ```SparkQl``` and this adds Hive-only functionality to the parser such as Analyze, Drop, Views, CTAS & Transforms. This class still depends on Hive.
cc rxin
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10583 from hvanhovell/SPARK-12575.
JIRA: https://issues.apache.org/jira/browse/SPARK-12439
In toCatalystArray, we should look at the data type returned by dataTypeFor instead of silentSchemaFor, to determine if the element is native type. An obvious problem is when the element is Option[Int] class, catalsilentSchemaFor will return Int, then we will wrongly recognize the element is native type.
There is another problem when using Option as array element. When we encode data like Seq(Some(1), Some(2), None) with encoder, we will use MapObjects to construct an array for it later. But in MapObjects, we don't check if the return value of lambdaFunction is null or not. That causes a bug that the decoded data for Seq(Some(1), Some(2), None) would be Seq(1, 2, -1), instead of Seq(1, 2, null).
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#10391 from viirya/fix-catalystarray.
address comments in #10435
This makes the API easier to use if user programmatically generate the call to hash, and they will get analysis exception if the arguments of hash is empty.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10588 from cloud-fan/hash.
just write the arguments into unsafe row and use murmur3 to calculate hash code
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10435 from cloud-fan/hash-expr.
The reader was previously not setting the row length meaning it was wrong if there were variable
length columns. This problem does not manifest usually, since the value in the column is correct and
projecting the row fixes the issue.
Author: Nong Li <nong@databricks.com>
Closes#10576 from nongli/spark-12589.
This PR enable cube/rollup as function, so they can be used as this:
```
select a, b, sum(c) from t group by rollup(a, b)
```
Author: Davies Liu <davies@databricks.com>
Closes#10522 from davies/rollup.
It is currently possible to change the values of the supposedly immutable ```GenericRow``` and ```GenericInternalRow``` classes. This is caused by the fact that scala's ArrayOps ```toArray``` (returned by calling ```toSeq```) will return the backing array instead of a copy. This PR fixes this problem.
This PR was inspired by https://github.com/apache/spark/pull/10374 by apo1.
cc apo1 sarutak marmbrus cloud-fan nongli (everyone in the previous conversation).
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10553 from hvanhovell/SPARK-12421.
Avoiding the the No such table exception and throwing analysis exception as per the bug: SPARK-12533
Author: thomastechs <thomas.sebastian@tcs.com>
Closes#10529 from thomastechs/topic-branch.
Right now, numFields will be passed in by pointTo(), then bitSetWidthInBytes is calculated, making pointTo() a little bit heavy.
It should be part of constructor of UnsafeRow.
Author: Davies Liu <davies@databricks.com>
Closes#10528 from davies/numFields.
Most of cases we should propagate null when call `NewInstance`, and so far there is only one case we should stop null propagation: create product/java bean. So I think it makes more sense to propagate null by dafault.
This also fixes a bug when encode null array/map, which is firstly discovered in https://github.com/apache/spark/pull/10401
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10443 from cloud-fan/encoder.
```
org.apache.spark.sql.AnalysisException: cannot resolve 'value' given input columns text;
```
lets put a `:` after `columns` and put the columns in `[]` so that they match the toString of DataFrame.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10518 from gatorsmile/improveAnalysisExceptionMsg.
In Spark we allow UDFs to declare its expected input types in order to apply type coercion. The expected input type parameter takes a Seq[DataType] and uses Nil when no type coercion is applied. It makes more sense to take Option[Seq[DataType]] instead, so we can differentiate a no-arg function vs function with no expected input type specified.
Author: Reynold Xin <rxin@databricks.com>
Closes#10504 from rxin/SPARK-12549.
When explain any plan with Generate, we will see an exclamation mark in the plan. Normally, when we see this mark, it means the plan has an error. This PR is to correct the `missingInput` in `Generate`.
For example,
```scala
val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
val df2 =
df.explode('letters) {
case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
}
df2.explain(true)
```
Before the fix, the plan is like
```
== Parsed Logical Plan ==
'Generate UserDefinedGenerator('letters), true, false, None
+- Project [_1#0 AS number#2,_2#1 AS letters#3]
+- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]]
== Analyzed Logical Plan ==
number: int, letters: string, _1: string
Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8]
+- Project [_1#0 AS number#2,_2#1 AS letters#3]
+- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]]
== Optimized Logical Plan ==
Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8]
+- LocalRelation [number#2,letters#3], [[1,a b c],[2,a b],[3,a]]
== Physical Plan ==
!Generate UserDefinedGenerator(letters#3), true, false, [number#2,letters#3,_1#8]
+- LocalTableScan [number#2,letters#3], [[1,a b c],[2,a b],[3,a]]
```
**Updates**: The same issues are also found in the other four Dataset operators: `MapPartitions`/`AppendColumns`/`MapGroups`/`CoGroup`. Fixed all these four.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#10393 from gatorsmile/generateExplain.
Moved (case) classes Strategy, Once, FixedPoint and Batch to the companion object. This is necessary if we want to have the Optimizer easily extendable in the following sense: Usually a user wants to add additional rules, and just take the ones that are already there. However, inner classes made that impossible since the code did not compile
This allows easy extension of existing Optimizers see the DefaultOptimizerExtendableSuite for a corresponding test case.
Author: Stephan Kessler <stephan.kessler@sap.com>
Closes#10174 from stephankessler/SPARK-7727.
Accessing null elements in an array field fails when tungsten is enabled.
It works in Spark 1.3.1, and in Spark > 1.5 with Tungsten disabled.
This PR solves this by checking if the accessed element in the array field is null, in the generated code.
Example:
```
// Array of String
case class AS( as: Seq[String] )
val dfAS = sc.parallelize( Seq( AS ( Seq("a",null,"b") ) ) ).toDF
dfAS.registerTempTable("T_AS")
for (i <- 0 to 2) { println(i + " = " + sqlContext.sql(s"select as[$i] from T_AS").collect.mkString(","))}
```
With Tungsten disabled:
```
0 = [a]
1 = [null]
2 = [b]
```
With Tungsten enabled:
```
0 = [a]
15/12/22 09:32:50 ERROR Executor: Exception in task 7.0 in stage 1.0 (TID 15)
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$UTF8StringWriter.getSize(UnsafeRowWriters.java:90)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:90)
at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:88)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
```
Author: pierre-borckmans <pierre.borckmans@realimpactanalytics.com>
Closes#10429 from pierre-borckmans/SPARK-12477_Tungsten-Projection-Null-Element-In-Array.
When creating extractors for product types (i.e. case classes and tuples), a null check is missing, thus we always assume input product values are non-null.
This PR adds a null check in the extractor expression for product types. The null check is stripped off for top level product fields, which are mapped to the outermost `Row`s, since they can't be null.
Thanks cloud-fan for helping investigating this issue!
Author: Cheng Lian <lian@databricks.com>
Closes#10431 from liancheng/spark-12478.top-level-null-field.
Compare both left and right side of the case expression ignoring nullablity when checking for type equality.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#10156 from dilipbiswal/spark-12102.
First try, not sure how much information we need to provide in the usage part.
Author: Xiu Guo <xguo27@gmail.com>
Closes#10423 from xguo27/SPARK-12456.
This PR adds a new expression `AssertNotNull` to ensure non-nullable fields of products and case classes don't receive null values at runtime.
Author: Cheng Lian <lian@databricks.com>
Closes#10331 from liancheng/dataset-nullability-check.
Based on the suggestions from marmbrus , added logical/physical operators for Range for improving the performance.
Also added another API for resolving the JIRA Spark-12150.
Could you take a look at my implementation, marmbrus ? If not good, I can rework it. : )
Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10335 from gatorsmile/rangeOperators.
When a DataFrame or Dataset has a long schema, we should intelligently truncate to avoid flooding the screen with unreadable information.
// Standard output
[a: int, b: int]
// Truncate many top level fields
[a: int, b, string ... 10 more fields]
// Truncate long inner structs
[a: struct<a: Int ... 10 more fields>]
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#10373 from dilipbiswal/spark-12398.
Now `StaticInvoke` receives `Any` as a object and `StaticInvoke` can be serialized but sometimes the object passed is not serializable.
For example, following code raises Exception because `RowEncoder#extractorsFor` invoked indirectly makes `StaticInvoke`.
```
case class TimestampContainer(timestamp: java.sql.Timestamp)
val rdd = sc.parallelize(1 to 2).map(_ => TimestampContainer(System.currentTimeMillis))
val df = rdd.toDF
val ds = df.as[TimestampContainer]
val rdd2 = ds.rdd <----------------- invokes extractorsFor indirectory
```
I'll add test cases.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Author: Michael Armbrust <michael@databricks.com>
Closes#10357 from sarutak/SPARK-12404.
This could simplify the generated code for expressions that is not nullable.
This PR fix lots of bugs about nullability.
Author: Davies Liu <davies@databricks.com>
Closes#10333 from davies/skip_nullable.
Description of the problem from cloud-fan
Actually this line: https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L689
When we use `selectExpr`, we pass in `UnresolvedFunction` to `DataFrame.select` and fall in the last case. A workaround is to do special handling for UDTF like we did for `explode`(and `json_tuple` in 1.6), wrap it with `MultiAlias`.
Another workaround is using `expr`, for example, `df.select(expr("explode(a)").as(Nil))`, I think `selectExpr` is no longer needed after we have the `expr` function....
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9981 from dilipbiswal/spark-11619.
This PR removes Hive windows functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.
This has the following advantages:
* Better memory management.
* The ability to use spark UDAFs in Window functions.
cc rxin / yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9819 from hvanhovell/SPARK-8641-2.
I think it was a mistake, and we have not catched it so far until https://github.com/apache/spark/pull/10260 which begin to check if the `fromRowExpression` is resolved.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10263 from cloud-fan/encoder.
Currently, we could generate different plans for query with single distinct (depends on spark.sql.specializeSingleDistinctAggPlanning), one works better on low cardinality columns, the other
works better for high cardinality column (default one).
This PR change to generate a single plan (three aggregations and two exchanges), which work better in both cases, then we could safely remove the flag `spark.sql.specializeSingleDistinctAggPlanning` (introduced in 1.6).
For a query like `SELECT COUNT(DISTINCT a) FROM table` will be
```
AGG-4 (count distinct)
Shuffle to a single reducer
Partial-AGG-3 (count distinct, no grouping)
Partial-AGG-2 (grouping on a)
Shuffle by a
Partial-AGG-1 (grouping on a)
```
This PR also includes large refactor for aggregation (reduce 500+ lines of code)
cc yhuai nongli marmbrus
Author: Davies Liu <davies@databricks.com>
Closes#10228 from davies/single_distinct.
in https://github.com/apache/spark/pull/10133 we found that, we shoud ensure the children of `TreeNode` are all accessible in the `productIterator`, or the behavior will be very confusing.
In this PR, I try to fix this problem by expsing the `loopVar`.
This also fixes SPARK-12131 which is caused by the hacky `MapObjects`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10239 from cloud-fan/map-objects.
Delays application of ResolvePivot until all aggregates are resolved to prevent problems with UnresolvedFunction and adds unit test
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#10202 from aray/sql-pivot-unresolved-function.
This PR is to add three more data types into Encoder, including `BigDecimal`, `Date` and `Timestamp`.
marmbrus cloud-fan rxin Could you take a quick look at these three types? Not sure if it can be merged to 1.6. Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10188 from gatorsmile/dataTypesinEncoder.
checked with hive, greatest/least should cast their children to a tightest common type,
i.e. `(int, long) => long`, `(int, string) => error`, `(decimal(10,5), decimal(5, 10)) => error`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10196 from cloud-fan/type-coercion.
Currently, the order of joins is exactly the same as SQL query, some conditions may not pushed down to the correct join, then those join will become cross product and is extremely slow.
This patch try to re-order the inner joins (which are common in SQL query), pick the joins that have self-contain conditions first, delay those that does not have conditions.
After this patch, the TPCDS query Q64/65 can run hundreds times faster.
cc marmbrus nongli
Author: Davies Liu <davies@databricks.com>
Closes#10073 from davies/reorder_joins.
When \u appears in a comment block (i.e. in /**/), code gen will break. So, in Expression and CodegenFallback, we escape \u to \\u.
yhuai Please review it. I did reproduce it and it works after the fix. Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10155 from gatorsmile/escapeU.
This replaces https://github.com/apache/spark/pull/9696
Invoke Checkstyle and print any errors to the console, failing the step.
Use Google's style rules modified according to
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
Some important checks are disabled (see TODOs in `checkstyle.xml`) due to
multiple violations being present in the codebase.
Suggest fixing those TODOs in a separate PR(s).
More on Checkstyle can be found on the [official website](http://checkstyle.sourceforge.net/).
Sample output (from [build 46345](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46345/consoleFull)) (duplicated because I run the build twice with different profiles):
> Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [error] running /home/jenkins/workspace/SparkPullRequestBuilder2/dev/lint-java ; received return code 1
Also fix some of the minor violations that didn't require sweeping changes.
Apologies for the previous botched PRs - I finally figured out the issue.
cr: JoshRosen, pwendell
> I state that the contribution is my original work, and I license the work to the project under the project's open source license.
Author: Dmitry Erastov <derastov@gmail.com>
Closes#9867 from dskrvk/master.
When examining plans of complex queries with multiple joins, a pain point of mine is that, it's hard to immediately see the sibling node of a specific query plan node. This PR adds tree lines for the tree string of a `TreeNode`, so that the result can be visually more intuitive.
Author: Cheng Lian <lian@databricks.com>
Closes#10099 from liancheng/prettier-tree-string.
Following up #10038.
We can use bitmasks to determine which grouping expressions need to be set as nullable.
cc yhuai
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10067 from viirya/fix-cube-following.
create java version of `constructorFor` and `extractorFor` in `JavaTypeInference`
Author: Wenchen Fan <wenchen@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9937 from cloud-fan/pojo.
When we build the `fromRowExpression` for an encoder, we set up a lot of "unresolved" stuff and lost the required data type, which may lead to runtime error if the real type doesn't match the encoder's schema.
For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type is `[a: int, b: long]`, then we will hit runtime error and say that we can't construct class `Data` with int and long, because we lost the information that `b` should be a string.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9840 from cloud-fan/err-msg.
JIRA: https://issues.apache.org/jira/browse/SPARK-11949
The result of cube plan uses incorrect schema. The schema of cube result should set nullable property to true because the grouping expressions will have null values.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10038 from viirya/fix-cube.
JIRA: https://issues.apache.org/jira/browse/SPARK-12018
The code of common subexpression elimination can be factored and simplified. Some unnecessary variables can be removed.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10009 from viirya/refactor-subexpr-eliminate.
In https://github.com/apache/spark/pull/9409 we enabled multi-column counting. The approach taken in that PR introduces a bit of overhead by first creating a row only to check if all of the columns are non-null.
This PR fixes that technical debt. Count now takes multiple columns as its input. In order to make this work I have also added support for multiple columns in the single distinct code path.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10015 from hvanhovell/SPARK-12024.
When calling `get_json_object` for the following two cases, both results are `"null"`:
```scala
val tuple: Seq[(String, String)] = ("5", """{"f1": null}""") :: Nil
val df: DataFrame = tuple.toDF("key", "jstring")
val res = df.select(functions.get_json_object($"jstring", "$.f1")).collect()
```
```scala
val tuple2: Seq[(String, String)] = ("5", """{"f1": "null"}""") :: Nil
val df2: DataFrame = tuple2.toDF("key", "jstring")
val res3 = df2.select(functions.get_json_object($"jstring", "$.f1")).collect()
```
Fixed the problem and also added a test case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10018 from gatorsmile/get_json_object.
This is a followup for https://github.com/apache/spark/pull/9959.
I added more documentation and rewrote some monadic code into simpler ifs.
Author: Reynold Xin <rxin@databricks.com>
Closes#9995 from rxin/SPARK-11973.
this is based on https://github.com/apache/spark/pull/9844, with some bug fix and clean up.
The problems is that, normal operator should be resolved based on its child, but `Sort` operator can also be resolved based on its grandchild. So we have 3 rules that can resolve `Sort`: `ResolveReferences`, `ResolveSortReferences`(if grandchild is `Project`) and `ResolveAggregateFunctions`(if grandchild is `Aggregate`).
For example, `select c1 as a , c2 as b from tab group by c1, c2 order by a, c2`, we need to resolve `a` and `c2` for `Sort`. Firstly `a` will be resolved in `ResolveReferences` based on its child, and when we reach `ResolveAggregateFunctions`, we will try to resolve both `a` and `c2` based on its grandchild, but failed because `a` is not a legal aggregate expression.
whoever merge this PR, please give the credit to dilipbiswal
Author: Dilip Biswal <dbiswal@us.ibm.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9961 from cloud-fan/sort.
Currently, filter can't be pushed through aggregation with alias or literals, this patch fix that.
After this patch, the time of TPC-DS query 4 go down to 13 seconds from 141 seconds (10x improvements).
cc nongli yhuai
Author: Davies Liu <davies@databricks.com>
Closes#9959 from davies/push_filter2.
Right now, the expended start will include the name of expression as prefix for column, that's not better than without expending, we should not have the prefix.
Author: Davies Liu <davies@databricks.com>
Closes#9984 from davies/expand_star.
Currently pivot's signature looks like
```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData
scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```
I think we can remove the one that takes "Column" types, since callers should always be passing in literals. It'd also be more clear if the values are not varargs, but rather Seq or java.util.List.
I also made similar changes for Python.
Author: Reynold Xin <rxin@databricks.com>
Closes#9929 from rxin/SPARK-11946.
we should pass in resolved encodera to logical `CoGroup` and bind them in physical `CoGroup`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9928 from cloud-fan/cogroup.
We should use `InternalRow.isNullAt` to check if the field is null before calling `InternalRow.getXXX`
Thanks gatorsmile who discovered this bug.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9904 from cloud-fan/null.
Can someone review my code to make sure I'm not missing anything? Thanks!
Author: Xiu Guo <xguo27@gmail.com>
Author: Xiu Guo <guoxi@us.ibm.com>
Closes#9612 from xguo27/SPARK-11628.
1. Renamed map to mapGroup, flatMap to flatMapGroup.
2. Renamed asKey -> keyAs.
3. Added more documentation.
4. Changed type parameter T to V on GroupedDataset.
5. Added since versions for all functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9880 from rxin/SPARK-11899.
seems scala 2.11 doesn't support: define private methods in `trait xxx` and use it in `object xxx extend xxx`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9879 from cloud-fan/follow.
This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is
shared between core and I've left that in core. This allows some other associated
minor cleanup.
Author: Nong Li <nong@databricks.com>
Closes#9845 from nongli/spark-11787.
#theScaryParts (i.e. changes to the repl, executor classloaders and codegen)...
Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9825 from marmbrus/dataset-replClasses2.
Hive has since changed this behavior as well. https://issues.apache.org/jira/browse/HIVE-3454
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9685 from nongli/spark-11724.
before this PR, when users try to get an encoder for an un-supported class, they will only get a very simple error message like `Encoder for type xxx is not supported`.
After this PR, the error message become more friendly, for example:
```
No Encoder found for abc.xyz.NonEncodable
- array element class: "abc.xyz.NonEncodable"
- field (class: "scala.Array", name: "arrayField")
- root class: "abc.xyz.AnotherClass"
```
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9810 from cloud-fan/error-message.
JIRA: https://issues.apache.org/jira/browse/SPARK-11817
Instead of return None, we should truncate the fractional seconds to prevent inserting NULL.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9834 from viirya/truncate-fractional-sec.
This PR has the following optimization:
1) The greatest/least already does the null-check, so the `If` and `IsNull` are not necessary.
2) In greatest/least, it should initialize the result using the first child (removing one block).
3) For primitive types, the generated greater expression is too complicated (`a > b ? 1 : (a < b) ? -1 : 0) > 0`), should be as simple as `a > b`
Combine these optimization, this could improve the performance of `ss_max` query by 30%.
Author: Davies Liu <davies@databricks.com>
Closes#9846 from davies/improve_max.
Fixes bug with grouping sets (including cube/rollup) where aggregates that included grouping expressions would return the wrong (null) result.
Also simplifies the analyzer rule a bit and leaves column pruning to the optimizer.
Added multiple unit tests to DataFrameAggregateSuite and verified it passes hive compatibility suite:
```
build/sbt -Phive -Dspark.hive.whitelist='groupby.*_grouping.*' 'test-only org.apache.spark.sql.hive.execution.HiveCompatibilitySuite'
```
This is an alternative to pr https://github.com/apache/spark/pull/9419 but I think its better as it simplifies the analyzer rule instead of adding another special case to it.
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#9815 from aray/groupingset-agg-fix.
After some experiment, I found it's not convenient to have separate encoder builders: `FlatEncoder` and `ProductEncoder`. For example, when create encoders for `ScalaUDF`, we have no idea if the type `T` is flat or not. So I revert the splitting change in https://github.com/apache/spark/pull/9693, while still keeping the bug fixes and tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9726 from cloud-fan/follow.
The impact of this change is for a query that has a single distinct column and does not have any grouping expression like
`SELECT COUNT(DISTINCT a) FROM table`
The plan will be changed from
```
AGG-2 (count distinct)
Shuffle to a single reducer
Partial-AGG-2 (count distinct)
AGG-1 (grouping on a)
Shuffle by a
Partial-AGG-1 (grouping on 1)
```
to the following one (1.5 uses this)
```
AGG-2
AGG-1 (grouping on a)
Shuffle to a single reducer
Partial-AGG-1(grouping on a)
```
The first plan is more robust. However, to better benchmark the impact of this change, we should use 1.5's plan and use the conf of `spark.sql.specializeSingleDistinctAggPlanning` to control the plan.
Author: Yin Huai <yhuai@databricks.com>
Closes#9828 from yhuai/distinctRewriter.
We currently rely on the optimizer's constant folding to replace current_timestamp and current_date. However, this can still result in different values for different instances of current_timestamp/current_date if the optimizer is not running fast enough.
A better solution is to replace these functions in the analyzer in one shot.
Author: Reynold Xin <rxin@databricks.com>
Closes#9833 from rxin/SPARK-11849.
This patch adds an alternate to the Parquet RecordReader from the parquet-mr project
that is much faster for flat schemas. Instead of using the general converter mechanism
from parquet-mr, this directly uses the lower level APIs from parquet-columnar and a
customer RecordReader that directly assembles into UnsafeRows.
This is optionally disabled and only used for supported schemas.
Using the tpcds store sales table and doing a sum of increasingly more columns, the results
are:
For 1 Column:
Before: 11.3M rows/second
After: 18.2M rows/second
For 2 Columns:
Before: 7.2M rows/second
After: 11.2M rows/second
For 5 Columns:
Before: 2.9M rows/second
After: 4.5M rows/second
Author: Nong Li <nong@databricks.com>
Closes#9774 from nongli/parquet.
Also added some nicer error messages for incompatible types (private types and primitive types) for Kryo/Java encoder.
Author: Reynold Xin <rxin@databricks.com>
Closes#9823 from rxin/SPARK-11833.
Before this PR there were two things that would blow up if you called `df.as[MyClass]` if `MyClass` was defined in the REPL:
- [x] Because `classForName` doesn't work on the munged names returned by `tpe.erasure.typeSymbol.asClass.fullName`
- [x] Because we don't have anything to pass into the constructor for the `$outer` pointer.
Note that this PR is just adding the infrastructure for working with inner classes in encoder and is not yet sufficient to make them work in the REPL. Currently, the implementation show in 95cec7d413 is causing a bug that breaks code gen due to some interaction between janino and the `ExecutorClassLoader`. This will be addressed in a follow-up PR.
Author: Michael Armbrust <michael@databricks.com>
Closes#9602 from marmbrus/dataset-replClasses.
This patch refactors the existing Kryo encoder expressions and adds support for Java serialization.
Author: Reynold Xin <rxin@databricks.com>
Closes#9802 from rxin/SPARK-11810.
return Double.NaN for mean/average when count == 0 for all numeric types that is converted to Double, Decimal type continue to return null.
Author: JihongMa <linlin200605@gmail.com>
Closes#9705 from JihongMA/SPARK-11720.
If user use primitive parameters in UDF, there is no way for him to do the null-check for primitive inputs, so we are assuming the primitive input is null-propagatable for this case and return null if the input is null.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9770 from cloud-fan/udf.
I also found a bug with self-joins returning incorrect results in the Dataset API. Two test cases attached and filed SPARK-11803.
Author: Reynold Xin <rxin@databricks.com>
Closes#9789 from rxin/SPARK-11802.
Based on the comment of cloud-fan in https://github.com/apache/spark/pull/9216, update the AttributeReference's hashCode function by including the hashCode of the other attributes including name, nullable and qualifiers.
Here, I am not 100% sure if we should include name in the hashCode calculation, since the original hashCode calculation does not include it.
marmbrus cloud-fan Please review if the changes are good.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9761 from gatorsmile/hashCodeNamedExpression.
In the previous method, fields.toArray will cast java.util.List[StructField] into Array[Object] which can not cast into Array[StructField], thus when invoking this method will throw "java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.sql.types.StructField;"
I directly cast java.util.List[StructField] into Array[StructField] in this patch.
Author: mayuanwen <mayuanwen@qiyi.com>
Closes#9649 from jackieMaKing/Spark-11679.
During executing PromoteStrings rule, if one side of binaryComparison is StringType and the other side is not StringType, the current code will promote(cast) the StringType to DoubleType, and if the StringType doesn't contain the numbers, it will get null value. So if it is doing <=> (NULL-safe equal) with Null, it will not filter anything, caused the problem reported by this jira.
I proposal to the changes through this PR, can you review my code changes ?
This problem only happen for <=>, other operators works fine.
scala> val filteredDF = df.filter(df("column") > (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]
scala> filteredDF.show
+------+
|column|
+------+
+------+
scala> val filteredDF = df.filter(df("column") === (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]
scala> filteredDF.show
+------+
|column|
+------+
+------+
scala> df.registerTempTable("DF")
scala> sqlContext.sql("select * from DF where 'column' = NULL")
res27: org.apache.spark.sql.DataFrame = [column: string]
scala> res27.show
+------+
|column|
+------+
+------+
Author: Kevin Yu <qyu@us.ibm.com>
Closes#9720 from kevinyu98/working_on_spark-11447.
This patch adds an alias for current_timestamp (now function).
Also fixes SPARK-9196 to re-enable the test case for current_timestamp.
Author: Reynold Xin <rxin@databricks.com>
Closes#9753 from rxin/SPARK-11768.
This fix is to change the equals method to check all of the specified fields for equality of AttributeReference.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9216 from gatorsmile/namedExpressEqual.
Invocation of getters for type extending AnyVal returns default value (if field value is null) instead of throwing NPE. Please check comments for SPARK-11553 issue for more details.
Author: Bartlomiej Alberski <bartlomiej.alberski@allegrogroup.com>
Closes#9642 from alberskib/bugfix/SPARK-11553.
These 2 are very similar, we can consolidate them into one.
Also add tests for it and fix a bug.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9729 from cloud-fan/tuple.
JIRA: https://issues.apache.org/jira/browse/SPARK-11743
RowEncoder doesn't support UserDefinedType now. We should add the support for it.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9712 from viirya/rowencoder-udt.
code snippet to reproduce it:
```
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
val t = Timestamp.valueOf("1900-06-11 12:14:50.789")
val us = fromJavaTimestamp(t)
assert(getSeconds(us) === t.getSeconds)
```
it will be good to add a regression test for it, but the reproducing code need to change the default timezone, and even we change it back, the `lazy val defaultTimeZone` in `DataTimeUtils` is fixed.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9728 from cloud-fan/seconds.
also add more tests for encoders, and fix bugs that I found:
* when convert array to catalyst array, we can only skip element conversion for native types(e.g. int, long, boolean), not `AtomicType`(String is AtomicType but we need to convert it)
* we should also handle scala `BigDecimal` when convert from catalyst `Decimal`.
* complex map type should be supported
other issues that still in investigation:
* encode java `BigDecimal` and decode it back, seems we will loss precision info.
* when encode case class that defined inside a object, `ClassNotFound` exception will be thrown.
I'll remove unused code in a follow-up PR.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9693 from cloud-fan/split.
* rename `AppendColumn` to `AppendColumns` to be consistent with the physical plan name.
* clean up stale comments.
* always pass in resolved encoder to `TypedColumn.withInputType`(test added)
* enable a mistakenly disabled java test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9688 from cloud-fan/follow.
This PR adds a new method, `reduce`, to `GroupedDataset`, which allows similar operations to `reduceByKey` on a traditional `PairRDD`.
```scala
val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect() // not actually commutative :P
res0: Array(3 -> "abcxyz", 5 -> "hello")
```
While implementing this method and its test cases several more deficiencies were found in our encoder handling. Specifically, in order to support positional resolution, named resolution and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new `Datasets` with the same object type but different output attributes. We now divide the encoder lifecycle into three phases (that mirror the lifecycle of standard expressions) and have checks at various boundaries:
- Unresoved Encoders: all users facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only `UnresolvedAttributes` for named fields and `BoundReferences` for fields accessed by ordinal.
- Resolved Encoders: internal to a `[Grouped]Dataset` the encoder is resolved, meaning all input has been resolved to a specific `AttributeReference`. Any encoders that are placed into a logical plan for use in object construction should be resolved.
- BoundEncoder: Are constructed by physical plans, right before actual conversion from row -> object is performed.
It is left to future work to add explicit checks for resolution and provide good error messages when it fails. We might also consider enforcing the above constraints in the type system (i.e. `fromRow` only exists on a `ResolvedEncoder`), but we should probably wait before spending too much time on this.
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9673 from marmbrus/pr/9628.
switched stddev support from DeclarativeAggregate to ImperativeAggregate.
Author: JihongMa <linlin200605@gmail.com>
Closes#9380 from JihongMA/SPARK-11420.
`to_unix_timestamp` is the deterministic version of `unix_timestamp`, as it accepts at least one parameters.
Since the behavior here is quite similar to `unix_timestamp`, I think the dataframe API is not necessary here.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#9347 from adrian-wang/to_unix_timestamp.
This adds a pivot method to the dataframe api.
Following the lead of cube and rollup this adds a Pivot operator that is translated into an Aggregate by the analyzer.
Currently the syntax is like:
~~courseSales.pivot(Seq($"year"), $"course", Seq("dotNET", "Java"), sum($"earnings"))~~
~~Would we be interested in the following syntax also/alternatively? and~~
courseSales.groupBy($"year").pivot($"course", "dotNET", "Java").agg(sum($"earnings"))
//or
courseSales.groupBy($"year").pivot($"course").agg(sum($"earnings"))
Later we can add it to `SQLParser`, but as Hive doesn't support it we cant add it there, right?
~~Also what would be the suggested Java friendly method signature for this?~~
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#7841 from aray/sql-pivot.
We need to support custom classes like java beans and combine them into tuple, and it's very hard to do it with the TypeTag-based approach.
We should keep only the compose-based way to create tuple encoder.
This PR also move `Encoder` to `org.apache.spark.sql`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9567 from cloud-fan/java.
This PR is a 2nd follow-up for [SPARK-9241](https://issues.apache.org/jira/browse/SPARK-9241). It contains the following improvements:
* Fix for a potential bug in distinct child expression and attribute alignment.
* Improved handling of duplicate distinct child expressions.
* Added test for distinct UDAF with multiple children.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9566 from hvanhovell/SPARK-9241-followup-2.
This patch adds the building blocks for codegening subexpr elimination and implements
it end to end for UnsafeProjection. The building blocks can be used to do the same thing
for other operators.
It introduces some utilities to compute common sub expressions. Expressions can be added to
this data structure. The expr and its children will be recursively matched against existing
expressions (ones previously added) and grouped into common groups. This is built using
the existing `semanticEquals`. It does not understand things like commutative or associative
expressions. This can be done as future work.
After building this data structure, the codegen process takes advantage of it by:
1. Generating a helper function in the generated class that computes the common
subexpression. This is done for all common subexpressions that have at least
two occurrences and the expression tree is sufficiently complex.
2. When generating the apply() function, if the helper function exists, call that
instead of regenerating the expression tree. Repeated calls to the helper function
shortcircuit the evaluation logic.
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9480 from nongli/spark-10371.
Currently the user facing api for typed aggregation has some limitations:
* the customized typed aggregation must be the first of aggregation list
* the customized typed aggregation can only use long as buffer type
* the customized typed aggregation can only use flat type as result type
This PR tries to remove these limitations.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9599 from cloud-fan/agg.
https://issues.apache.org/jira/browse/SPARK-9830
This PR contains the following main changes.
* Removing `AggregateExpression1`.
* Removing `Aggregate` operator, which is used to evaluate `AggregateExpression1`.
* Removing planner rule used to plan `Aggregate`.
* Linking `MultipleDistinctRewriter` to analyzer.
* Renaming `AggregateExpression2` to `AggregateExpression` and `AggregateFunction2` to `AggregateFunction`.
* Updating places where we create aggregate expression. The way to create aggregate expressions is `AggregateExpression(aggregateFunction, mode, isDistinct)`.
* Changing `val`s in `DeclarativeAggregate`s that touch children of this function to `lazy val`s (when we create aggregate expression in DataFrame API, children of an aggregate function can be unresolved).
Author: Yin Huai <yhuai@databricks.com>
Closes#9556 from yhuai/removeAgg1.
A few changes:
1. Removed fold, since it can be confusing for distributed collections.
2. Created specific interfaces for each Dataset function (e.g. MapFunction, ReduceFunction, MapPartitionsFunction)
3. Added more documentation and test cases.
The other thing I'm considering doing is to have a "collector" interface for FlatMapFunction and MapPartitionsFunction, similar to MapReduce's map function.
Author: Reynold Xin <rxin@databricks.com>
Closes#9531 from rxin/SPARK-11564.
This PR adds support for multiple column in a single count distinct aggregate to the new aggregation path.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9409 from hvanhovell/SPARK-11451.
This PR is a follow up for PR https://github.com/apache/spark/pull/9406. It adds more documentation to the rewriting rule, removes a redundant if expression in the non-distinct aggregation path and adds a multiple distinct test to the AggregationQuerySuite.
cc yhuai marmbrus
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9541 from hvanhovell/SPARK-9241-followup.
The second PR for SPARK-9241, this adds support for multiple distinct columns to the new aggregation code path.
This PR solves the multiple DISTINCT column problem by rewriting these Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-9241) for some information on this. The advantages over the - competing - [first PR](https://github.com/apache/spark/pull/9280) are:
- This can use the faster TungstenAggregate code path.
- It is impossible to OOM due to an ```OpenHashSet``` allocating to much memory. However, this will multiply the number of input rows by the number of distinct clauses (plus one), and puts a lot more memory pressure on the aggregation code path itself.
The location of this Rule is a bit funny, and should probably change when the old aggregation path is changed.
cc yhuai - Could you also tell me where to add tests for this?
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9406 from hvanhovell/SPARK-9241-rewriter.
This PR enables the Expand operator to process and produce Unsafe Rows.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9414 from hvanhovell/SPARK-11450.
JIRA: https://issues.apache.org/jira/browse/SPARK-9162
Currently ScalaUDF extends CodegenFallback and doesn't provide code generation implementation. This path implements code generation for ScalaUDF.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9270 from viirya/scalaudf-codegen.
A cleanup for https://github.com/apache/spark/pull/9085.
The `DecimalLit` is very similar to `FloatLit`, we can just keep one of them.
Also added low level unit test at `SqlParserSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9482 from cloud-fan/parser.
This PR adds the ability to do typed SQL aggregations. We will likely also want to provide an interface to allow users to do aggregations on objects, but this is deferred to another PR.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds.groupBy(_._1).agg(sum("_2").as[Int]).collect()
res0: Array(("a", 30), ("b", 3), ("c", 1))
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9499 from marmbrus/dataset-agg.
Currently, if the Timestamp is before epoch (1970/01/01), the hours, minutes and seconds will be negative (also rounding up).
Author: Davies Liu <davies@databricks.com>
Closes#9502 from davies/neg_hour.
functions.scala was getting pretty long. I broke it into multiple files.
I also added explicit data types for some public vals, and renamed aggregate function pretty names to lower case, which is more consistent with rest of the functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9471 from rxin/SPARK-11505.
stddev is an alias for stddev_samp. variance should be consistent with stddev.
Also took the chance to remove internal Stddev and Variance, and only kept StddevSamp/StddevPop and VarianceSamp/VariancePop.
Author: Reynold Xin <rxin@databricks.com>
Closes#9449 from rxin/SPARK-11490.
Right now, SQL's mutable projection updates every value of the mutable project after it evaluates the corresponding expression. This makes the behavior of MutableProjection confusing and complicate the implementation of common aggregate functions like stddev because developers need to be aware that when evaluating {{i+1}}th expression of a mutable projection, {{i}}th slot of the mutable row has already been updated.
This PR make the MutableProjection atomic, by generating all the results of expressions first, then copy them into mutableRow.
Had run a mircro-benchmark, there is no notable performance difference between using class members and local variables.
cc yhuai
Author: Davies Liu <davies@databricks.com>
Closes#9422 from davies/atomic_mutable and squashes the following commits:
bbc1758 [Davies Liu] support wide table
8a0ae14 [Davies Liu] fix bug
bec07da [Davies Liu] refactor
2891628 [Davies Liu] make mutableProjection atomic
Hive GenericUDTF#initialize() defines field names in a returned schema though,
the current HiveGenericUDTF drops these names.
We might need to reflect these in a logical plan tree.
Author: navis.ryu <navis@apache.org>
Closes#8456 from navis/SPARK-9034.
1. Supporting expanding structs in Projections. i.e.
"SELECT s.*" where s is a struct type.
This is fixed by allowing the expand function to handle structs in addition to tables.
2. Supporting expanding * inside aggregate functions of structs.
"SELECT max(struct(col1, structCol.*))"
This requires recursively expanding the expressions. In this case, it it the aggregate
expression "max(...)" and we need to recursively expand its children inputs.
Author: Nong Li <nongli@gmail.com>
Closes#9343 from nongli/spark-11329.
From Reynold in the thread 'Exception when using some aggregate operators' (http://search-hadoop.com/m/q3RTt0xFr22nXB4/):
I don't think these are bugs. The SQL standard for average is "avg", not "mean". Similarly, a distinct count is supposed to be written as "count(distinct col)", not "countDistinct(col)".
We can, however, make "mean" an alias for "avg" to improve compatibility between DataFrame and SQL.
Author: tedyu <yuzhihong@gmail.com>
Closes#9332 from ted-yu/master.
JIRA: https://issues.apache.org/jira/browse/SPARK-9298
This patch adds pearson correlation aggregation function based on `AggregateExpression2`.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8587 from viirya/corr_aggregation.
DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
optioning sorting within each resulting partition. There is no required relationship between the
exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).
This patch adds to APIs to DataFrames which can be used together to provide this functionality:
1. distributeBy() which partitions the data frame into a specified number of partitions using the
partitioning exprs.
2. localSort() which sorts each partition using the provided sorting exprs.
To get the DISTRIBUTE BY functionality, the user simply does: df.distributeBy(...).localSort(...)
Author: Nong Li <nongli@gmail.com>
Closes#9364 from nongli/spark-11410.
Add a rule in optimizer to convert NULL [NOT] IN (expr1,...,expr2) to
Literal(null).
This is a follow up defect to SPARK-8654
cloud-fan Can you please take a look ?
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9348 from dilipbiswal/spark_11024.
Older version of Janino (>2.7) does not support Override, we should not use that in codegen.
Author: Davies Liu <davies@databricks.com>
Closes#9372 from davies/no_override.
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
The PrepareRDD may be not needed anymore, could be removed in follow up PR.
The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here what I'm got:
1) Without calling spill(), the operators should only be used by single thread, no safety problems.
2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes#9241 from davies/force_spill.
This is minor, but I ran into while writing Datasets and while it wasn't needed for the final solution, it was super confusing so we should fix it.
Basically we recurse into `Seq` to see if they have children. This breaks because we don't preserve the original subclass of `Seq` (and `StructType <:< Seq[StructField]`). Since a struct can never contain children, lets just not recurse into it.
Author: Michael Armbrust <michael@databricks.com>
Closes#9334 from marmbrus/structMakeCopy.
This PR adds a new operation `joinWith` to a `Dataset`, which returns a `Tuple` for each pair where a given `condition` evaluates to true.
```scala
case class ClassData(a: String, b: Int)
val ds1 = Seq(ClassData("a", 1), ClassData("b", 2)).toDS()
val ds2 = Seq(("a", 1), ("b", 2)).toDS()
> ds1.joinWith(ds2, $"_1" === $"a").collect()
res0: Array((ClassData("a", 1), ("a", 1)), (ClassData("b", 2), ("b", 2)))
```
This operation is similar to the relation `join` function with one important difference in the result schema. Since `joinWith` preserves objects present on either side of the join, the result schema is similarly nested into a tuple under the column names `_1` and `_2`.
This type of join can be useful both for preserving type-safety with the original object types as well as working with relational data where either side of the join has column names in common.
## Required Changes to Encoders
In the process of working on this patch, several deficiencies to the way that we were handling encoders were discovered. Specifically, it turned out to be very difficult to `rebind` the non-expression based encoders to extract the nested objects from the results of joins (and also typed selects that return tuples).
As a result the following changes were made.
- `ClassEncoder` has been renamed to `ExpressionEncoder` and has been improved to also handle primitive types. Additionally, it is now possible to take arbitrary expression encoders and rewrite them into a single encoder that returns a tuple.
- All internal operations on `Dataset`s now require an `ExpressionEncoder`. If the users tries to pass a non-`ExpressionEncoder` in, an error will be thrown. We can relax this requirement in the future by constructing a wrapper class that uses expressions to project the row to the expected schema, shielding the users code from the required remapping. This will give us a nice balance where we don't force user encoders to understand attribute references and binding, but still allow our native encoder to leverage runtime code generation to construct specific encoders for a given schema that avoid an extra remapping step.
- Additionally, the semantics for different types of objects are now better defined. As stated in the `ExpressionEncoder` scaladoc:
- Classes will have their sub fields extracted by name using `UnresolvedAttribute` expressions
and `UnresolvedExtractValue` expressions.
- Tuples will have their subfields extracted by position using `BoundReference` expressions.
- Primitives will have their values extracted from the first ordinal with a schema that defaults
to the name `value`.
- Finally, the binding lifecycle for `Encoders` has now been unified across the codebase. Encoders are now `resolved` to the appropriate schema in the constructor of `Dataset`. This process replaces an unresolved expressions with concrete `AttributeReference` expressions. Binding then happens on demand, when an encoder is going to be used to construct an object. This closely mirrors the lifecycle for standard expressions when executing normal SQL or `DataFrame` queries.
Author: Michael Armbrust <michael@databricks.com>
Closes#9300 from marmbrus/datasets-tuples.
When sampling and then filtering DataFrame, the SQL Optimizer will push down filter into sample and produce wrong result. This is due to the sampler is calculated based on the original scope rather than the scope after filtering.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#9294 from yanboliang/spark-11303.
I'm new to spark. I was trying out the sort_array function then hit this exception. I looked into the spark source code. I found the root cause is that sort_array does not check for an array of NULLs. It's not meaningful to sort an array of entirely NULLs anyway.
I'm adding a check on the input array type to SortArray. If the array consists of NULLs entirely, there is no need to sort such array. I have also added a test case for this.
Please help to review my fix. Thanks!
Author: Jia Li <jiali@us.ibm.com>
Closes#9247 from jliwork/SPARK-11277.
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code use only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
**Compatiblity notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DevloperAPI` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9127 from JoshRosen/SPARK-10984.
marmbrus rxin I believe these typecasts are not required in the presence of explicit return types.
Author: Alexander Slesarenko <avslesarenko@gmail.com>
Closes#9262 from aslesarenko/remove-typecasts.
For nested StructType, the underline buffer could be used for others before, we should zero out the padding bytes for those primitive types that have less than 8 bytes.
cc cloud-fan
Author: Davies Liu <davies@databricks.com>
Closes#9217 from davies/zero_out.
*This PR adds a new experimental API to Spark, tentitively named Datasets.*
A `Dataset` is a strongly-typed collection of objects that can be transformed in parallel using functional or relational operations. Example usage is as follows:
### Functional
```scala
> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```
### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```
## Comparison to RDDs
A `Dataset` differs from an `RDD` in the following ways:
- The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be
used to serialize the object into a binary format. Encoders are also capable of mapping the
schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
reflection based serialization.
- Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored
in the encoded form. This representation allows for additional logical operations and
enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
an object.
A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.
## Comparison to DataFrames
A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific
JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into
specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed
`Dataset` to a generic DataFrame by calling `ds.toDF()`.
## Implementation Status and TODOs
This is a rough cut at the least controversial parts of the API. The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API. The following is being deferred to future PRs:
- Joins and Aggregations (prototype here f11f91e6f0)
- Support for Java
Additionally, the responsibility for binding an encoder to a given schema is currently done in a fairly ad-hoc fashion. This is an internal detail, and what we are doing today works for the cases we care about. However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).
## COMPATIBILITY NOTE
Long term we plan to make `DataFrame` extend `Dataset[Row]`. However,
making this change to che class hierarchy would break the function signatures for the existing
function operations (map, flatMap, etc). As such, this class should be considered a preview
of the final API. Changes will be made to the interface after Spark 1.6.
Author: Michael Armbrust <michael@databricks.com>
Closes#9190 from marmbrus/dataset-infra.
This PR change InMemoryTableScan to output UnsafeRow, and optimize the unrolling and scanning by coping the bytes for var-length types between UnsafeRow and ByteBuffer directly without creating the wrapper objects. When scanning the decimals in TPC-DS store_sales table, it's 80% faster (copy it as long without create Decimal objects).
Author: Davies Liu <davies@databricks.com>
Closes#9203 from davies/unsafe_cache.
In the analysis phase , while processing the rules for IN predicate, we
compare the in-list types to the lhs expression type and generate
cast operation if necessary. In the case of NULL [NOT] IN expr1 , we end up
generating cast between in list types to NULL like cast (1 as NULL) which
is not a valid cast.
The fix is to find a common type between LHS and RHS expressions and cast
all the expression to the common type.
Author: Dilip Biswal <dbiswal@us.ibm.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9036 from dilipbiswal/spark_8654_new.
I am changing the default behavior of `First`/`Last` to respect null values (the SQL standard default behavior).
https://issues.apache.org/jira/browse/SPARK-9740
Author: Yin Huai <yhuai@databricks.com>
Closes#8113 from yhuai/firstLast.
This PR introduce a new feature to run SQL directly on files without create a table, for example:
```
select id from json.`path/to/json/files` as j
```
Author: Davies Liu <davies@databricks.com>
Closes#9173 from davies/source.
Find out the missing attributes by recursively looking
at the sort order expression and rest of the code
takes care of projecting them out.
Added description from cloud-fan
I wanna explain a bit more about this bug.
When we resolve sort ordering, we will use a special method, which only resolves UnresolvedAttributes and UnresolvedExtractValue. However, for something like Floor('a), even the 'a is resolved, the floor expression may still being unresolved as data type mismatch(for example, 'a is string type and Floor need double type), thus can't pass this filter, and we can't push down this missing attribute 'a
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9123 from dilipbiswal/SPARK-10534.
Implement encode/decode for external row based on `ClassEncoder`.
TODO:
* code cleanup
* ~~fix corner cases~~
* refactor the encoder interface
* improve test for product codegen, to cover more corner cases.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9184 from cloud-fan/encoder.
Push conjunctive predicates though Aggregate operators when their references are a subset of the groupingExpressions.
Query plan before optimisation :-
Filter ((c#138L = 2) && (a#0 = 3))
Aggregate [a#0], [a#0,count(b#1) AS c#138L]
Project [a#0,b#1]
LocalRelation [a#0,b#1,c#2]
Query plan after optimisation :-
Filter (c#138L = 2)
Aggregate [a#0], [a#0,count(b#1) AS c#138L]
Filter (a#0 = 3)
Project [a#0,b#1]
LocalRelation [a#0,b#1,c#2]
Author: nitin goyal <nitin.goyal@guavus.com>
Author: nitin.goyal <nitin.goyal@guavus.com>
Closes#9167 from nitin2goyal/master.
This PR improve the performance by:
1) Generate an Iterator that take Iterator[CachedBatch] as input, and call accessors (unroll the loop for columns), avoid the expensive Iterator.flatMap.
2) Use Unsafe.getInt/getLong/getFloat/getDouble instead of ByteBuffer.getInt/getLong/getFloat/getDouble, the later one actually read byte by byte.
3) Remove the unnecessary copy() in Coalesce(), which is not related to memory cache, found during benchmark.
The following benchmark showed that we can speedup the columnar cache of int by 2x.
```
path = '/opt/tpcds/store_sales/'
int_cols = ['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk','ss_customer_sk']
df = sqlContext.read.parquet(path).select(int_cols).cache()
df.count()
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
Author: Davies Liu <davies@databricks.com>
Closes#9145 from davies/byte_buffer.
Currently, we use CartesianProduct for join with null-safe-equal condition.
```
scala> sqlContext.sql("select * from t a join t b on (a.i <=> b.i)").explain
== Physical Plan ==
TungstenProject [i#2,j#3,i#7,j#8]
Filter (i#2 <=> i#7)
CartesianProduct
LocalTableScan [i#2,j#3], [[1,1]]
LocalTableScan [i#7,j#8], [[1,1]]
```
Actually, we can have an equal-join condition as `coalesce(i, default) = coalesce(b.i, default)`, then an partitioned join algorithm could be used.
After this PR, the plan will become:
```
>>> sqlContext.sql("select * from a join b ON a.id <=> b.id").explain()
TungstenProject [id#0L,id#1L]
Filter (id#0L <=> id#1L)
SortMergeJoin [coalesce(id#0L,0)], [coalesce(id#1L,0)]
TungstenSort [coalesce(id#0L,0) ASC], false, 0
TungstenExchange hashpartitioning(coalesce(id#0L,0),200)
ConvertToUnsafe
Scan PhysicalRDD[id#0L]
TungstenSort [coalesce(id#1L,0) ASC], false, 0
TungstenExchange hashpartitioning(coalesce(id#1L,0),200)
ConvertToUnsafe
Scan PhysicalRDD[id#1L]
```
Author: Davies Liu <davies@databricks.com>
Closes#9120 from davies/null_safe.
We can't parse `NOT` operator with comparison operations like `SELECT NOT TRUE > TRUE`, this PR fixed it.
Takes over https://github.com/apache/spark/pull/6326.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8617 from cloud-fan/not.
The purpose of this PR is to keep the unsafe format detail only inside the unsafe class itself, so when we use them(like use unsafe array in unsafe map, use unsafe array and map in columnar cache), we don't need to understand the format before use them.
change list:
* unsafe array's 4-bytes numElements header is now required(was optional), and become a part of unsafe array format.
* w.r.t the previous changing, the `sizeInBytes` of unsafe array now counts the 4-bytes header.
* unsafe map's format was `[numElements] [key array numBytes] [key array content(without numElements header)] [value array content(without numElements header)]` before, which is a little hacky as it makes unsafe array's header optional. I think saving 4 bytes is not a big deal, so the format is now: `[key array numBytes] [unsafe key array] [unsafe value array]`.
* w.r.t the previous changing, the `sizeInBytes` of unsafe map now counts both map's header and array's header.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9131 from cloud-fan/unsafe.
Some json parsers are not closed. parser in JacksonParser#parseJson, for example.
Author: navis.ryu <navis@apache.org>
Closes#9130 from navis/SPARK-11124.
Actually all of the `UnaryMathExpression` doens't support the Decimal, will create follow ups for supporing it. This is the first PR which will be good to review the approach I am taking.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#9086 from chenghao-intel/ceiling.
This patch extends TungstenAggregate to support ImperativeAggregate functions. The existing TungstenAggregate operator only supported DeclarativeAggregate functions, which are defined in terms of Catalyst expressions and can be evaluated via generated projections. ImperativeAggregate functions, on the other hand, are evaluated by calling their `initialize`, `update`, `merge`, and `eval` methods.
The basic strategy here is similar to how SortBasedAggregate evaluates both types of aggregate functions: use a generated projection to evaluate the expression-based declarative aggregates with dummy placeholder expressions inserted in place of the imperative aggregate function output, then invoke the imperative aggregate functions and target them against the aggregation buffer. The bulk of the diff here consists of code that was copied and adapted from SortBasedAggregate, with some key changes to handle TungstenAggregate's sort fallback path.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9038 from JoshRosen/support-interpreted-in-tungsten-agg-final.
Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should only have one form and TableIdentifier is the best one because it provides methods to get table name, database name, return unquoted string, and return quoted string.
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8453 from cloud-fan/table-name.
We should not stop resolving having when the having condtion is resolved, or something like `count(1)` will crash.
Author: Wenchen Fan <cloud0fan@163.com>
Closes#9105 from cloud-fan/having.
This is a first draft of the ability to construct expressions that will take a catalyst internal row and construct a Product (case class or tuple) that has fields with the correct names. Support include:
- Nested classes
- Maps
- Efficiently handling of arrays of primitive types
Not yet supported:
- Case classes that require custom collection types (i.e. List instead of Seq).
Author: Michael Armbrust <michael@databricks.com>
Closes#9100 from marmbrus/productContructor.
In the current implementation of named expressions' `ExprIds`, we rely on a per-JVM AtomicLong to ensure that expression ids are unique within a JVM. However, these expression ids will not be _globally_ unique. This opens the potential for id collisions if new expression ids happen to be created inside of tasks rather than on the driver.
There are currently a few cases where tasks allocate expression ids, which happen to be safe because those expressions are never compared to expressions created on the driver. In order to guard against the introduction of invalid comparisons between driver-created and executor-created expression ids, this patch extends `ExprId` to incorporate a UUID to identify the JVM that created the id, which prevents collisions.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9093 from JoshRosen/SPARK-11080.
This PR improve the unrolling and read of complex types in columnar cache:
1) Using UnsafeProjection to do serialization of complex types, so they will not be serialized three times (two for actualSize)
2) Copy the bytes from UnsafeRow/UnsafeArrayData to ByteBuffer directly, avoiding the immediate byte[]
3) Using the underlying array in ByteBuffer to create UTF8String/UnsafeRow/UnsafeArrayData without copy.
Combine these optimizations, we can reduce the unrolling time from 25s to 21s (20% less), reduce the scanning time from 3.5s to 2.5s (28% less).
```
df = sqlContext.read.parquet(path)
t = time.time()
df.cache()
df.count()
print 'unrolling', time.time() - t
for i in range(10):
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
The schema is
```
root
|-- a: struct (nullable = true)
| |-- b: long (nullable = true)
| |-- c: string (nullable = true)
|-- d: array (nullable = true)
| |-- element: long (containsNull = true)
|-- e: map (nullable = true)
| |-- key: long
| |-- value: string (valueContainsNull = true)
```
Now the columnar cache depends on that UnsafeProjection support all the data types (including UDT), this PR also fix that.
Author: Davies Liu <davies@databricks.com>
Closes#9016 from davies/complex2.
JIRA: https://issues.apache.org/jira/browse/SPARK-10960
When accessing a column in inner select from a select with window function, `AnalysisException` will be thrown. For example, an query like this:
select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 from (select month, area, product, 1 as tmp1 from windowData) tmp
Currently, the rule `ExtractWindowExpressions` in `Analyzer` only extracts regular expressions from `WindowFunction`, `WindowSpecDefinition` and `AggregateExpression`. We need to also extract other attributes as the one in `Alias` as shown in the above query.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9011 from viirya/fix-window-inner-column.
This PR improve the sessions management by replacing the thread-local based to one SQLContext per session approach, introduce separated temporary tables and UDFs/UDAFs for each session.
A new session of SQLContext could be created by:
1) create an new SQLContext
2) call newSession() on existing SQLContext
For HiveContext, in order to reduce the cost for each session, the classloader and Hive client are shared across multiple sessions (created by newSession).
CacheManager is also shared by multiple sessions, so cache a table multiple times in different sessions will not cause multiple copies of in-memory cache.
Added jars are still shared by all the sessions, because SparkContext does not support sessions.
cc marmbrus yhuai rxin
Author: Davies Liu <davies@databricks.com>
Closes#8909 from davies/sessions.
UnsafeRow contains 3 pieces of information when pointing to some data in memory (an object, a base offset, and length). When the row is serialized with Java/Kryo serialization, the object layout in memory can change if two machines have different pointer width (Oops in JVM).
To reproduce, launch Spark using
MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"
And then run the following
scala> sql("select 1 xx").collect()
Author: Reynold Xin <rxin@databricks.com>
Closes#9030 from rxin/SPARK-10914.
This PR refactors Parquet write path to follow parquet-format spec. It's a successor of PR #7679, but with less non-essential changes.
Major changes include:
1. Replaces `RowWriteSupport` and `MutableRowWriteSupport` with `CatalystWriteSupport`
- Writes Parquet data using standard layout defined in parquet-format
Specifically, we are now writing ...
- ... arrays and maps in standard 3-level structure with proper annotations and field names
- ... decimals as `INT32` and `INT64` whenever possible, and taking `FIXED_LEN_BYTE_ARRAY` as the final fallback
- Supports legacy mode which is compatible with Spark 1.4 and prior versions
The legacy mode is by default off, and can be turned on by flipping SQL option `spark.sql.parquet.writeLegacyFormat` to `true`.
- Eliminates per value data type dispatching costs via prebuilt composed writer functions
1. Cleans up the last pieces of old Parquet support code
As pointed out by rxin previously, we probably want to rename all those `Catalyst*` Parquet classes to `Parquet*` for clarity. But I'd like to do this in a follow-up PR to minimize code review noises in this one.
Author: Cheng Lian <lian@databricks.com>
Closes#8988 from liancheng/spark-8848/standard-parquet-write-path.
This PR is a first cut at code generating an encoder that takes a Scala `Product` type and converts it directly into the tungsten binary format. This is done through the addition of a new set of expression that can be used to invoke methods on raw JVM objects, extracting fields and converting the result into the required format. These can then be used directly in an `UnsafeProjection` allowing us to leverage the existing encoding logic.
According to some simple benchmarks, this can significantly speed up conversion (~4x). However, replacing CatalystConverters is deferred to a later PR to keep this PR at a reasonable size.
```scala
case class SomeInts(a: Int, b: Int, c: Int, d: Int, e: Int)
val data = SomeInts(1, 2, 3, 4, 5)
val encoder = ProductEncoder[SomeInts]
val converter = CatalystTypeConverters.createToCatalystConverter(ScalaReflection.schemaFor[SomeInts].dataType)
(1 to 5).foreach {iter =>
benchmark(s"converter $iter") {
var i = 100000000
while (i > 0) {
val res = converter(data).asInstanceOf[InternalRow]
assert(res.getInt(0) == 1)
assert(res.getInt(1) == 2)
i -= 1
}
}
benchmark(s"encoder $iter") {
var i = 100000000
while (i > 0) {
val res = encoder.toRow(data)
assert(res.getInt(0) == 1)
assert(res.getInt(1) == 2)
i -= 1
}
}
}
```
Results:
```
[info] converter 1: 7170ms
[info] encoder 1: 1888ms
[info] converter 2: 6763ms
[info] encoder 2: 1824ms
[info] converter 3: 6912ms
[info] encoder 3: 1802ms
[info] converter 4: 7131ms
[info] encoder 4: 1798ms
[info] converter 5: 7350ms
[info] encoder 5: 1912ms
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9019 from marmbrus/productEncoder.
This PR refactors `HashJoinNode` to take a existing `HashedRelation`. So, we can reuse this node for both `ShuffledHashJoin` and `BroadcastHashJoin`.
https://issues.apache.org/jira/browse/SPARK-10887
Author: Yin Huai <yhuai@databricks.com>
Closes#8953 from yhuai/SPARK-10887.
In the analysis phase , while processing the rules for IN predicate, we
compare the in-list types to the lhs expression type and generate
cast operation if necessary. In the case of NULL [NOT] IN expr1 , we end up
generating cast between in list types to NULL like cast (1 as NULL) which
is not a valid cast.
The fix is to not generate such a cast if the lhs type is a NullType instead
we translate the expression to Literal(Null).
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#8983 from dilipbiswal/spark_8654.
Its pretty hard to debug problems with expressions when you can't see all the arguments.
Before: `invoke()`
After: `invoke(inputObject#1, intField, IntegerType)`
Author: Michael Armbrust <michael@databricks.com>
Closes#9022 from marmbrus/expressionToString.
This PR improve the performance of complex types in columnar cache by using UnsafeProjection instead of KryoSerializer.
A simple benchmark show that this PR could improve the performance of scanning a cached table with complex columns by 15x (comparing to Spark 1.5).
Here is the code used to benchmark:
```
df = sc.range(1<<23).map(lambda i: Row(a=Row(b=i, c=str(i)), d=range(10), e=dict(zip(range(10), [str(i) for i in range(10)])))).toDF()
df.write.parquet("table")
```
```
df = sqlContext.read.parquet("table")
df.cache()
df.count()
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
Author: Davies Liu <davies@databricks.com>
Closes#8971 from davies/complex.
This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8083 from JoshRosen/SPARK-9702.
The created decimal is wrong if using `Decimal(unscaled, precision, scale)` with unscaled > 1e18 and and precision > 18 and scale > 0.
This bug exists since the beginning.
Author: Davies Liu <davies@databricks.com>
Closes#9014 from davies/fix_decimal.
DeclarativeAggregate matches more closely with ImperativeAggregate we already have.
Author: Reynold Xin <rxin@databricks.com>
Closes#9013 from rxin/SPARK-10982.
This patch refactors several of the Aggregate2 interfaces in order to improve code clarity.
The biggest change is a refactoring of the `AggregateFunction2` class hierarchy. In the old code, we had a class named `AlgebraicAggregate` that inherited from `AggregateFunction2`, added a new set of methods, then banned the use of the inherited methods. I found this to be fairly confusing because.
If you look carefully at the existing code, you'll see that subclasses of `AggregateFunction2` fall into two disjoint categories: imperative aggregation functions which directly extended `AggregateFunction2` and declarative, expression-based aggregate functions which extended `AlgebraicAggregate`. In order to make this more explicit, this patch refactors things so that `AggregateFunction2` is a sealed abstract class with two subclasses, `ImperativeAggregateFunction` and `ExpressionAggregateFunction`. The superclass, `AggregateFunction2`, now only contains methods and fields that are common to both subclasses.
After making this change, I updated the various AggregationIterator classes to comply with this new naming scheme. I also performed several small renamings in the aggregate interfaces themselves in order to improve clarity and rewrote or expanded a number of comments.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8973 from JoshRosen/tungsten-agg-comments.
This PR is mostly cosmetic and cleans up some warts in codegen (nearly all of which were inherited from the original quasiquote version).
- Add lines numbers to errors (in stacktraces when debug logging is on, and always for compile fails)
- Use a variable for input row instead of hardcoding "i" everywhere
- rename `primitive` -> `value` (since its often actually an object)
Author: Michael Armbrust <michael@databricks.com>
Closes#9006 from marmbrus/codegen-cleanup.
`Murmur3_x86_32.hashUnsafeWords` only accepts word-aligned bytes, but unsafe array is not.
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8987 from cloud-fan/hash.
This PR is a completely rewritten of GenerateUnsafeProjection, to accomplish the goal of copying data only once. The old code of GenerateUnsafeProjection is still there to reduce review difficulty.
Instead of creating unsafe conversion code for struct, array and map, we create code of writing the content to the global row buffer.
Author: Wenchen Fan <cloud0fan@163.com>
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8747 from cloud-fan/copy-once.
The utilities such as Substring#substringBinarySQL and BinaryPrefixComparator#computePrefix for binary data are put together in ByteArray for easy-to-read.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#8122 from maropu/CleanUpForBinaryType.
Floor & Ceiling function should returns Long type, rather than Double.
Verified with MySQL & Hive.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8933 from chenghao-intel/ceiling.
This is an implementation of Hive's `json_tuple` function using Jackson Streaming.
Author: Nathan Howell <nhowell@godaddy.com>
Closes#7946 from NathanHowell/SPARK-9617.
This PR implements a HyperLogLog based Approximate Count Distinct function using the new UDAF interface.
The implementation is inspired by the ClearSpring HyperLogLog implementation and should produce the same results.
There is still some documentation and testing left to do.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#8362 from hvanhovell/SPARK-9741.
When reading Parquet string and binary-backed decimal values, Parquet `Binary.getBytes` always returns a copied byte array, which is unnecessary. Since the underlying implementation of `Binary` values there is guaranteed to be `ByteArraySliceBackedBinary`, and Parquet itself never reuses underlying byte arrays, we can use `Binary.toByteBuffer.array()` to steal the underlying byte arrays without copying them.
This brings performance benefits when scanning Parquet string and binary-backed decimal columns. Note that, this trick doesn't cover binary-backed decimals with precision greater than 18.
My micro-benchmark result is that, this brings a ~15% performance boost for scanning TPC-DS `store_sales` table (scale factor 15).
Another minor optimization done in this PR is that, now we directly construct a Java `BigDecimal` in `Decimal.toJavaBigDecimal` without constructing a Scala `BigDecimal` first. This brings another ~5% performance gain.
Author: Cheng Lian <lian@databricks.com>
Closes#8907 from liancheng/spark-10811/eliminate-array-copying.
https://issues.apache.org/jira/browse/SPARK-10741
I choose the second approach: do not change output exprIds when convert MetastoreRelation to LogicalRelation
Author: Wenchen Fan <cloud0fan@163.com>
Closes#8889 from cloud-fan/hot-bug.
Since `scala.util.parsing.combinator.Parsers` is thread-safe since Scala 2.10 (See [SI-4929](https://issues.scala-lang.org/browse/SI-4929)), we can change SqlParser to object to avoid memory leak.
I didn't change other subclasses of `scala.util.parsing.combinator.Parsers` because there is only one instance in one SQLContext, which should not be an issue.
Author: zsxwing <zsxwing@gmail.com>
Closes#8357 from zsxwing/sql-memory-leak.
From JIRA: Schema merging should only handle struct fields. But currently we also reconcile decimal precision and scale information.
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8634 from holdenk/SPARK-10449-dont-merge-different-precision.
Intersect and Except are both set operators and they use the all the columns to compare equality between rows. When pushing their Project parent down, the relations they based on would change, therefore not an equivalent transformation.
JIRA: https://issues.apache.org/jira/browse/SPARK-10539
I added some comments based on the fix of https://github.com/apache/spark/pull/8742.
Author: Yijie Shen <henry.yijieshen@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#8823 from yhuai/fix_set_optimization.
Kryo fails with buffer overflow even with max value (2G).
{noformat}
org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1
Serialization trace:
containsChild (org.apache.spark.sql.catalyst.expressions.BoundReference)
child (org.apache.spark.sql.catalyst.expressions.SortOrder)
array (scala.collection.mutable.ArraySeq)
ordering (org.apache.spark.sql.catalyst.expressions.InterpretedOrdering)
interpretedOrdering (org.apache.spark.sql.types.StructType)
schema (org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema). To avoid this, increase spark.kryoserializer.buffer.max value.
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:263)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{noformat}
Author: navis.ryu <navis@apache.org>
Closes#8808 from navis/SPARK-10684.
This fixes https://issues.apache.org/jira/browse/SPARK-9794 by using a real ISO8601 parser. (courtesy of the xml component of the standard java library)
cc: angelini
Author: Kevin Cox <kevincox@kevincox.ca>
Closes#8396 from kevincox/kevincox-sql-time-parsing.
Sometimes we can't push down the whole `Project` though `Sort`, but we still have a chance to push down part of it.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8644 from cloud-fan/column-prune.
JIRA: https://issues.apache.org/jira/browse/SPARK-10437
If an expression in `SortOrder` is a resolved one, such as `count(1)`, the corresponding rule in `Analyzer` to make it work in order by will not be applied.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8599 from viirya/orderby-agg.
Move .java files in `src/main/scala` to `src/main/java` root, except for `package-info.java` (to stay next to package.scala)
Author: Sean Owen <sowen@cloudera.com>
Closes#8736 from srowen/SPARK-10576.
Adding STDDEV support for DataFrame using 1-pass online /parallel algorithm to compute variance. Please review the code change.
Author: JihongMa <linlin200605@gmail.com>
Author: Jihong MA <linlin200605@gmail.com>
Author: Jihong MA <jihongma@jihongs-mbp.usca.ibm.com>
Author: Jihong MA <jihongma@Jihongs-MacBook-Pro.local>
Closes#6297 from JihongMA/SPARK-SQL.
Before this fix, `MyDenseVectorUDT.typeName` gives `mydensevecto`, which is not desirable.
Author: Cheng Lian <lian@databricks.com>
Closes#8640 from liancheng/spark-10472/udt-type-name.
Use these in the optimizer as well:
A and (not(A) or B) => A and B
not(A and B) => not(A) or not(B)
not(A or B) => not(A) and not(B)
Author: Yash Datta <Yash.Datta@guavus.com>
Closes#5700 from saucam/bool_simp.
The reason for this extra copy is that we iterate the array twice: calculate elements data size and copy elements to array buffer.
A simple solution is to follow `createCodeForStruct`, we can dynamically grow the buffer when needed and thus don't need to know the data size ahead.
This PR also include some typo and style fixes, and did some minor refactor to make sure `input.primitive` is always variable name not code when generate unsafe code.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8496 from cloud-fan/avoid-copy.
When we generate unsafe code inside `createCodeForXXX`, we always assign the `input.primitive` to a temp variable in case `input.primitive` is expression code.
This PR did some refactor to make sure `input.primitive` is always variable name, and some other typo and style fixes.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8613 from cloud-fan/minor.
The bulk of the changes are on `transient` annotation on class parameter. Often the compiler doesn't generate a field for this parameters, so the the transient annotation would be unnecessary.
But if the class parameter are used in methods, then fields are created. So it is safer to keep the annotations.
The remainder are some potential bugs, and deprecated syntax.
Author: Luc Bourlier <luc.bourlier@typesafe.com>
Closes#8433 from skyluc/issue/sbt-2.11.
We did a lot of special handling for non-deterministic expressions in `Optimizer`. However, `PhysicalOperation` just collects all Projects and Filters and mess it up. We should respect the operators order caused by non-deterministic expressions in `PhysicalOperation`.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8486 from cloud-fan/fix.
For example, we can write `SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1` in PostgreSQL, and we should support this in Spark SQL.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8548 from cloud-fan/support-order-by-non-attribute.
After this PR, In/InSet/ArrayContain will return null if value is null, instead of false. They also will return null even if there is a null in the set/array.
Author: Davies Liu <davies@databricks.com>
Closes#8492 from davies/fix_in.
This commit fixes an issue where the public SQL `Row` class did not override `hashCode`, causing it to violate the hashCode() + equals() contract. To fix this, I simply ported the `hashCode` implementation from the 1.4.x version of `Row`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8500 from JoshRosen/SPARK-10325 and squashes the following commits:
51ffea1 [Josh Rosen] Override hashCode() for public Row.
In BigDecimal or java.math.BigDecimal, the precision could be smaller than scale, for example, BigDecimal("0.001") has precision = 1 and scale = 3. But DecimalType require that the precision should be larger than scale, so we should use the maximum of precision and scale when inferring the schema from decimal literal.
Author: Davies Liu <davies@databricks.com>
Closes#8428 from davies/smaller_decimal.
Replace `JavaConversions` implicits with `JavaConverters`
Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet.
Author: Sean Owen <sowen@cloudera.com>
Closes#8033 from srowen/SPARK-9613.
We misunderstood the Julian days and nanoseconds of the day in parquet (as TimestampType) from Hive/Impala, they are overlapped, so can't be added together directly.
In order to avoid the confusing rounding when do the converting, we use `2440588` as the Julian Day of epoch of unix timestamp (which should be 2440587.5).
Author: Davies Liu <davies@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes#8400 from davies/timestamp_parquet.
This patch adds an analyzer rule to ensure that set operations (union, intersect, and except) are only applied to tables with the same number of columns. Without this rule, there are scenarios where invalid queries can return incorrect results instead of failing with error messages; SPARK-9813 provides one example of this problem. In other cases, the invalid query can crash at runtime with extremely confusing exceptions.
I also performed a bit of cleanup to refactor some of those logical operators' code into a common `SetOperation` base class.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7631 from JoshRosen/SPARK-9293.
Currently, we eagerly attempt to resolve functions, even before their children are resolved. However, this is not valid in cases where we need to know the types of the input arguments (i.e. when resolving Hive UDFs).
As a fix, this PR delays function resolution until the functions children are resolved. This change also necessitates a change to the way we resolve aggregate expressions that are not in aggregate operators (e.g., in `HAVING` or `ORDER BY` clauses). Specifically, we can't assume that these misplaced functions will be resolved, allowing us to differentiate aggregate functions from normal functions. To compensate for this change we now attempt to resolve these unresolved expressions in the context of the aggregate operator, before checking to see if any aggregate expressions are present.
Author: Michael Armbrust <michael@databricks.com>
Closes#8371 from marmbrus/hiveUDFResolution.
This adds a missing null check to the Decimal `toScala` converter in `CatalystTypeConverters`, fixing an NPE.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8401 from JoshRosen/SPARK-10190.
Type coercion for IF should have children resolved first, or we could meet unresolved exception.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#8331 from adrian-wang/spark10130.
This is based on #7779 , thanks to tarekauel . Fix the conflict and nullability.
Closes#7779 and #8274 .
Author: Tarek Auel <tarek.auel@googlemail.com>
Author: Davies Liu <davies@databricks.com>
Closes#8330 from davies/stringLocate.
https://issues.apache.org/jira/browse/SPARK-10092
This pr is a follow-up one for Multi-DB support. It has the following changes:
* `HiveContext.refreshTable` now accepts `dbName.tableName`.
* `HiveContext.analyze` now accepts `dbName.tableName`.
* `CreateTableUsing`, `CreateTableUsingAsSelect`, `CreateTempTableUsing`, `CreateTempTableUsingAsSelect`, `CreateMetastoreDataSource`, and `CreateMetastoreDataSourceAsSelect` all take `TableIdentifier` instead of the string representation of table name.
* When you call `saveAsTable` with a specified database, the data will be saved to the correct location.
* Explicitly do not allow users to create a temporary with a specified database name (users cannot do it before).
* When we save table to metastore, we also check if db name and table name can be accepted by hive (using `MetaStoreUtils.validateName`).
Author: Yin Huai <yhuai@databricks.com>
Closes#8324 from yhuai/saveAsTableDB.
A few minor changes:
1. Improved documentation
2. Rename apply(distinct....) to distinct.
3. Changed MutableAggregationBuffer from a trait to an abstract class.
4. Renamed returnDataType to dataType to be more consistent with other expressions.
And unrelated to UDAFs:
1. Renamed file names in expressions to use suffix "Expressions" to be more consistent.
2. Moved regexp related expressions out to its own file.
3. Renamed StringComparison => StringPredicate.
Author: Reynold Xin <rxin@databricks.com>
Closes#8321 from rxin/SPARK-9242.
create t1 (a decimal(7, 2), b long);
select case when 1=1 then a else 1.0 end from t1;
select case when 1=1 then a else b end from t1;
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#8270 from adrian-wang/casewhenfractional.
We should rounding the result of multiply/division of decimal to expected precision/scale, also check overflow.
Author: Davies Liu <davies@databricks.com>
Closes#8287 from davies/decimal_division.
This is kind of a weird case, but given a sufficiently complex query plan (in this case a TungstenProject with an Exchange underneath), we could have NPEs on the executors due to the time when we were calling transformAllExpressions
In general we should ensure that all transformations occur on the driver and not on the executors. Some reasons for avoid executor side transformations include:
* (this case) Some operator constructors require state such as access to the Spark/SQL conf so doing a makeCopy on the executor can fail.
* (unrelated reason for avoid executor transformations) ExprIds are calculated using an atomic integer, so you can violate their uniqueness constraint by constructing them anywhere other than the driver.
This subsumes #8285.
Author: Reynold Xin <rxin@databricks.com>
Author: Michael Armbrust <michael@databricks.com>
Closes#8295 from rxin/SPARK-10096.
In UnsafeRow, we use the private field of BigInteger for better performance, but it actually didn't contribute much (3% in one benchmark) to end-to-end runtime, and make it not portable (may fail on other JVM implementations).
So we should use the public API instead.
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#8286 from davies/portable_decimal.
The type for array of array in Java is slightly different than array of others.
cc cloud-fan
Author: Davies Liu <davies@databricks.com>
Closes#8250 from davies/array_binary.
https://issues.apache.org/jira/browse/SPARK-9592#8113 has the fundamental fix. But, if we want to minimize the number of changed lines, we can go with this one. Then, in 1.6, we merge #8113.
Author: Yin Huai <yhuai@databricks.com>
Closes#8172 from yhuai/lastFix and squashes the following commits:
b28c42a [Yin Huai] Regression test.
af87086 [Yin Huai] Fix last.
We should skip unresolved `LogicalPlan`s for `PullOutNondeterministic`, as calling `output` on unresolved `LogicalPlan` will produce confusing error message.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8203 from cloud-fan/error-msg and squashes the following commits:
1c67ca7 [Wenchen Fan] move test
7593080 [Wenchen Fan] correct error message for aggregate
Also alias the ExtractValue instead of wrapping it with UnresolvedAlias when resolve attribute in LogicalPlan, as this alias will be trimmed if it's unnecessary.
Based on #7957 without the changes to mllib, but instead maintaining earlier behavior when using `withColumn` on expressions that already have metadata.
Author: Wenchen Fan <cloud0fan@outlook.com>
Author: Michael Armbrust <michael@databricks.com>
Closes#8215 from marmbrus/pr/7957.
As `InternalRow` does not extend `Row` now, I think we can remove it.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8170 from viirya/remove_canequal.
PR #7967 enables us to save data source relations to metastore in Hive compatible format when possible. But it fails to persist Parquet relations with decimal column(s) to Hive metastore of versions lower than 1.2.0. This is because `ParquetHiveSerDe` in Hive versions prior to 1.2.0 doesn't support decimal. This PR checks for this case and falls back to Spark SQL specific metastore table format.
Author: Yin Huai <yhuai@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes#8130 from liancheng/spark-9757/old-hive-parquet-decimal.
`RuleExecutor.timeMap` is currently a non-thread-safe mutable HashMap; this can lead to infinite loops if multiple threads are concurrently modifying the map. I believe that this is responsible for some hangs that I've observed in HiveQuerySuite.
This patch addresses this by using a Guava `AtomicLongMap`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8120 from JoshRosen/rule-executor-time-map-fix.
HashPartitioning compatibility is currently defined w.r.t the _set_ of expressions, but the ordering of those expressions matters when computing hash codes; this could lead to incorrect answers if we mistakenly avoided a shuffle based on the assumption that HashPartitionings with the same expressions in different orders will produce equivalent row hashcodes. The first commit adds a regression test which illustrates this problem.
The fix for this is simple: make `HashPartitioning.compatibleWith` and `HashPartitioning.guarantees` sensitive to the expression ordering (i.e. do not perform set comparison).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8074 from JoshRosen/hashpartitioning-compatiblewith-fixes and squashes the following commits:
b61412f [Josh Rosen] Demonstrate that I haven't cheated in my fix
0b4d7d9 [Josh Rosen] Update so that clusteringSet is only used in satisfies().
dc9c9d7 [Josh Rosen] Add failing regression test for SPARK-9785
PlatformDependent.UNSAFE is way too verbose.
Author: Reynold Xin <rxin@databricks.com>
Closes#8094 from rxin/SPARK-9815 and squashes the following commits:
229b603 [Reynold Xin] [SPARK-9815] Rename PlatformDependent.UNSAFE -> Platform.
This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join. It also refactors `SortMergeJoin` in order to improve performance and code clarity.
Along the way, I also performed a couple pieces of minor cleanup and optimization:
- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings.
This patch incorporates several ideas from adrian-wang's patch, #5717.
Closes#5717.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7904)
<!-- Reviewable:end -->
Author: Josh Rosen <joshrosen@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#7904 from JoshRosen/outer-join-smj and squashes 1 commits.
This patch optimize two things:
1. passing MathContext to JavaBigDecimal.multiply/divide/reminder to do right rounding, because java.math.BigDecimal.apply(MathContext) is expensive
2. Cast integer/short/byte to decimal directly (without double)
This two optimizations could speed up the end-to-end time of a aggregation (SUM(short * decimal(5, 2)) 75% (from 19s -> 10.8s)
Author: Davies Liu <davies@databricks.com>
Closes#8052 from davies/optimize_decimal and squashes the following commits:
225efad [Davies Liu] improve decimal.times() and cast(int, decimalType)
Currently, generated UnsafeProjection can reach 64k byte code limit of Java. This patch will split the generated expressions into multiple functions, to avoid the limitation.
After this patch, we can work well with table that have up to 64k columns (hit max number of constants limit in Java), it should be enough in practice.
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#8044 from davies/wider_table and squashes the following commits:
9192e6c [Davies Liu] fix generated safe projection
d1ef81a [Davies Liu] fix failed tests
737b3d3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table
ffcd132 [Davies Liu] address comments
1b95be4 [Davies Liu] put the generated class into sql package
77ed72d [Davies Liu] address comments
4518e17 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table
75ccd01 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table
495e932 [Davies Liu] support wider table with more than 1k columns for generated projections
This pull request refactors the `EnsureRequirements` planning rule in order to avoid the addition of certain unnecessary shuffles.
As an example of how unnecessary shuffles can occur, consider SortMergeJoin, which requires clustered distribution and sorted ordering of its children's input rows. Say that both of SMJ's children produce unsorted output but are both SinglePartition. In this case, we will need to inject sort operators but should not need to inject Exchanges. Unfortunately, it looks like the EnsureRequirements unnecessarily repartitions using a hash partitioning.
This patch solves this problem by refactoring `EnsureRequirements` to properly implement the `compatibleWith` checks that were broken in earlier implementations. See the significant inline comments for a better description of how this works. The majority of this PR is new comments and test cases, with few actual changes to the code.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7988 from JoshRosen/exchange-fixes and squashes the following commits:
38006e7 [Josh Rosen] Rewrite EnsureRequirements _yet again_ to make things even simpler
0983f75 [Josh Rosen] More guarantees vs. compatibleWith cleanup; delete BroadcastPartitioning.
8784bd9 [Josh Rosen] Giant comment explaining compatibleWith vs. guarantees
1307c50 [Josh Rosen] Update conditions for requiring child compatibility.
18cddeb [Josh Rosen] Rename DummyPlan to DummySparkPlan.
2c7e126 [Josh Rosen] Merge remote-tracking branch 'origin/master' into exchange-fixes
fee65c4 [Josh Rosen] Further refinement to comments / reasoning
642b0bb [Josh Rosen] Further expand comment / reasoning
06aba0c [Josh Rosen] Add more comments
8dbc845 [Josh Rosen] Add even more tests.
4f08278 [Josh Rosen] Fix the test by adding the compatibility check to EnsureRequirements
a1c12b9 [Josh Rosen] Add failing test to demonstrate allCompatible bug
0725a34 [Josh Rosen] Small assertion cleanup.
5172ac5 [Josh Rosen] Add test for requiresChildrenToProduceSameNumberOfPartitions.
2e0f33a [Josh Rosen] Write a more generic test for EnsureRequirements.
752b8de [Josh Rosen] style fix
c628daf [Josh Rosen] Revert accidental ExchangeSuite change.
c9fb231 [Josh Rosen] Rewrite exchange to fix better handle this case.
adcc742 [Josh Rosen] Move test to PlannerSuite.
0675956 [Josh Rosen] Preserving ordering and partitioning in row format converters also does not help.
cc5669c [Josh Rosen] Adding outputPartitioning to Repartition does not fix the test.
2dfc648 [Josh Rosen] Add failing test illustrating bad exchange planning.
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#8057 from yjshen/explode_star and squashes the following commits:
eae181d [Yijie Shen] change explaination message
54c9d11 [Yijie Shen] meaning message for * in explode
In https://github.com/apache/spark/pull/7752 we added `FromUnsafe` to convert nexted unsafe data like array/map/struct to safe versions. It's a quick solution and we already have `GenerateSafe` to do the conversion which is codegened. So we should remove `FromUnsafe` and implement its codegen version in `GenerateSafe`.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8029 from cloud-fan/from-unsafe and squashes the following commits:
ed40d8f [Wenchen Fan] add the copy back
a93fd4b [Wenchen Fan] cogengen FromUnsafe
All data sources show up as "PhysicalRDD" in physical plan explain. It'd be better if we can show the name of the data source.
Without this patch:
```
== Physical Plan ==
NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Final,isDistinct=false))
Exchange hashpartitioning(date#0,cat#1)
NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Partial,isDistinct=false))
PhysicalRDD [date#0,cat#1,count#2], MapPartitionsRDD[3] at
```
With this patch:
```
== Physical Plan ==
TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Final,isDistinct=false)]
Exchange hashpartitioning(date#0,cat#1)
TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Partial,isDistinct=false)]
ConvertToUnsafe
Scan ParquetRelation[file:/scratch/rxin/spark/sales4][date#0,cat#1,count#2]
```
Author: Reynold Xin <rxin@databricks.com>
Closes#8024 from rxin/SPARK-9733 and squashes the following commits:
811b90e [Reynold Xin] Fixed Python test case.
52cab77 [Reynold Xin] Cast.
eea9ccc [Reynold Xin] Fix test case.
fcecb22 [Reynold Xin] [SPARK-9733][SQL] Improve explain message for data source scan node.
JoinedRow.anyNull currently loops through every field to check for null, which is inefficient if the underlying rows are UnsafeRows. It should just delegate to the underlying implementation.
Author: Reynold Xin <rxin@databricks.com>
Closes#8027 from rxin/SPARK-9736 and squashes the following commits:
03a2e92 [Reynold Xin] Include all files.
90f1add [Reynold Xin] [SPARK-9736][SQL] JoinedRow.anyNull should delegate to the underlying rows.
When we convert unsafe row to safe row, we will do copy if the column is struct or string type. However, the string inside unsafe array/map are not copied, which may cause problems.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7990 from cloud-fan/copy and squashes the following commits:
c13d1e3 [Wenchen Fan] change test name
fe36294 [Wenchen Fan] we should deep copy UTF8String when convert unsafe row to safe row
Make sure that `$"column"` is consistent with other methods with respect to backticks. Adds a bunch of tests for various ways of constructing columns.
Author: Michael Armbrust <michael@databricks.com>
Closes#7969 from marmbrus/namesWithDots and squashes the following commits:
53ef3d7 [Michael Armbrust] [SPARK-9650][SQL] Fix quoting behavior on interpolated column names
2bf7a92 [Michael Armbrust] WIP
This is the followup of https://github.com/apache/spark/pull/7813. It renames `HybridUnsafeAggregationIterator` to `TungstenAggregationIterator` and makes it only work with `UnsafeRow`. Also, I add a `TungstenAggregate` that uses `TungstenAggregationIterator` and make `SortBasedAggregate` (renamed from `SortBasedAggregate`) only works with `SafeRow`.
Author: Yin Huai <yhuai@databricks.com>
Closes#7954 from yhuai/agg-followUp and squashes the following commits:
4d2f4fc [Yin Huai] Add comments and free map.
0d7ddb9 [Yin Huai] Add TungstenAggregationQueryWithControlledFallbackSuite to test fall back process.
91d69c2 [Yin Huai] Rename UnsafeHybridAggregationIterator to TungstenAggregateIteraotr and make it only work with UnsafeRow.
This re-applies #7955, which was reverted due to a race condition to fix build breaking.
Author: Wenchen Fan <cloud0fan@outlook.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#8002 from rxin/InternalRow-toSeq and squashes the following commits:
332416a [Reynold Xin] Merge pull request #7955 from cloud-fan/toSeq
21665e2 [Wenchen Fan] fix hive again...
4addf29 [Wenchen Fan] fix hive
bc16c59 [Wenchen Fan] minor fix
33d802c [Wenchen Fan] pass data type info to InternalRow.toSeq
3dd033e [Wenchen Fan] move the default special getters implementation from InternalRow to BaseGenericInternalRow
seems https://github.com/apache/spark/pull/7955 breaks the build.
Author: Yin Huai <yhuai@databricks.com>
Closes#8001 from yhuai/SPARK-9632-fixBuild and squashes the following commits:
6c257dd [Yin Huai] Fix build.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7955 from cloud-fan/toSeq and squashes the following commits:
21665e2 [Wenchen Fan] fix hive again...
4addf29 [Wenchen Fan] fix hive
bc16c59 [Wenchen Fan] minor fix
33d802c [Wenchen Fan] pass data type info to InternalRow.toSeq
3dd033e [Wenchen Fan] move the default special getters implementation from InternalRow to BaseGenericInternalRow
In order to support update a varlength (actually fixed length) object, the space should be preserved even it's null. And, we can't call setNullAt(i) for it anymore, we because setNullAt(i) will remove the offset of the preserved space, should call setDecimal(i, null, precision) instead.
After this, we can do hash based aggregation on DecimalType with precision > 18. In a tests, this could decrease the end-to-end run time of aggregation query from 37 seconds (sort based) to 24 seconds (hash based).
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#7978 from davies/update_decimal and squashes the following commits:
bed8100 [Davies Liu] isSettable -> isMutable
923c9eb [Davies Liu] address comments and fix bug
385891d [Davies Liu] Merge branch 'master' of github.com:apache/spark into update_decimal
36a1872 [Davies Liu] fix tests
cd6c524 [Davies Liu] support set decimal with precision > 18
![translate](http://www.w3resource.com/PostgreSQL/postgresql-translate-function.png)
Author: zhichao.li <zhichao.li@intel.com>
Closes#7709 from zhichao-li/translate and squashes the following commits:
9418088 [zhichao.li] refine checking condition
f2ab77a [zhichao.li] clone string
9d88f2d [zhichao.li] fix indent
6aa2962 [zhichao.li] style
e575ead [zhichao.li] add python api
9d4bab0 [zhichao.li] add special case for fodable and refactor unittest
eda7ad6 [zhichao.li] update to use TernaryExpression
cdfd4be [zhichao.li] add function translate
This patches renames `RowOrdering` to `InterpretedOrdering` and updates SortMergeJoin to use the `SparkPlan` methods for constructing its ordering so that it may benefit from codegen.
This is an updated version of #7408.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7973 from JoshRosen/SPARK-9054 and squashes the following commits:
e610655 [Josh Rosen] Add comment RE: Ascending ordering
34b8e0c [Josh Rosen] Import ordering
be19a0f [Josh Rosen] [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.
This continues tarekauel's work in #7778.
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Tarek Auel <tarek.auel@googlemail.com>
Closes#7893 from viirya/codegen_in and squashes the following commits:
81ff97b [Liang-Chi Hsieh] For comments.
47761c6 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into codegen_in
cf4bf41 [Liang-Chi Hsieh] For comments.
f532b3c [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into codegen_in
446bbcd [Liang-Chi Hsieh] Fix bug.
b3d0ab4 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into codegen_in
4610eff [Liang-Chi Hsieh] Relax the types of references and update optimizer test.
224f18e [Liang-Chi Hsieh] Beef up the test cases for In and InSet to include all primitive data types.
86dc8aa [Liang-Chi Hsieh] Only convert In to InSet when the number of items in set is more than the threshold.
b7ded7e [Tarek Auel] [SPARK-9403][SQL] codeGen in / inSet
This is a follow-up of https://github.com/apache/spark/pull/7920 to fix comments.
Author: Yin Huai <yhuai@databricks.com>
Closes#7964 from yhuai/SPARK-9141-follow-up and squashes the following commits:
4d0ee80 [Yin Huai] Fix comments.
Currently we collapse successive projections that are added by `withColumn`. However, this optimization violates the constraint that adding nodes to a plan will never change its analyzed form and thus breaks caching. Instead of doing early optimization, in this PR I just fix some low-hanging slowness in the analyzer. In particular, I add a mechanism for skipping already analyzed subplans, `resolveOperators` and `resolveExpression`. Since trees are generally immutable after construction, it's safe to annotate a plan as already analyzed as any transformation will create a new tree with this bit no longer set.
Together these result in a faster analyzer than before, even with added timing instrumentation.
```
Original Code
[info] 3430ms
[info] 2205ms
[info] 1973ms
[info] 1982ms
[info] 1916ms
Without Project Collapsing in DataFrame
[info] 44610ms
[info] 45977ms
[info] 46423ms
[info] 46306ms
[info] 54723ms
With analyzer optimizations
[info] 6394ms
[info] 4630ms
[info] 4388ms
[info] 4093ms
[info] 4113ms
With resolveOperators
[info] 2495ms
[info] 1380ms
[info] 1685ms
[info] 1414ms
[info] 1240ms
```
Author: Michael Armbrust <michael@databricks.com>
Closes#7920 from marmbrus/withColumnCache and squashes the following commits:
2145031 [Michael Armbrust] fix hive udfs tests
5a5a525 [Michael Armbrust] remove wrong comment
7a507d5 [Michael Armbrust] style
b59d710 [Michael Armbrust] revert small change
1fa5949 [Michael Armbrust] move logic into LogicalPlan, add tests
0e2cb43 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into withColumnCache
c926e24 [Michael Armbrust] naming
e593a2d [Michael Armbrust] style
f5a929e [Michael Armbrust] [SPARK-9141][SQL] Remove project collapsing from DataFrame API
38b1c83 [Michael Armbrust] WIP
JIRA: https://issues.apache.org/jira/browse/SPARK-9628
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#7953 from yjshen/datetime_alias and squashes the following commits:
3cac3cc [Yijie Shen] rename int to SQLDate, long to SQLTimestamp for better readability
The current implementation of UnsafeExternalSort uses NoOpPrefixComparator for binary-typed data.
So, we need to add BinaryPrefixComparator in PrefixComparators.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#7676 from maropu/BinaryTypePrefixComparator and squashes the following commits:
fe6f31b [Takeshi YAMAMURO] Apply comments
d943c04 [Takeshi YAMAMURO] Add a codegen'd entry for BinaryType in SortPrefix
ecf3ac5 [Takeshi YAMAMURO] Support BinaryType in PrefixComparator
Let Decimal carry the correct precision and scale with DecimalType.
cc rxin yhuai
Author: Davies Liu <davies@databricks.com>
Closes#7925 from davies/decimal_scale and squashes the following commits:
e19701a [Davies Liu] some tweaks
57d78d2 [Davies Liu] fix tests
5d5bc69 [Davies Liu] match precision and scale with DecimalType
This PR is based on #7580 , thanks to EntilZha
PR for work on https://issues.apache.org/jira/browse/SPARK-8231
Currently, I have an initial implementation for contains. Based on discussion on JIRA, it should behave same as Hive: https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArrayContains.java#L102-L128
Main points are:
1. If the array is empty, null, or the value is null, return false
2. If there is a type mismatch, throw error
3. If comparison is not supported, throw error
Closes#7580
Author: Pedro Rodriguez <prodriguez@trulia.com>
Author: Pedro Rodriguez <ski.rodriguez@gmail.com>
Author: Davies Liu <davies@databricks.com>
Closes#7949 from davies/array_contains and squashes the following commits:
d3c08bc [Davies Liu] use foreach() to avoid copy
bc3d1fe [Davies Liu] fix array_contains
719e37d [Davies Liu] Merge branch 'master' of github.com:apache/spark into array_contains
e352cf9 [Pedro Rodriguez] fixed diff from master
4d5b0ff [Pedro Rodriguez] added docs and another type check
ffc0591 [Pedro Rodriguez] fixed unit test
7a22deb [Pedro Rodriguez] Changed test to use strings instead of long/ints which are different between python 2 an 3
b5ffae8 [Pedro Rodriguez] fixed pyspark test
4e7dce3 [Pedro Rodriguez] added more docs
3082399 [Pedro Rodriguez] fixed unit test
46f9789 [Pedro Rodriguez] reverted change
d3ca013 [Pedro Rodriguez] Fixed type checking to match hive behavior, then added tests to insure this
8528027 [Pedro Rodriguez] added more tests
686e029 [Pedro Rodriguez] fix scala style
d262e9d [Pedro Rodriguez] reworked type checking code and added more tests
2517a58 [Pedro Rodriguez] removed unused import
28b4f71 [Pedro Rodriguez] fixed bug with type conversions and re-added tests
12f8795 [Pedro Rodriguez] fix scala style checks
e8a20a9 [Pedro Rodriguez] added python df (broken atm)
65b562c [Pedro Rodriguez] made array_contains nullable false
33b45aa [Pedro Rodriguez] reordered test
9623c64 [Pedro Rodriguez] fixed test
4b4425b [Pedro Rodriguez] changed Arrays in tests to Seqs
72cb4b1 [Pedro Rodriguez] added checkInputTypes and docs
69c46fb [Pedro Rodriguez] added tests and codegen
9e0bfc4 [Pedro Rodriguez] initial attempt at implementation
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7932 from cloud-fan/generic-getter and squashes the following commits:
c60de4c [Wenchen Fan] do not expose generic getter in internal row
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:
967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7890 from cloud-fan/minor and squashes the following commits:
c3b1be3 [Wenchen Fan] fix style
b0cbe2e [Wenchen Fan] remove the createCode and createStructCode, and replace the usage of them by createStructCode
The analysis rule has a bug and we ended up making the sorter still capable of doing evaluation, so lets revert this for now.
Author: Michael Armbrust <michael@databricks.com>
Closes#7906 from marmbrus/revertSortProjection and squashes the following commits:
2da6972 [Michael Armbrust] unrevert unrelated changes
4f2b00c [Michael Armbrust] Revert "[SPARK-9251][SQL] do not order by expressions which still need evaluation"
This is based on #7485 , thanks to NathanHowell
Tests were copied from Hive, but do not seem to be super comprehensive. I've generally replicated Hive's unusual behavior rather than following a JSONPath reference, except for one case (as noted in the comments). I don't know if there is a way of fully replicating Hive's behavior without a slower TreeNode implementation, so I've erred on the side of performance instead.
Author: Davies Liu <davies@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Author: Nathan Howell <nhowell@godaddy.com>
Closes#7901 from davies/get_json_object and squashes the following commits:
3ace9b9 [Davies Liu] Merge branch 'get_json_object' of github.com:davies/spark into get_json_object
98766fc [Davies Liu] Merge branch 'master' of github.com:apache/spark into get_json_object
a7dc6d0 [Davies Liu] Update JsonExpressionsSuite.scala
c818519 [Yin Huai] new results.
18ce26b [Davies Liu] fix tests
6ac29fb [Yin Huai] Golden files.
25eebef [Davies Liu] use HiveQuerySuite
e0ac6ec [Yin Huai] Golden answer files.
940c060 [Davies Liu] tweat code style
44084c5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into get_json_object
9192d09 [Nathan Howell] Match Hive’s behavior for unwrapping arrays of one element
8dab647 [Nathan Howell] [SPARK-8246] [SQL] Implement get_json_object
This PR is based on #7186 (just fix the conflict), thanks to tarekauel .
find_in_set(string str, string strList): int
Returns the first occurance of str in strList where strList is a comma-delimited string. Returns null if either argument is null. Returns 0 if the first argument contains any commas. For example, find_in_set('ab', 'abc,b,ab,c,def') returns 3.
Only add this to SQL, not DataFrame.
Closes#7186
Author: Tarek Auel <tarek.auel@googlemail.com>
Author: Davies Liu <davies@databricks.com>
Closes#7900 from davies/find_in_set and squashes the following commits:
4334209 [Davies Liu] Merge branch 'master' of github.com:apache/spark into find_in_set
8f00572 [Davies Liu] Merge branch 'master' of github.com:apache/spark into find_in_set
243ede4 [Tarek Auel] [SPARK-8244][SQL] hive compatibility
1aaf64e [Tarek Auel] [SPARK-8244][SQL] unit test fix
e4093a4 [Tarek Auel] [SPARK-8244][SQL] final modifier for COMMA_UTF8
0d05df5 [Tarek Auel] Merge branch 'master' into SPARK-8244
208d710 [Tarek Auel] [SPARK-8244] address comments & bug fix
71b2e69 [Tarek Auel] [SPARK-8244] find_in_set
66c7fda [Tarek Auel] Merge branch 'master' into SPARK-8244
61b8ca2 [Tarek Auel] [SPARK-8224] removed loop and split; use unsafe String comparison
4f75a65 [Tarek Auel] Merge branch 'master' into SPARK-8244
e3b20c8 [Tarek Auel] [SPARK-8244] added type check
1c2bbb7 [Tarek Auel] [SPARK-8244] findInSet
Author: Reynold Xin <rxin@databricks.com>
Closes#7897 from rxin/calculateBitSetWidthInBytes and squashes the following commits:
2e73b3a [Reynold Xin] [SQL][minor] Simplify UnsafeRow.calculateBitSetWidthInBytes.
The issue was that the tokenizer was parsing "1one" into the numeric 1 using the code on line 110. I added another case to accept strings that start with a number and then have a letter somewhere else in it as well.
Author: Joseph Batchik <joseph.batchik@cloudera.com>
Closes#7844 from JDrit/parse_error and squashes the following commits:
b8ca12f [Joseph Batchik] fixed parsing issue by adding another case
Currently, when copy the bitsets, we didn't consider that the row1 may not sit in the beginning of byte array.
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#7892 from davies/clean_join and squashes the following commits:
14cce9e [Davies Liu] cleanup generated UnsafeRowJoiner and fix bug
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7885 from cloud-fan/cheap-copy and squashes the following commits:
0900ca1 [Wenchen Fan] replace == with ===
73f4ada [Wenchen Fan] add tests
07b865a [Wenchen Fan] add a cheap version of copy
This PR adds a base aggregation iterator `AggregationIterator`, which is used to create `SortBasedAggregationIterator` (for sort-based aggregation) and `UnsafeHybridAggregationIterator` (first it tries hash-based aggregation and falls back to the sort-based aggregation (using external sorter) if we cannot allocate memory for the map). With these two iterators, we will not need existing iterators and I am removing those. Also, we can use a single physical `Aggregate` operator and it internally determines what iterators to used.
https://issues.apache.org/jira/browse/SPARK-9240
Author: Yin Huai <yhuai@databricks.com>
Closes#7813 from yhuai/AggregateOperator and squashes the following commits:
e317e2b [Yin Huai] Remove unnecessary change.
74d93c5 [Yin Huai] Merge remote-tracking branch 'upstream/master' into AggregateOperator
ba6afbc [Yin Huai] Add a little bit more comments.
c9cf3b6 [Yin Huai] update
0f1b06f [Yin Huai] Remove unnecessary code.
21fd15f [Yin Huai] Remove unnecessary change.
964f88b [Yin Huai] Implement fallback strategy.
b1ea5cf [Yin Huai] wip
7fcbd87 [Yin Huai] Add a flag to control what iterator to use.
533d5b2 [Yin Huai] Prepare for fallback!
33b7022 [Yin Huai] wip
bd9282b [Yin Huai] UDAFs now supports UnsafeRow.
f52ee53 [Yin Huai] wip
3171f44 [Yin Huai] wip
d2c45a0 [Yin Huai] wip
f60cc83 [Yin Huai] Also check input schema.
af32210 [Yin Huai] Check iter.hasNext before we create an iterator because the constructor of the iterato will read at least one row from a non-empty input iter.
299008c [Yin Huai] First round cleanup.
3915bac [Yin Huai] Create a base iterator class for aggregation iterators and add the initial version of the hybrid iterator.
JIRA: https://issues.apache.org/jira/browse/SPARK-9549
This PR fix the following bugs:
1. `UnaryMinus`'s codegen version would fail to compile when the input is `Long.MinValue`
2. `BinaryComparison` would fail to compile in codegen mode when comparing Boolean types.
3. `AddMonth` would fail if passed a huge negative month, which would lead accessing negative index of `monthDays` array.
4. `Nanvl` with different type operands.
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes#7882 from yjshen/minor_bug_fix and squashes the following commits:
41bbd2c [Yijie Shen] fix bug in Nanvl type coercion
3dee204 [Yijie Shen] address comments
4fa5de0 [Yijie Shen] fix bugs in expressions
This PR adds a UnsafeArrayData, current we encode it in this way:
first 4 bytes is the # elements
then each 4 byte is the start offset of the element, unless it is negative, in which case the element is null.
followed by the elements themselves
an example: [10, 11, 12, 13, null, 14] will be encoded as:
5, 28, 32, 36, 40, -44, 44, 10, 11, 12, 13, 14
Note that, when we read a UnsafeArrayData from bytes, we can read the first 4 bytes as numElements and take the rest(first 4 bytes skipped) as value region.
unsafe map data just use 2 unsafe array data, first 4 bytes is # of elements, second 4 bytes is numBytes of key array, the follows key array data and value array data.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#7752 from cloud-fan/unsafe-array and squashes the following commits:
3269bd7 [Wenchen Fan] fix a bug
6445289 [Wenchen Fan] add unit tests
49adf26 [Wenchen Fan] add unsafe map
20d1039 [Wenchen Fan] add comments and unsafe converter
821b8db [Wenchen Fan] add unsafe array
This PR adds an optimization rule, `FilterNullsInJoinKey`, to add `Filter` before join operators to filter out rows having null values for join keys.
This optimization is guarded by a new SQL conf, `spark.sql.advancedOptimization`.
The code in this PR was authored by yhuai; I'm opening this PR to factor out this change from #7685, a larger pull request which contains two other optimizations.
Author: Yin Huai <yhuai@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7768 from JoshRosen/filter-nulls-in-join-key and squashes the following commits:
c02fc3f [Yin Huai] Address Josh's comments.
0a8e096 [Yin Huai] Update comments.
ea7d5a6 [Yin Huai] Make sure we do not keep adding filters.
be88760 [Yin Huai] Make it clear that FilterNullsInJoinKeySuite.scala is used to test FilterNullsInJoinKey.
8bb39ad [Yin Huai] Fix non-deterministic tests.
303236b [Josh Rosen] Revert changes that are unrelated to null join key filtering
40eeece [Josh Rosen] Merge remote-tracking branch 'origin/master' into filter-nulls-in-join-key
c57a954 [Yin Huai] Bug fix.
d3d2e64 [Yin Huai] First round of cleanup.
f9516b0 [Yin Huai] Style
c6667e7 [Yin Huai] Add PartitioningCollection.
e616d3b [Yin Huai] wip
7c2d2d8 [Yin Huai] Bug fix and refactoring.
69bb072 [Yin Huai] Introduce NullSafeHashPartitioning and NullUnsafePartitioning.
d5b84c3 [Yin Huai] Do not add unnessary filters.
2201129 [Yin Huai] Filter out rows that will not be joined in equal joins early.
This PR adds `PartitioningCollection`, which is used to represent the `outputPartitioning` for SparkPlans with multiple children (e.g. `ShuffledHashJoin`). So, a `SparkPlan` can have multiple descriptions of its partitioning schemes. Taking `ShuffledHashJoin` as an example, it has two descriptions of its partitioning schemes, i.e. `left.outputPartitioning` and `right.outputPartitioning`. So when we have a query like `select * from t1 join t2 on (t1.x = t2.x) join t3 on (t2.x = t3.x)` will only have three Exchange operators (when shuffled joins are needed) instead of four.
The code in this PR was authored by yhuai; I'm opening this PR to factor out this change from #7685, a larger pull request which contains two other optimizations.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7773)
<!-- Reviewable:end -->
Author: Yin Huai <yhuai@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7773 from JoshRosen/multi-way-join-planning-improvements and squashes the following commits:
5c45924 [Josh Rosen] Merge remote-tracking branch 'origin/master' into multi-way-join-planning-improvements
cd8269b [Josh Rosen] Refactor test to use SQLTestUtils
2963857 [Yin Huai] Revert unnecessary SqlConf change.
73913f7 [Yin Huai] Add comments and test. Also, revert the change in ShuffledHashOuterJoin for now.
4a99204 [Josh Rosen] Delete unrelated expression change
884ab95 [Josh Rosen] Carve out only SPARK-2205 changes.
247e5fa [Josh Rosen] Merge remote-tracking branch 'origin/master' into multi-way-join-planning-improvements
c57a954 [Yin Huai] Bug fix.
d3d2e64 [Yin Huai] First round of cleanup.
f9516b0 [Yin Huai] Style
c6667e7 [Yin Huai] Add PartitioningCollection.
e616d3b [Yin Huai] wip
7c2d2d8 [Yin Huai] Bug fix and refactoring.
69bb072 [Yin Huai] Introduce NullSafeHashPartitioning and NullUnsafePartitioning.
d5b84c3 [Yin Huai] Do not add unnessary filters.
2201129 [Yin Huai] Filter out rows that will not be joined in equal joins early.
This pull request creates two isOrderable functions in RowOrdering that can be used to check whether a data type or a sequence of expressions can be used in sorting.
Author: Reynold Xin <rxin@databricks.com>
Closes#7880 from rxin/SPARK-9546 and squashes the following commits:
f9e322d [Reynold Xin] Fixed tests.
0439b43 [Reynold Xin] [SPARK-9546][SQL] Centralize orderable data type checking.
This pull request adds a destructAndCreateExternalSorter method to UnsafeFixedWidthAggregationMap. The new method does the following:
1. Creates a new external sorter UnsafeKVExternalSorter
2. Adds all the data into an in-memory sorter, sorts them
3. Spills the sorted in-memory data to disk
This method can be used to fallback to sort-based aggregation when under memory pressure.
The pull request also includes accounting fixes from JoshRosen.
TODOs (that can be done in follow-up PRs)
- [x] Address Josh's feedbacks from #7849
- [x] More documentation and test cases
- [x] Make sure we are doing memory accounting correctly with test cases (e.g. did we release the memory in BytesToBytesMap twice?)
- [ ] Look harder at possible memory leaks and exception handling
- [ ] Randomized tester for the KV sorter as well as the aggregation map
Author: Reynold Xin <rxin@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7860 from rxin/kvsorter and squashes the following commits:
986a58c [Reynold Xin] Bug fix.
599317c [Reynold Xin] Style fix and slightly more compact code.
fe7bd4e [Reynold Xin] Bug fixes.
fd71bef [Reynold Xin] Merge remote-tracking branch 'josh/large-records-in-sql-sorter' into kvsorter-with-josh-fix
3efae38 [Reynold Xin] More fixes and documentation.
45f1b09 [Josh Rosen] Ensure that spill files are cleaned up
f6a9bd3 [Reynold Xin] Josh feedback.
9be8139 [Reynold Xin] Remove testSpillFrequency.
7cbe759 [Reynold Xin] [SPARK-9531][SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter.
ae4a8af [Josh Rosen] Detect leaked unsafe memory in UnsafeExternalSorterSuite.
52f9b06 [Josh Rosen] Detect ShuffleMemoryManager leaks in UnsafeExternalSorter.
Generate prefix for DecimalType, fix the random generator of decimal
cc JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes#7857 from davies/sort_decimal and squashes the following commits:
2433959 [Davies Liu] Merge branch 'master' of github.com:apache/spark into sort_decimal
de24253 [Davies Liu] fix style
0a54c1a [Davies Liu] sort decimal
When accessing a column in UnsafeRow, it's good to avoid the copy, then we should do deep copy when turn the UnsafeRow into generic Row, this PR brings generated FromUnsafeProjection to do that.
This PR also fix the expressions that cache the UTF8String, which should also copy it.
Author: Davies Liu <davies@databricks.com>
Closes#7840 from davies/avoid_copy and squashes the following commits:
230c8a1 [Davies Liu] address comment
fd797c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into avoid_copy
e095dd0 [Davies Liu] rollback rename
8ef5b0b [Davies Liu] copy String in Columnar
81360b8 [Davies Liu] fix class name
9aecb88 [Davies Liu] use FromUnsafeProjection to do deep copy for UTF8String and struct
This PR is based on #7643 , thanks to adrian-wang
Author: Davies Liu <davies@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#7847 from davies/datediff and squashes the following commits:
74333d7 [Davies Liu] fix bug
22d8a8c [Davies Liu] optimize
85cdd21 [Davies Liu] remove unnecessary tests
241d90c [Davies Liu] Merge branch 'master' of github.com:apache/spark into datediff
e9dc0f5 [Davies Liu] fix datediff/to_utc_timestamp/from_utc_timestamp
c360447 [Daoyuan Wang] function datediff, to_utc_timestamp, from_utc_timestamp (commits merged)
This PR is based on #7208 , thanks to HuJiayin
Closes#7208
Author: HuJiayin <jiayin.hu@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes#7850 from davies/initcap and squashes the following commits:
54472e9 [Davies Liu] fix python test
17ffe51 [Davies Liu] Merge branch 'master' of github.com:apache/spark into initcap
ca46390 [Davies Liu] Merge branch 'master' of github.com:apache/spark into initcap
3a906e4 [Davies Liu] implement title case in UTF8String
8b2506a [HuJiayin] Update functions.py
2cd43e5 [HuJiayin] fix python style check
b616c0e [HuJiayin] add python api
1f5a0ef [HuJiayin] add codegen
7e0c604 [HuJiayin] Merge branch 'master' of https://github.com/apache/spark into initcap
6a0b958 [HuJiayin] add column
c79482d [HuJiayin] support soundex
7ce416b [HuJiayin] support initcap rebase code
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#7856 from davies/sort_improve and squashes the following commits:
5fc81bd [Davies Liu] support DateType/TimestampType