Delays application of ResolvePivot until all aggregates are resolved to prevent problems with UnresolvedFunction and adds unit test
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#10202 from aray/sql-pivot-unresolved-function.
This PR is to add three more data types into Encoder, including `BigDecimal`, `Date` and `Timestamp`.
marmbrus cloud-fan rxin Could you take a quick look at these three types? Not sure if it can be merged to 1.6. Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10188 from gatorsmile/dataTypesinEncoder.
checked with hive, greatest/least should cast their children to a tightest common type,
i.e. `(int, long) => long`, `(int, string) => error`, `(decimal(10,5), decimal(5, 10)) => error`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10196 from cloud-fan/type-coercion.
Currently, the order of joins is exactly the same as SQL query, some conditions may not pushed down to the correct join, then those join will become cross product and is extremely slow.
This patch try to re-order the inner joins (which are common in SQL query), pick the joins that have self-contain conditions first, delay those that does not have conditions.
After this patch, the TPCDS query Q64/65 can run hundreds times faster.
cc marmbrus nongli
Author: Davies Liu <davies@databricks.com>
Closes#10073 from davies/reorder_joins.
When \u appears in a comment block (i.e. in /**/), code gen will break. So, in Expression and CodegenFallback, we escape \u to \\u.
yhuai Please review it. I did reproduce it and it works after the fix. Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10155 from gatorsmile/escapeU.
This replaces https://github.com/apache/spark/pull/9696
Invoke Checkstyle and print any errors to the console, failing the step.
Use Google's style rules modified according to
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
Some important checks are disabled (see TODOs in `checkstyle.xml`) due to
multiple violations being present in the codebase.
Suggest fixing those TODOs in a separate PR(s).
More on Checkstyle can be found on the [official website](http://checkstyle.sourceforge.net/).
Sample output (from [build 46345](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46345/consoleFull)) (duplicated because I run the build twice with different profiles):
> Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [error] running /home/jenkins/workspace/SparkPullRequestBuilder2/dev/lint-java ; received return code 1
Also fix some of the minor violations that didn't require sweeping changes.
Apologies for the previous botched PRs - I finally figured out the issue.
cr: JoshRosen, pwendell
> I state that the contribution is my original work, and I license the work to the project under the project's open source license.
Author: Dmitry Erastov <derastov@gmail.com>
Closes#9867 from dskrvk/master.
When examining plans of complex queries with multiple joins, a pain point of mine is that, it's hard to immediately see the sibling node of a specific query plan node. This PR adds tree lines for the tree string of a `TreeNode`, so that the result can be visually more intuitive.
Author: Cheng Lian <lian@databricks.com>
Closes#10099 from liancheng/prettier-tree-string.
Following up #10038.
We can use bitmasks to determine which grouping expressions need to be set as nullable.
cc yhuai
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10067 from viirya/fix-cube-following.
create java version of `constructorFor` and `extractorFor` in `JavaTypeInference`
Author: Wenchen Fan <wenchen@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9937 from cloud-fan/pojo.
When we build the `fromRowExpression` for an encoder, we set up a lot of "unresolved" stuff and lost the required data type, which may lead to runtime error if the real type doesn't match the encoder's schema.
For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type is `[a: int, b: long]`, then we will hit runtime error and say that we can't construct class `Data` with int and long, because we lost the information that `b` should be a string.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9840 from cloud-fan/err-msg.
JIRA: https://issues.apache.org/jira/browse/SPARK-11949
The result of cube plan uses incorrect schema. The schema of cube result should set nullable property to true because the grouping expressions will have null values.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10038 from viirya/fix-cube.
JIRA: https://issues.apache.org/jira/browse/SPARK-12018
The code of common subexpression elimination can be factored and simplified. Some unnecessary variables can be removed.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10009 from viirya/refactor-subexpr-eliminate.
In https://github.com/apache/spark/pull/9409 we enabled multi-column counting. The approach taken in that PR introduces a bit of overhead by first creating a row only to check if all of the columns are non-null.
This PR fixes that technical debt. Count now takes multiple columns as its input. In order to make this work I have also added support for multiple columns in the single distinct code path.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10015 from hvanhovell/SPARK-12024.
When calling `get_json_object` for the following two cases, both results are `"null"`:
```scala
val tuple: Seq[(String, String)] = ("5", """{"f1": null}""") :: Nil
val df: DataFrame = tuple.toDF("key", "jstring")
val res = df.select(functions.get_json_object($"jstring", "$.f1")).collect()
```
```scala
val tuple2: Seq[(String, String)] = ("5", """{"f1": "null"}""") :: Nil
val df2: DataFrame = tuple2.toDF("key", "jstring")
val res3 = df2.select(functions.get_json_object($"jstring", "$.f1")).collect()
```
Fixed the problem and also added a test case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10018 from gatorsmile/get_json_object.
This is a followup for https://github.com/apache/spark/pull/9959.
I added more documentation and rewrote some monadic code into simpler ifs.
Author: Reynold Xin <rxin@databricks.com>
Closes#9995 from rxin/SPARK-11973.
this is based on https://github.com/apache/spark/pull/9844, with some bug fix and clean up.
The problems is that, normal operator should be resolved based on its child, but `Sort` operator can also be resolved based on its grandchild. So we have 3 rules that can resolve `Sort`: `ResolveReferences`, `ResolveSortReferences`(if grandchild is `Project`) and `ResolveAggregateFunctions`(if grandchild is `Aggregate`).
For example, `select c1 as a , c2 as b from tab group by c1, c2 order by a, c2`, we need to resolve `a` and `c2` for `Sort`. Firstly `a` will be resolved in `ResolveReferences` based on its child, and when we reach `ResolveAggregateFunctions`, we will try to resolve both `a` and `c2` based on its grandchild, but failed because `a` is not a legal aggregate expression.
whoever merge this PR, please give the credit to dilipbiswal
Author: Dilip Biswal <dbiswal@us.ibm.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9961 from cloud-fan/sort.
Currently, filter can't be pushed through aggregation with alias or literals, this patch fix that.
After this patch, the time of TPC-DS query 4 go down to 13 seconds from 141 seconds (10x improvements).
cc nongli yhuai
Author: Davies Liu <davies@databricks.com>
Closes#9959 from davies/push_filter2.
Right now, the expended start will include the name of expression as prefix for column, that's not better than without expending, we should not have the prefix.
Author: Davies Liu <davies@databricks.com>
Closes#9984 from davies/expand_star.
Currently pivot's signature looks like
```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData
scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```
I think we can remove the one that takes "Column" types, since callers should always be passing in literals. It'd also be more clear if the values are not varargs, but rather Seq or java.util.List.
I also made similar changes for Python.
Author: Reynold Xin <rxin@databricks.com>
Closes#9929 from rxin/SPARK-11946.
we should pass in resolved encodera to logical `CoGroup` and bind them in physical `CoGroup`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9928 from cloud-fan/cogroup.
We should use `InternalRow.isNullAt` to check if the field is null before calling `InternalRow.getXXX`
Thanks gatorsmile who discovered this bug.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9904 from cloud-fan/null.
Can someone review my code to make sure I'm not missing anything? Thanks!
Author: Xiu Guo <xguo27@gmail.com>
Author: Xiu Guo <guoxi@us.ibm.com>
Closes#9612 from xguo27/SPARK-11628.
1. Renamed map to mapGroup, flatMap to flatMapGroup.
2. Renamed asKey -> keyAs.
3. Added more documentation.
4. Changed type parameter T to V on GroupedDataset.
5. Added since versions for all functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9880 from rxin/SPARK-11899.
seems scala 2.11 doesn't support: define private methods in `trait xxx` and use it in `object xxx extend xxx`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9879 from cloud-fan/follow.
This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is
shared between core and I've left that in core. This allows some other associated
minor cleanup.
Author: Nong Li <nong@databricks.com>
Closes#9845 from nongli/spark-11787.
#theScaryParts (i.e. changes to the repl, executor classloaders and codegen)...
Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9825 from marmbrus/dataset-replClasses2.
Hive has since changed this behavior as well. https://issues.apache.org/jira/browse/HIVE-3454
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9685 from nongli/spark-11724.
before this PR, when users try to get an encoder for an un-supported class, they will only get a very simple error message like `Encoder for type xxx is not supported`.
After this PR, the error message become more friendly, for example:
```
No Encoder found for abc.xyz.NonEncodable
- array element class: "abc.xyz.NonEncodable"
- field (class: "scala.Array", name: "arrayField")
- root class: "abc.xyz.AnotherClass"
```
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9810 from cloud-fan/error-message.
JIRA: https://issues.apache.org/jira/browse/SPARK-11817
Instead of return None, we should truncate the fractional seconds to prevent inserting NULL.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9834 from viirya/truncate-fractional-sec.
This PR has the following optimization:
1) The greatest/least already does the null-check, so the `If` and `IsNull` are not necessary.
2) In greatest/least, it should initialize the result using the first child (removing one block).
3) For primitive types, the generated greater expression is too complicated (`a > b ? 1 : (a < b) ? -1 : 0) > 0`), should be as simple as `a > b`
Combine these optimization, this could improve the performance of `ss_max` query by 30%.
Author: Davies Liu <davies@databricks.com>
Closes#9846 from davies/improve_max.
Fixes bug with grouping sets (including cube/rollup) where aggregates that included grouping expressions would return the wrong (null) result.
Also simplifies the analyzer rule a bit and leaves column pruning to the optimizer.
Added multiple unit tests to DataFrameAggregateSuite and verified it passes hive compatibility suite:
```
build/sbt -Phive -Dspark.hive.whitelist='groupby.*_grouping.*' 'test-only org.apache.spark.sql.hive.execution.HiveCompatibilitySuite'
```
This is an alternative to pr https://github.com/apache/spark/pull/9419 but I think its better as it simplifies the analyzer rule instead of adding another special case to it.
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#9815 from aray/groupingset-agg-fix.
After some experiment, I found it's not convenient to have separate encoder builders: `FlatEncoder` and `ProductEncoder`. For example, when create encoders for `ScalaUDF`, we have no idea if the type `T` is flat or not. So I revert the splitting change in https://github.com/apache/spark/pull/9693, while still keeping the bug fixes and tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9726 from cloud-fan/follow.
The impact of this change is for a query that has a single distinct column and does not have any grouping expression like
`SELECT COUNT(DISTINCT a) FROM table`
The plan will be changed from
```
AGG-2 (count distinct)
Shuffle to a single reducer
Partial-AGG-2 (count distinct)
AGG-1 (grouping on a)
Shuffle by a
Partial-AGG-1 (grouping on 1)
```
to the following one (1.5 uses this)
```
AGG-2
AGG-1 (grouping on a)
Shuffle to a single reducer
Partial-AGG-1(grouping on a)
```
The first plan is more robust. However, to better benchmark the impact of this change, we should use 1.5's plan and use the conf of `spark.sql.specializeSingleDistinctAggPlanning` to control the plan.
Author: Yin Huai <yhuai@databricks.com>
Closes#9828 from yhuai/distinctRewriter.
We currently rely on the optimizer's constant folding to replace current_timestamp and current_date. However, this can still result in different values for different instances of current_timestamp/current_date if the optimizer is not running fast enough.
A better solution is to replace these functions in the analyzer in one shot.
Author: Reynold Xin <rxin@databricks.com>
Closes#9833 from rxin/SPARK-11849.
This patch adds an alternate to the Parquet RecordReader from the parquet-mr project
that is much faster for flat schemas. Instead of using the general converter mechanism
from parquet-mr, this directly uses the lower level APIs from parquet-columnar and a
customer RecordReader that directly assembles into UnsafeRows.
This is optionally disabled and only used for supported schemas.
Using the tpcds store sales table and doing a sum of increasingly more columns, the results
are:
For 1 Column:
Before: 11.3M rows/second
After: 18.2M rows/second
For 2 Columns:
Before: 7.2M rows/second
After: 11.2M rows/second
For 5 Columns:
Before: 2.9M rows/second
After: 4.5M rows/second
Author: Nong Li <nong@databricks.com>
Closes#9774 from nongli/parquet.
Also added some nicer error messages for incompatible types (private types and primitive types) for Kryo/Java encoder.
Author: Reynold Xin <rxin@databricks.com>
Closes#9823 from rxin/SPARK-11833.
Before this PR there were two things that would blow up if you called `df.as[MyClass]` if `MyClass` was defined in the REPL:
- [x] Because `classForName` doesn't work on the munged names returned by `tpe.erasure.typeSymbol.asClass.fullName`
- [x] Because we don't have anything to pass into the constructor for the `$outer` pointer.
Note that this PR is just adding the infrastructure for working with inner classes in encoder and is not yet sufficient to make them work in the REPL. Currently, the implementation show in 95cec7d413 is causing a bug that breaks code gen due to some interaction between janino and the `ExecutorClassLoader`. This will be addressed in a follow-up PR.
Author: Michael Armbrust <michael@databricks.com>
Closes#9602 from marmbrus/dataset-replClasses.
This patch refactors the existing Kryo encoder expressions and adds support for Java serialization.
Author: Reynold Xin <rxin@databricks.com>
Closes#9802 from rxin/SPARK-11810.
return Double.NaN for mean/average when count == 0 for all numeric types that is converted to Double, Decimal type continue to return null.
Author: JihongMa <linlin200605@gmail.com>
Closes#9705 from JihongMA/SPARK-11720.
If user use primitive parameters in UDF, there is no way for him to do the null-check for primitive inputs, so we are assuming the primitive input is null-propagatable for this case and return null if the input is null.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9770 from cloud-fan/udf.
I also found a bug with self-joins returning incorrect results in the Dataset API. Two test cases attached and filed SPARK-11803.
Author: Reynold Xin <rxin@databricks.com>
Closes#9789 from rxin/SPARK-11802.
Based on the comment of cloud-fan in https://github.com/apache/spark/pull/9216, update the AttributeReference's hashCode function by including the hashCode of the other attributes including name, nullable and qualifiers.
Here, I am not 100% sure if we should include name in the hashCode calculation, since the original hashCode calculation does not include it.
marmbrus cloud-fan Please review if the changes are good.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9761 from gatorsmile/hashCodeNamedExpression.
In the previous method, fields.toArray will cast java.util.List[StructField] into Array[Object] which can not cast into Array[StructField], thus when invoking this method will throw "java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.sql.types.StructField;"
I directly cast java.util.List[StructField] into Array[StructField] in this patch.
Author: mayuanwen <mayuanwen@qiyi.com>
Closes#9649 from jackieMaKing/Spark-11679.
During executing PromoteStrings rule, if one side of binaryComparison is StringType and the other side is not StringType, the current code will promote(cast) the StringType to DoubleType, and if the StringType doesn't contain the numbers, it will get null value. So if it is doing <=> (NULL-safe equal) with Null, it will not filter anything, caused the problem reported by this jira.
I proposal to the changes through this PR, can you review my code changes ?
This problem only happen for <=>, other operators works fine.
scala> val filteredDF = df.filter(df("column") > (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]
scala> filteredDF.show
+------+
|column|
+------+
+------+
scala> val filteredDF = df.filter(df("column") === (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]
scala> filteredDF.show
+------+
|column|
+------+
+------+
scala> df.registerTempTable("DF")
scala> sqlContext.sql("select * from DF where 'column' = NULL")
res27: org.apache.spark.sql.DataFrame = [column: string]
scala> res27.show
+------+
|column|
+------+
+------+
Author: Kevin Yu <qyu@us.ibm.com>
Closes#9720 from kevinyu98/working_on_spark-11447.
This patch adds an alias for current_timestamp (now function).
Also fixes SPARK-9196 to re-enable the test case for current_timestamp.
Author: Reynold Xin <rxin@databricks.com>
Closes#9753 from rxin/SPARK-11768.
This fix is to change the equals method to check all of the specified fields for equality of AttributeReference.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9216 from gatorsmile/namedExpressEqual.
Invocation of getters for type extending AnyVal returns default value (if field value is null) instead of throwing NPE. Please check comments for SPARK-11553 issue for more details.
Author: Bartlomiej Alberski <bartlomiej.alberski@allegrogroup.com>
Closes#9642 from alberskib/bugfix/SPARK-11553.
These 2 are very similar, we can consolidate them into one.
Also add tests for it and fix a bug.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9729 from cloud-fan/tuple.
JIRA: https://issues.apache.org/jira/browse/SPARK-11743
RowEncoder doesn't support UserDefinedType now. We should add the support for it.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9712 from viirya/rowencoder-udt.
code snippet to reproduce it:
```
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
val t = Timestamp.valueOf("1900-06-11 12:14:50.789")
val us = fromJavaTimestamp(t)
assert(getSeconds(us) === t.getSeconds)
```
it will be good to add a regression test for it, but the reproducing code need to change the default timezone, and even we change it back, the `lazy val defaultTimeZone` in `DataTimeUtils` is fixed.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9728 from cloud-fan/seconds.
also add more tests for encoders, and fix bugs that I found:
* when convert array to catalyst array, we can only skip element conversion for native types(e.g. int, long, boolean), not `AtomicType`(String is AtomicType but we need to convert it)
* we should also handle scala `BigDecimal` when convert from catalyst `Decimal`.
* complex map type should be supported
other issues that still in investigation:
* encode java `BigDecimal` and decode it back, seems we will loss precision info.
* when encode case class that defined inside a object, `ClassNotFound` exception will be thrown.
I'll remove unused code in a follow-up PR.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9693 from cloud-fan/split.
* rename `AppendColumn` to `AppendColumns` to be consistent with the physical plan name.
* clean up stale comments.
* always pass in resolved encoder to `TypedColumn.withInputType`(test added)
* enable a mistakenly disabled java test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9688 from cloud-fan/follow.
This PR adds a new method, `reduce`, to `GroupedDataset`, which allows similar operations to `reduceByKey` on a traditional `PairRDD`.
```scala
val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect() // not actually commutative :P
res0: Array(3 -> "abcxyz", 5 -> "hello")
```
While implementing this method and its test cases several more deficiencies were found in our encoder handling. Specifically, in order to support positional resolution, named resolution and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new `Datasets` with the same object type but different output attributes. We now divide the encoder lifecycle into three phases (that mirror the lifecycle of standard expressions) and have checks at various boundaries:
- Unresoved Encoders: all users facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only `UnresolvedAttributes` for named fields and `BoundReferences` for fields accessed by ordinal.
- Resolved Encoders: internal to a `[Grouped]Dataset` the encoder is resolved, meaning all input has been resolved to a specific `AttributeReference`. Any encoders that are placed into a logical plan for use in object construction should be resolved.
- BoundEncoder: Are constructed by physical plans, right before actual conversion from row -> object is performed.
It is left to future work to add explicit checks for resolution and provide good error messages when it fails. We might also consider enforcing the above constraints in the type system (i.e. `fromRow` only exists on a `ResolvedEncoder`), but we should probably wait before spending too much time on this.
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9673 from marmbrus/pr/9628.
switched stddev support from DeclarativeAggregate to ImperativeAggregate.
Author: JihongMa <linlin200605@gmail.com>
Closes#9380 from JihongMA/SPARK-11420.
`to_unix_timestamp` is the deterministic version of `unix_timestamp`, as it accepts at least one parameters.
Since the behavior here is quite similar to `unix_timestamp`, I think the dataframe API is not necessary here.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#9347 from adrian-wang/to_unix_timestamp.
This adds a pivot method to the dataframe api.
Following the lead of cube and rollup this adds a Pivot operator that is translated into an Aggregate by the analyzer.
Currently the syntax is like:
~~courseSales.pivot(Seq($"year"), $"course", Seq("dotNET", "Java"), sum($"earnings"))~~
~~Would we be interested in the following syntax also/alternatively? and~~
courseSales.groupBy($"year").pivot($"course", "dotNET", "Java").agg(sum($"earnings"))
//or
courseSales.groupBy($"year").pivot($"course").agg(sum($"earnings"))
Later we can add it to `SQLParser`, but as Hive doesn't support it we cant add it there, right?
~~Also what would be the suggested Java friendly method signature for this?~~
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#7841 from aray/sql-pivot.
We need to support custom classes like java beans and combine them into tuple, and it's very hard to do it with the TypeTag-based approach.
We should keep only the compose-based way to create tuple encoder.
This PR also move `Encoder` to `org.apache.spark.sql`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9567 from cloud-fan/java.
This PR is a 2nd follow-up for [SPARK-9241](https://issues.apache.org/jira/browse/SPARK-9241). It contains the following improvements:
* Fix for a potential bug in distinct child expression and attribute alignment.
* Improved handling of duplicate distinct child expressions.
* Added test for distinct UDAF with multiple children.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9566 from hvanhovell/SPARK-9241-followup-2.
This patch adds the building blocks for codegening subexpr elimination and implements
it end to end for UnsafeProjection. The building blocks can be used to do the same thing
for other operators.
It introduces some utilities to compute common sub expressions. Expressions can be added to
this data structure. The expr and its children will be recursively matched against existing
expressions (ones previously added) and grouped into common groups. This is built using
the existing `semanticEquals`. It does not understand things like commutative or associative
expressions. This can be done as future work.
After building this data structure, the codegen process takes advantage of it by:
1. Generating a helper function in the generated class that computes the common
subexpression. This is done for all common subexpressions that have at least
two occurrences and the expression tree is sufficiently complex.
2. When generating the apply() function, if the helper function exists, call that
instead of regenerating the expression tree. Repeated calls to the helper function
shortcircuit the evaluation logic.
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9480 from nongli/spark-10371.
Currently the user facing api for typed aggregation has some limitations:
* the customized typed aggregation must be the first of aggregation list
* the customized typed aggregation can only use long as buffer type
* the customized typed aggregation can only use flat type as result type
This PR tries to remove these limitations.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9599 from cloud-fan/agg.
https://issues.apache.org/jira/browse/SPARK-9830
This PR contains the following main changes.
* Removing `AggregateExpression1`.
* Removing `Aggregate` operator, which is used to evaluate `AggregateExpression1`.
* Removing planner rule used to plan `Aggregate`.
* Linking `MultipleDistinctRewriter` to analyzer.
* Renaming `AggregateExpression2` to `AggregateExpression` and `AggregateFunction2` to `AggregateFunction`.
* Updating places where we create aggregate expression. The way to create aggregate expressions is `AggregateExpression(aggregateFunction, mode, isDistinct)`.
* Changing `val`s in `DeclarativeAggregate`s that touch children of this function to `lazy val`s (when we create aggregate expression in DataFrame API, children of an aggregate function can be unresolved).
Author: Yin Huai <yhuai@databricks.com>
Closes#9556 from yhuai/removeAgg1.
A few changes:
1. Removed fold, since it can be confusing for distributed collections.
2. Created specific interfaces for each Dataset function (e.g. MapFunction, ReduceFunction, MapPartitionsFunction)
3. Added more documentation and test cases.
The other thing I'm considering doing is to have a "collector" interface for FlatMapFunction and MapPartitionsFunction, similar to MapReduce's map function.
Author: Reynold Xin <rxin@databricks.com>
Closes#9531 from rxin/SPARK-11564.
This PR adds support for multiple column in a single count distinct aggregate to the new aggregation path.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9409 from hvanhovell/SPARK-11451.
This PR is a follow up for PR https://github.com/apache/spark/pull/9406. It adds more documentation to the rewriting rule, removes a redundant if expression in the non-distinct aggregation path and adds a multiple distinct test to the AggregationQuerySuite.
cc yhuai marmbrus
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9541 from hvanhovell/SPARK-9241-followup.
The second PR for SPARK-9241, this adds support for multiple distinct columns to the new aggregation code path.
This PR solves the multiple DISTINCT column problem by rewriting these Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-9241) for some information on this. The advantages over the - competing - [first PR](https://github.com/apache/spark/pull/9280) are:
- This can use the faster TungstenAggregate code path.
- It is impossible to OOM due to an ```OpenHashSet``` allocating to much memory. However, this will multiply the number of input rows by the number of distinct clauses (plus one), and puts a lot more memory pressure on the aggregation code path itself.
The location of this Rule is a bit funny, and should probably change when the old aggregation path is changed.
cc yhuai - Could you also tell me where to add tests for this?
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9406 from hvanhovell/SPARK-9241-rewriter.
This PR enables the Expand operator to process and produce Unsafe Rows.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9414 from hvanhovell/SPARK-11450.
JIRA: https://issues.apache.org/jira/browse/SPARK-9162
Currently ScalaUDF extends CodegenFallback and doesn't provide code generation implementation. This path implements code generation for ScalaUDF.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9270 from viirya/scalaudf-codegen.
A cleanup for https://github.com/apache/spark/pull/9085.
The `DecimalLit` is very similar to `FloatLit`, we can just keep one of them.
Also added low level unit test at `SqlParserSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9482 from cloud-fan/parser.
This PR adds the ability to do typed SQL aggregations. We will likely also want to provide an interface to allow users to do aggregations on objects, but this is deferred to another PR.
```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds.groupBy(_._1).agg(sum("_2").as[Int]).collect()
res0: Array(("a", 30), ("b", 3), ("c", 1))
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9499 from marmbrus/dataset-agg.
Currently, if the Timestamp is before epoch (1970/01/01), the hours, minutes and seconds will be negative (also rounding up).
Author: Davies Liu <davies@databricks.com>
Closes#9502 from davies/neg_hour.
functions.scala was getting pretty long. I broke it into multiple files.
I also added explicit data types for some public vals, and renamed aggregate function pretty names to lower case, which is more consistent with rest of the functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9471 from rxin/SPARK-11505.
stddev is an alias for stddev_samp. variance should be consistent with stddev.
Also took the chance to remove internal Stddev and Variance, and only kept StddevSamp/StddevPop and VarianceSamp/VariancePop.
Author: Reynold Xin <rxin@databricks.com>
Closes#9449 from rxin/SPARK-11490.
Right now, SQL's mutable projection updates every value of the mutable project after it evaluates the corresponding expression. This makes the behavior of MutableProjection confusing and complicate the implementation of common aggregate functions like stddev because developers need to be aware that when evaluating {{i+1}}th expression of a mutable projection, {{i}}th slot of the mutable row has already been updated.
This PR make the MutableProjection atomic, by generating all the results of expressions first, then copy them into mutableRow.
Had run a mircro-benchmark, there is no notable performance difference between using class members and local variables.
cc yhuai
Author: Davies Liu <davies@databricks.com>
Closes#9422 from davies/atomic_mutable and squashes the following commits:
bbc1758 [Davies Liu] support wide table
8a0ae14 [Davies Liu] fix bug
bec07da [Davies Liu] refactor
2891628 [Davies Liu] make mutableProjection atomic