Right now, the expended start will include the name of expression as prefix for column, that's not better than without expending, we should not have the prefix.
Author: Davies Liu <davies@databricks.com>
Closes#9984 from davies/expand_star.
Currently pivot's signature looks like
```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData
scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```
I think we can remove the one that takes "Column" types, since callers should always be passing in literals. It'd also be more clear if the values are not varargs, but rather Seq or java.util.List.
I also made similar changes for Python.
Author: Reynold Xin <rxin@databricks.com>
Closes#9929 from rxin/SPARK-11946.
we should pass in resolved encodera to logical `CoGroup` and bind them in physical `CoGroup`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9928 from cloud-fan/cogroup.
We should use `InternalRow.isNullAt` to check if the field is null before calling `InternalRow.getXXX`
Thanks gatorsmile who discovered this bug.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9904 from cloud-fan/null.
Can someone review my code to make sure I'm not missing anything? Thanks!
Author: Xiu Guo <xguo27@gmail.com>
Author: Xiu Guo <guoxi@us.ibm.com>
Closes#9612 from xguo27/SPARK-11628.
1. Renamed map to mapGroup, flatMap to flatMapGroup.
2. Renamed asKey -> keyAs.
3. Added more documentation.
4. Changed type parameter T to V on GroupedDataset.
5. Added since versions for all functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9880 from rxin/SPARK-11899.
seems scala 2.11 doesn't support: define private methods in `trait xxx` and use it in `object xxx extend xxx`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9879 from cloud-fan/follow.
This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is
shared between core and I've left that in core. This allows some other associated
minor cleanup.
Author: Nong Li <nong@databricks.com>
Closes#9845 from nongli/spark-11787.
#theScaryParts (i.e. changes to the repl, executor classloaders and codegen)...
Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9825 from marmbrus/dataset-replClasses2.
Hive has since changed this behavior as well. https://issues.apache.org/jira/browse/HIVE-3454
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9685 from nongli/spark-11724.
before this PR, when users try to get an encoder for an un-supported class, they will only get a very simple error message like `Encoder for type xxx is not supported`.
After this PR, the error message become more friendly, for example:
```
No Encoder found for abc.xyz.NonEncodable
- array element class: "abc.xyz.NonEncodable"
- field (class: "scala.Array", name: "arrayField")
- root class: "abc.xyz.AnotherClass"
```
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9810 from cloud-fan/error-message.
JIRA: https://issues.apache.org/jira/browse/SPARK-11817
Instead of return None, we should truncate the fractional seconds to prevent inserting NULL.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9834 from viirya/truncate-fractional-sec.
This PR has the following optimization:
1) The greatest/least already does the null-check, so the `If` and `IsNull` are not necessary.
2) In greatest/least, it should initialize the result using the first child (removing one block).
3) For primitive types, the generated greater expression is too complicated (`a > b ? 1 : (a < b) ? -1 : 0) > 0`), should be as simple as `a > b`
Combine these optimization, this could improve the performance of `ss_max` query by 30%.
Author: Davies Liu <davies@databricks.com>
Closes#9846 from davies/improve_max.
Fixes bug with grouping sets (including cube/rollup) where aggregates that included grouping expressions would return the wrong (null) result.
Also simplifies the analyzer rule a bit and leaves column pruning to the optimizer.
Added multiple unit tests to DataFrameAggregateSuite and verified it passes hive compatibility suite:
```
build/sbt -Phive -Dspark.hive.whitelist='groupby.*_grouping.*' 'test-only org.apache.spark.sql.hive.execution.HiveCompatibilitySuite'
```
This is an alternative to pr https://github.com/apache/spark/pull/9419 but I think its better as it simplifies the analyzer rule instead of adding another special case to it.
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#9815 from aray/groupingset-agg-fix.
After some experiment, I found it's not convenient to have separate encoder builders: `FlatEncoder` and `ProductEncoder`. For example, when create encoders for `ScalaUDF`, we have no idea if the type `T` is flat or not. So I revert the splitting change in https://github.com/apache/spark/pull/9693, while still keeping the bug fixes and tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9726 from cloud-fan/follow.
The impact of this change is for a query that has a single distinct column and does not have any grouping expression like
`SELECT COUNT(DISTINCT a) FROM table`
The plan will be changed from
```
AGG-2 (count distinct)
Shuffle to a single reducer
Partial-AGG-2 (count distinct)
AGG-1 (grouping on a)
Shuffle by a
Partial-AGG-1 (grouping on 1)
```
to the following one (1.5 uses this)
```
AGG-2
AGG-1 (grouping on a)
Shuffle to a single reducer
Partial-AGG-1(grouping on a)
```
The first plan is more robust. However, to better benchmark the impact of this change, we should use 1.5's plan and use the conf of `spark.sql.specializeSingleDistinctAggPlanning` to control the plan.
Author: Yin Huai <yhuai@databricks.com>
Closes#9828 from yhuai/distinctRewriter.
We currently rely on the optimizer's constant folding to replace current_timestamp and current_date. However, this can still result in different values for different instances of current_timestamp/current_date if the optimizer is not running fast enough.
A better solution is to replace these functions in the analyzer in one shot.
Author: Reynold Xin <rxin@databricks.com>
Closes#9833 from rxin/SPARK-11849.
This patch adds an alternate to the Parquet RecordReader from the parquet-mr project
that is much faster for flat schemas. Instead of using the general converter mechanism
from parquet-mr, this directly uses the lower level APIs from parquet-columnar and a
customer RecordReader that directly assembles into UnsafeRows.
This is optionally disabled and only used for supported schemas.
Using the tpcds store sales table and doing a sum of increasingly more columns, the results
are:
For 1 Column:
Before: 11.3M rows/second
After: 18.2M rows/second
For 2 Columns:
Before: 7.2M rows/second
After: 11.2M rows/second
For 5 Columns:
Before: 2.9M rows/second
After: 4.5M rows/second
Author: Nong Li <nong@databricks.com>
Closes#9774 from nongli/parquet.
Also added some nicer error messages for incompatible types (private types and primitive types) for Kryo/Java encoder.
Author: Reynold Xin <rxin@databricks.com>
Closes#9823 from rxin/SPARK-11833.
Before this PR there were two things that would blow up if you called `df.as[MyClass]` if `MyClass` was defined in the REPL:
- [x] Because `classForName` doesn't work on the munged names returned by `tpe.erasure.typeSymbol.asClass.fullName`
- [x] Because we don't have anything to pass into the constructor for the `$outer` pointer.
Note that this PR is just adding the infrastructure for working with inner classes in encoder and is not yet sufficient to make them work in the REPL. Currently, the implementation show in 95cec7d413 is causing a bug that breaks code gen due to some interaction between janino and the `ExecutorClassLoader`. This will be addressed in a follow-up PR.
Author: Michael Armbrust <michael@databricks.com>
Closes#9602 from marmbrus/dataset-replClasses.
This patch refactors the existing Kryo encoder expressions and adds support for Java serialization.
Author: Reynold Xin <rxin@databricks.com>
Closes#9802 from rxin/SPARK-11810.
return Double.NaN for mean/average when count == 0 for all numeric types that is converted to Double, Decimal type continue to return null.
Author: JihongMa <linlin200605@gmail.com>
Closes#9705 from JihongMA/SPARK-11720.
If user use primitive parameters in UDF, there is no way for him to do the null-check for primitive inputs, so we are assuming the primitive input is null-propagatable for this case and return null if the input is null.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9770 from cloud-fan/udf.
I also found a bug with self-joins returning incorrect results in the Dataset API. Two test cases attached and filed SPARK-11803.
Author: Reynold Xin <rxin@databricks.com>
Closes#9789 from rxin/SPARK-11802.
Based on the comment of cloud-fan in https://github.com/apache/spark/pull/9216, update the AttributeReference's hashCode function by including the hashCode of the other attributes including name, nullable and qualifiers.
Here, I am not 100% sure if we should include name in the hashCode calculation, since the original hashCode calculation does not include it.
marmbrus cloud-fan Please review if the changes are good.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9761 from gatorsmile/hashCodeNamedExpression.
In the previous method, fields.toArray will cast java.util.List[StructField] into Array[Object] which can not cast into Array[StructField], thus when invoking this method will throw "java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.sql.types.StructField;"
I directly cast java.util.List[StructField] into Array[StructField] in this patch.
Author: mayuanwen <mayuanwen@qiyi.com>
Closes#9649 from jackieMaKing/Spark-11679.
During executing PromoteStrings rule, if one side of binaryComparison is StringType and the other side is not StringType, the current code will promote(cast) the StringType to DoubleType, and if the StringType doesn't contain the numbers, it will get null value. So if it is doing <=> (NULL-safe equal) with Null, it will not filter anything, caused the problem reported by this jira.
I proposal to the changes through this PR, can you review my code changes ?
This problem only happen for <=>, other operators works fine.
scala> val filteredDF = df.filter(df("column") > (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]
scala> filteredDF.show
+------+
|column|
+------+
+------+
scala> val filteredDF = df.filter(df("column") === (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]
scala> filteredDF.show
+------+
|column|
+------+
+------+
scala> df.registerTempTable("DF")
scala> sqlContext.sql("select * from DF where 'column' = NULL")
res27: org.apache.spark.sql.DataFrame = [column: string]
scala> res27.show
+------+
|column|
+------+
+------+
Author: Kevin Yu <qyu@us.ibm.com>
Closes#9720 from kevinyu98/working_on_spark-11447.
This patch adds an alias for current_timestamp (now function).
Also fixes SPARK-9196 to re-enable the test case for current_timestamp.
Author: Reynold Xin <rxin@databricks.com>
Closes#9753 from rxin/SPARK-11768.
This fix is to change the equals method to check all of the specified fields for equality of AttributeReference.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9216 from gatorsmile/namedExpressEqual.
Invocation of getters for type extending AnyVal returns default value (if field value is null) instead of throwing NPE. Please check comments for SPARK-11553 issue for more details.
Author: Bartlomiej Alberski <bartlomiej.alberski@allegrogroup.com>
Closes#9642 from alberskib/bugfix/SPARK-11553.
These 2 are very similar, we can consolidate them into one.
Also add tests for it and fix a bug.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9729 from cloud-fan/tuple.
JIRA: https://issues.apache.org/jira/browse/SPARK-11743
RowEncoder doesn't support UserDefinedType now. We should add the support for it.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9712 from viirya/rowencoder-udt.
code snippet to reproduce it:
```
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
val t = Timestamp.valueOf("1900-06-11 12:14:50.789")
val us = fromJavaTimestamp(t)
assert(getSeconds(us) === t.getSeconds)
```
it will be good to add a regression test for it, but the reproducing code need to change the default timezone, and even we change it back, the `lazy val defaultTimeZone` in `DataTimeUtils` is fixed.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9728 from cloud-fan/seconds.
also add more tests for encoders, and fix bugs that I found:
* when convert array to catalyst array, we can only skip element conversion for native types(e.g. int, long, boolean), not `AtomicType`(String is AtomicType but we need to convert it)
* we should also handle scala `BigDecimal` when convert from catalyst `Decimal`.
* complex map type should be supported
other issues that still in investigation:
* encode java `BigDecimal` and decode it back, seems we will loss precision info.
* when encode case class that defined inside a object, `ClassNotFound` exception will be thrown.
I'll remove unused code in a follow-up PR.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9693 from cloud-fan/split.
* rename `AppendColumn` to `AppendColumns` to be consistent with the physical plan name.
* clean up stale comments.
* always pass in resolved encoder to `TypedColumn.withInputType`(test added)
* enable a mistakenly disabled java test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9688 from cloud-fan/follow.
This PR adds a new method, `reduce`, to `GroupedDataset`, which allows similar operations to `reduceByKey` on a traditional `PairRDD`.
```scala
val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect() // not actually commutative :P
res0: Array(3 -> "abcxyz", 5 -> "hello")
```
While implementing this method and its test cases several more deficiencies were found in our encoder handling. Specifically, in order to support positional resolution, named resolution and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new `Datasets` with the same object type but different output attributes. We now divide the encoder lifecycle into three phases (that mirror the lifecycle of standard expressions) and have checks at various boundaries:
- Unresoved Encoders: all users facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only `UnresolvedAttributes` for named fields and `BoundReferences` for fields accessed by ordinal.
- Resolved Encoders: internal to a `[Grouped]Dataset` the encoder is resolved, meaning all input has been resolved to a specific `AttributeReference`. Any encoders that are placed into a logical plan for use in object construction should be resolved.
- BoundEncoder: Are constructed by physical plans, right before actual conversion from row -> object is performed.
It is left to future work to add explicit checks for resolution and provide good error messages when it fails. We might also consider enforcing the above constraints in the type system (i.e. `fromRow` only exists on a `ResolvedEncoder`), but we should probably wait before spending too much time on this.
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9673 from marmbrus/pr/9628.
switched stddev support from DeclarativeAggregate to ImperativeAggregate.
Author: JihongMa <linlin200605@gmail.com>
Closes#9380 from JihongMA/SPARK-11420.
`to_unix_timestamp` is the deterministic version of `unix_timestamp`, as it accepts at least one parameters.
Since the behavior here is quite similar to `unix_timestamp`, I think the dataframe API is not necessary here.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#9347 from adrian-wang/to_unix_timestamp.
This adds a pivot method to the dataframe api.
Following the lead of cube and rollup this adds a Pivot operator that is translated into an Aggregate by the analyzer.
Currently the syntax is like:
~~courseSales.pivot(Seq($"year"), $"course", Seq("dotNET", "Java"), sum($"earnings"))~~
~~Would we be interested in the following syntax also/alternatively? and~~
courseSales.groupBy($"year").pivot($"course", "dotNET", "Java").agg(sum($"earnings"))
//or
courseSales.groupBy($"year").pivot($"course").agg(sum($"earnings"))
Later we can add it to `SQLParser`, but as Hive doesn't support it we cant add it there, right?
~~Also what would be the suggested Java friendly method signature for this?~~
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#7841 from aray/sql-pivot.