Use try to match the behavior for single distinct aggregation with Spark 1.5, but that's not scalable, we should be robust by default, have a flag to address performance regression for low cardinality aggregation.
cc yhuai nongli
Author: Davies Liu <davies@databricks.com>
Closes#10075 from davies/agg_15.
When query the Timestamp or Date column like the following
val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end)
The generated SQL query is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0"
It should have quote around the Timestamp/Date value such as "TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'"
Author: Huaxin Gao <huaxing@oc0558782468.ibm.com>
Closes#9872 from huaxingao/spark-11788.
The issue is that the output commiter is not idempotent and retry attempts will
fail because the output file already exists. It is not safe to clean up the file
as this output committer is by design not retryable. Currently, the job fails
with a confusing file exists error. This patch is a stop gap to tell the user
to look at the top of the error log for the proper message.
This is difficult to test locally as Spark is hardcoded not to retry. Manually
verified by upping the retry attempts.
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Closes#10080 from nongli/spark-11328.
Persist and Unpersist exist in both RDD and Dataframe APIs. I think they are still very critical in Dataset APIs. Not sure if my understanding is correct? If so, could you help me check if the implementation is acceptable?
Please provide your opinions. marmbrus rxin cloud-fan
Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#9889 from gatorsmile/persistDS.
create java version of `constructorFor` and `extractorFor` in `JavaTypeInference`
Author: Wenchen Fan <wenchen@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#9937 from cloud-fan/pojo.
When we build the `fromRowExpression` for an encoder, we set up a lot of "unresolved" stuff and lost the required data type, which may lead to runtime error if the real type doesn't match the encoder's schema.
For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type is `[a: int, b: long]`, then we will hit runtime error and say that we can't construct class `Data` with int and long, because we lost the information that `b` should be a string.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9840 from cloud-fan/err-msg.
The reason is that, for a single culumn `RowEncoder`(or a single field product encoder), when we use it as the encoder for grouping key, we should also combine the grouping attributes, although there is only one grouping attribute.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10059 from cloud-fan/bug.
JIRA: https://issues.apache.org/jira/browse/SPARK-11949
The result of cube plan uses incorrect schema. The schema of cube result should set nullable property to true because the grouping expressions will have null values.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#10038 from viirya/fix-cube.
This reverts commit cc243a079b / PR #9297
I'm reverting this because it broke SQLListenerMemoryLeakSuite in the master Maven builds.
See #9991 for a discussion of why this broke the tests.
This PR improve the performance of CartesianProduct by caching the result of right plan.
After this patch, the query time of TPC-DS Q65 go down to 4 seconds from 28 minutes (420X faster).
cc nongli
Author: Davies Liu <davies@databricks.com>
Closes#9969 from davies/improve_cartesian.
In 1.6, we introduce a public API to have a SQLContext for current thread, SparkPlan should use that.
Author: Davies Liu <davies@databricks.com>
Closes#9990 from davies/leak_context.
In https://github.com/apache/spark/pull/9409 we enabled multi-column counting. The approach taken in that PR introduces a bit of overhead by first creating a row only to check if all of the columns are non-null.
This PR fixes that technical debt. Count now takes multiple columns as its input. In order to make this work I have also added support for multiple columns in the single distinct code path.
cc yhuai
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#10015 from hvanhovell/SPARK-12024.
When calling `get_json_object` for the following two cases, both results are `"null"`:
```scala
val tuple: Seq[(String, String)] = ("5", """{"f1": null}""") :: Nil
val df: DataFrame = tuple.toDF("key", "jstring")
val res = df.select(functions.get_json_object($"jstring", "$.f1")).collect()
```
```scala
val tuple2: Seq[(String, String)] = ("5", """{"f1": "null"}""") :: Nil
val df2: DataFrame = tuple2.toDF("key", "jstring")
val res3 = df2.select(functions.get_json_object($"jstring", "$.f1")).collect()
```
Fixed the problem and also added a test case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#10018 from gatorsmile/get_json_object.
Check for partition column null-ability while building the partition spec.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#10001 from dilipbiswal/spark-11997.
Reference: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
In order for PostgreSQL to honor the fetchSize non-zero setting, its Connection.autoCommit needs to be set to false. Otherwise, it will just quietly ignore the fetchSize setting.
This adds a new side-effecting dialect specific beforeFetch method that will fire before a select query is ran.
Author: mariusvniekerk <marius.v.niekerk@gmail.com>
Closes#9861 from mariusvniekerk/SPARK-11881.
On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution.
To support SQL UI on the history server:
1. I added an `onOtherEvent` method to the `SparkListener` trait and post all SQL related events to the same event bus.
2. Two SQL events `SparkListenerSQLExecutionStart` and `SparkListenerSQLExecutionEnd` are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4. A new trait `SparkHistoryListenerFactory` is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using `java.util.ServiceLoader`.
Author: Carson Wang <carson.wang@intel.com>
Closes#9297 from carsonwang/SqlHistoryUI.
Currently, we does not have visualization for SQL query from Python, this PR fix that.
cc zsxwing
Author: Davies Liu <davies@databricks.com>
Closes#9949 from davies/pyspark_sql_ui.
Except inner join, maybe the other join types are also useful when users are using the joinWith function. Thus, added the joinType into the existing joinWith call in Dataset APIs.
Also providing another joinWith interface for the cartesian-join-like functionality.
Please provide your opinions. marmbrus rxin cloud-fan Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9921 from gatorsmile/joinWith.
This patch makes it consistent to use varargs in all DataFrameReader methods, including Parquet, JSON, text, and the generic load function.
Also added a few more API tests for the Java API.
Author: Reynold Xin <rxin@databricks.com>
Closes#9945 from rxin/SPARK-11967.
This PR is to provide two common `coalesce` and `repartition` in Dataset APIs.
After reading the comments of SPARK-9999, I am unclear about the plan for supporting re-partitioning in Dataset APIs. Currently, both RDD APIs and Dataframe APIs provide users such a flexibility to control the number of partitions.
In most traditional RDBMS, they expose the number of partitions, the partitioning columns, the table partitioning methods to DBAs for performance tuning and storage planning. Normally, these parameters could largely affect the query performance. Since the actual performance depends on the workload types, I think it is almost impossible to automate the discovery of the best partitioning strategy for all the scenarios.
I am wondering if Dataset APIs are planning to hide these APIs from users? Feel free to reject my PR if it does not match the plan.
Thank you for your answers. marmbrus rxin cloud-fan
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9899 from gatorsmile/coalesce.
Currently pivot's signature looks like
```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData
scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```
I think we can remove the one that takes "Column" types, since callers should always be passing in literals. It'd also be more clear if the values are not varargs, but rather Seq or java.util.List.
I also made similar changes for Python.
Author: Reynold Xin <rxin@databricks.com>
Closes#9929 from rxin/SPARK-11946.
we should pass in resolved encodera to logical `CoGroup` and bind them in physical `CoGroup`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9928 from cloud-fan/cogroup.
Based on feedback from Matei, this is more consistent with mapPartitions in Spark.
Also addresses some of the cleanups from a previous commit that renames the type variables.
Author: Reynold Xin <rxin@databricks.com>
Closes#9919 from rxin/SPARK-11933.
We should use `InternalRow.isNullAt` to check if the field is null before calling `InternalRow.getXXX`
Thanks gatorsmile who discovered this bug.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#9904 from cloud-fan/null.
Can someone review my code to make sure I'm not missing anything? Thanks!
Author: Xiu Guo <xguo27@gmail.com>
Author: Xiu Guo <guoxi@us.ibm.com>
Closes#9612 from xguo27/SPARK-11628.
1. Renamed map to mapGroup, flatMap to flatMapGroup.
2. Renamed asKey -> keyAs.
3. Added more documentation.
4. Changed type parameter T to V on GroupedDataset.
5. Added since versions for all functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9880 from rxin/SPARK-11899.
In this PR I delete a method that breaks type inference for aggregators (only in the REPL)
The error when this method is present is:
```
<console>:38: error: missing parameter type for expanded function ((x$2) => x$2._2)
ds.groupBy(_._1).agg(sum(_._2), sum(_._3)).collect()
```
Author: Michael Armbrust <michael@databricks.com>
Closes#9870 from marmbrus/dataset-repl-agg.
This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is
shared between core and I've left that in core. This allows some other associated
minor cleanup.
Author: Nong Li <nong@databricks.com>
Closes#9845 from nongli/spark-11787.
Hive has since changed this behavior as well. https://issues.apache.org/jira/browse/HIVE-3454
Author: Nong Li <nong@databricks.com>
Author: Nong Li <nongli@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#9685 from nongli/spark-11724.
DataSet APIs look great! However, I am lost when doing multiple level joins. For example,
```
val ds1 = Seq(("a", 1), ("b", 2)).toDS().as("a")
val ds2 = Seq(("a", 1), ("b", 2)).toDS().as("b")
val ds3 = Seq(("a", 1), ("b", 2)).toDS().as("c")
ds1.joinWith(ds2, $"a._2" === $"b._2").as("ab").joinWith(ds3, $"ab._1._2" === $"c._2").printSchema()
```
The printed schema is like
```
root
|-- _1: struct (nullable = true)
| |-- _1: struct (nullable = true)
| | |-- _1: string (nullable = true)
| | |-- _2: integer (nullable = true)
| |-- _2: struct (nullable = true)
| | |-- _1: string (nullable = true)
| | |-- _2: integer (nullable = true)
|-- _2: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: integer (nullable = true)
```
Personally, I think we need the printSchema function. Sometimes, I do not know how to specify the column, especially when their data types are mixed. For example, if I want to write the following select for the above multi-level join, I have to know the schema:
```
newDS.select(expr("_1._2._2 + 1").as[Int]).collect()
```
marmbrus rxin cloud-fan Do you have the same feeling?
Author: gatorsmile <gatorsmile@gmail.com>
Closes#9855 from gatorsmile/printSchemaDataSet.
Apply the user supplied pathfilter while retrieving the files from fs.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#9830 from dilipbiswal/spark-11544.
Fixes bug with grouping sets (including cube/rollup) where aggregates that included grouping expressions would return the wrong (null) result.
Also simplifies the analyzer rule a bit and leaves column pruning to the optimizer.
Added multiple unit tests to DataFrameAggregateSuite and verified it passes hive compatibility suite:
```
build/sbt -Phive -Dspark.hive.whitelist='groupby.*_grouping.*' 'test-only org.apache.spark.sql.hive.execution.HiveCompatibilitySuite'
```
This is an alternative to pr https://github.com/apache/spark/pull/9419 but I think its better as it simplifies the analyzer rule instead of adding another special case to it.
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#9815 from aray/groupingset-agg-fix.