## What changes were proposed in this pull request?
Prior to this PR, the following code would cause an NPE:
```scala
case class point(a: String, b: String, c: String, d: Int)

val data = Seq(
  point("1", "2", "3", 1),
  point("4", "5", "6", 1),
  point("7", "8", "9", 1)
)
sc.parallelize(data).toDF().registerTempTable("table")
spark.sql("select a, b, c, count(d) from table group by a, b, c GROUPING SETS ((a))").show()
```
The reason is that when the grouping_id() behavior was changed in #10677, some code that should have been updated was left out.
Take the above code for example: prior to #10677, the bit mask for the set "(a)" was `001`, while after #10677 the bit mask became `011`. However, the `nonNullBitmask` was not changed accordingly.
This PR fixes the problem.
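For reference, with the fix the repro above should run without an NPE and, assuming the usual GROUPING SETS semantics (grouped-out columns are nulled), produce something like:
```
+---+----+----+--------+
|  a|   b|   c|count(d)|
+---+----+----+--------+
|  1|null|null|       1|
|  4|null|null|       1|
|  7|null|null|       1|
+---+----+----+--------+
```
(Row order may vary.)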
## How was this patch tested?
add integration tests
Author: wangyang <wangyang@haizhi.com>
Closes#15416 from yangw1234/groupingid.
## What changes were proposed in this pull request?
This PR proposes to fix
```diff
test("FileStreamSink - json") {
- testFormat(Some("text"))
+ testFormat(Some("json"))
}
```
because `text` is already covered by the existing test:
```
test("FileStreamSink - text") {
testFormat(Some("text"))
}
```
## How was this patch tested?
Fixed test in `FileStreamSinkSuite.scala`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15785 from HyukjinKwon/SPARK-18192.
## What changes were proposed in this pull request?
For data source tables, we will put its table schema, partition columns, etc. to table properties, to work around some hive metastore issues, e.g. not case-preserving, bad decimal type support, etc.
We should also do this for hive serde tables, to reduce the difference between hive serde tables and data source tables, e.g. column names should be case preserving.
## How was this patch tested?
existing tests, and a new test in `HiveExternalCatalog`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#14750 from cloud-fan/minor1.
## What changes were proposed in this pull request?
The `PushDownPredicate` rule can create a wrong result if we try to push a filter containing a predicate subquery through a project when the subquery and the project share attributes (have the same source).
The current PR fixes this by making sure that we do not push down when there is a predicate subquery that outputs the same attributes as the filter's new child plan.
## How was this patch tested?
Added a test to `SubquerySuite`. nsyca has done previous work on this; I have taken the test from his initial PR.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15761 from hvanhovell/SPARK-17337.
## What changes were proposed in this pull request?
`QueryExecution.toString` currently captures `java.lang.Throwable`s; this is far from best practice and can lead to confusing situations or invalid application states. This PR fixes this by only capturing `AnalysisException`s.
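A minimal sketch of the narrowed behavior (an illustration only, not the actual `QueryExecution` code):
```scala
import org.apache.spark.sql.AnalysisException

// Only analysis errors are rendered into the plan string; any other
// throwable propagates to the caller instead of being swallowed.
def renderPlan(plan: => String): String =
  try plan catch {
    case e: AnalysisException => e.toString
  }
```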
## How was this patch tested?
Added a `QueryExecutionSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15760 from hvanhovell/SPARK-18259.
## What changes were proposed in this pull request?
This patch improves error reporting for FileStressSuite, when there is an error in Spark itself (not user code). This works by simply tightening the exception verification, and gets rid of the unnecessary thread for starting the stream.
Also renamed the class to FileStreamStressSuite to make it more obvious that it is a streaming suite.
## How was this patch tested?
This is a test only change and I manually verified error reporting by injecting some bug in the addBatch code for FileStreamSink.
Author: Reynold Xin <rxin@databricks.com>
Closes#15757 from rxin/SPARK-18257.
## What changes were proposed in this pull request?
This patch renames partitionProviderIsHive to tracksPartitionsInCatalog, as the old name was too Hive specific.
## How was this patch tested?
Should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes#15750 from rxin/SPARK-18244.
## What changes were proposed in this pull request?
This PR adds a new hash-based aggregate operator named `ObjectHashAggregateExec` that supports `TypedImperativeAggregate`, which may use arbitrary Java objects as aggregation states. Please refer to the [design doc](https://issues.apache.org/jira/secure/attachment/12834260/%5BDesign%20Doc%5D%20Support%20for%20Arbitrary%20Aggregation%20States.pdf) attached in [SPARK-17949](https://issues.apache.org/jira/browse/SPARK-17949) for more details about it.
The major benefit of this operator is better performance when evaluating `TypedImperativeAggregate` functions, especially when there are relatively few distinct groups. Functions like Hive UDAFs, `collect_list`, and `collect_set` may also benefit from this after being migrated to `TypedImperativeAggregate`.
The following feature flag is introduced to enable or disable the new aggregate operator:
- Name: `spark.sql.execution.useObjectHashAggregateExec`
- Default value: `true`
We can also configure the fallback threshold using the following option:
- Name: `spark.sql.objectHashAggregate.sortBased.fallbackThreshold`
- Default value: 128
Fallback to sort-based aggregation when more than 128 distinct groups are accumulated in the aggregation hash map. This number is intentionally made small to avoid GC problems since aggregation buffers of this operator may contain arbitrary Java objects.
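For reference, both knobs can be set programmatically; a minimal sketch, assuming both keys are settable at runtime via `spark.conf.set` and with illustrative values only:
```scala
// Enable the new aggregate operator and raise the sort-based fallback threshold.
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "true")
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "256")
```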
This may be improved by implementing size tracking for this operator; code generation and size tracking are planned to be implemented in follow-up PRs.
## Benchmark results
### `ObjectHashAggregateExec` vs `SortAggregateExec`
The first benchmark compares `ObjectHashAggregateExec` and `SortAggregateExec` by evaluating `typed_count`, a testing `TypedImperativeAggregate` version of the SQL `count` function.
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
object agg v.s. sort agg:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort agg w/ group by                        31251 / 31908          3.4         298.0       1.0X
object agg w/ group by w/o fallback          6903 / 7141          15.2          65.8       4.5X
object agg w/ group by w/ fallback          20945 / 21613          5.0         199.7       1.5X
sort agg w/o group by                        4734 / 5463          22.1          45.2       6.6X
object agg w/o group by w/o fallback         4310 / 4529          24.3          41.1       7.3X
```
The next benchmark compares `ObjectHashAggregateExec` and `SortAggregateExec` by evaluating the Spark native version of `percentile_approx`.
Note that `percentile_approx` is such a heavy aggregate function that the bottleneck of the benchmark is evaluating the aggregate function itself rather than the aggregate operator, since I couldn't run a large-scale benchmark on my laptop. That's why the results are so close and look counter-intuitive (aggregation with grouping is even faster than aggregation without grouping).
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
object agg v.s. sort agg:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort agg w/ group by                         3418 / 3530           0.6        1630.0       1.0X
object agg w/ group by w/o fallback          3210 / 3314           0.7        1530.7       1.1X
object agg w/ group by w/ fallback           3419 / 3511           0.6        1630.1       1.0X
sort agg w/o group by                        4336 / 4499           0.5        2067.3       0.8X
object agg w/o group by w/o fallback         4271 / 4372           0.5        2036.7       0.8X
```
### Hive UDAF vs Spark AF
This benchmark compares the following two kinds of aggregate functions:
- "hive udaf": Hive implementation of `percentile_approx`, without partial aggregation supports, evaluated using `SortAggregateExec`.
- "spark af": Spark native implementation of `percentile_approx`, with partial aggregation support, evaluated using `ObjectHashAggregateExec`
The performance differences are mostly due to faster implementation and partial aggregation support in the Spark native version of `percentile_approx`.
This benchmark basically shows the performance differences between the worst case, where an aggregate function without partial aggregation support is evaluated using `SortAggregateExec`, and the best case, where a `TypedImperativeAggregate` with partial aggregation support is evaluated using `ObjectHashAggregateExec`.
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
hive udaf vs spark af:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hive udaf w/o group by                       5326 / 5408           0.0       81264.2       1.0X
spark af w/o group by                          93 / 111            0.7        1415.6      57.4X
hive udaf w/ group by                        3804 / 3946           0.0       58050.1       1.4X
spark af w/ group by w/o fallback              71 / 90             0.9        1085.7      74.8X
spark af w/ group by w/ fallback               98 / 111            0.7        1501.6      54.1X
```
### Real world benchmark
We also did a relatively large benchmark using a real world query involving `percentile_approx`:
- Hive UDAF implementation, sort-based aggregation, w/o partial aggregation support: 24.77 minutes
- Native implementation, sort-based aggregation, w/ partial aggregation support: 4.64 minutes
- Native implementation, object hash aggregator, w/ partial aggregation support: 1.80 minutes
## How was this patch tested?
New unit tests and randomized test cases are added in `ObjectAggregateFunctionSuite`.
Author: Cheng Lian <lian@databricks.com>
Closes#15590 from liancheng/obj-hash-agg.
## What changes were proposed in this pull request?
When `FilterExec` contains `isNotNull` (either inferred and pushed down, or specified by users), we convert the nullability of the involved columns if the top-layer expression is null-intolerant. However, this is not correct: if the top-layer expression is not a leaf expression, it can still tolerate nulls when it has null-tolerant child expressions.
For example, consider `cast(coalesce(a#5, a#15) as double)`. Although `cast` is a null-intolerant expression, `coalesce` is null-tolerant, so the expression as a whole can swallow nulls.
When the nullability is wrong, we could generate incorrect results in different cases. For example,
``` Scala
val df1 = Seq((1, 2), (2, 3)).toDF("a", "b")
val df2 = Seq((2, 5), (3, 4)).toDF("a", "c")
val joinedDf = df1.join(df2, Seq("a"), "outer").na.fill(0)
val df3 = Seq((3, 1)).toDF("a", "d")
joinedDf.join(df3, "a").show
```
The optimized plan is like
```
Project [a#29, b#30, c#31, d#42]
+- Join Inner, (a#29 = a#41)
:- Project [cast(coalesce(cast(coalesce(a#5, a#15) as double), 0.0) as int) AS a#29, cast(coalesce(cast(b#6 as double), 0.0) as int) AS b#30, cast(coalesce(cast(c#16 as double), 0.0) as int) AS c#31]
: +- Filter isnotnull(cast(coalesce(cast(coalesce(a#5, a#15) as double), 0.0) as int))
: +- Join FullOuter, (a#5 = a#15)
: :- LocalRelation [a#5, b#6]
: +- LocalRelation [a#15, c#16]
+- LocalRelation [a#41, d#42]
```
Without the fix, it returns an empty result. With the fix, it can return a correct answer:
```
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| 3| 0| 4| 1|
+---+---+---+---+
```
## How was this patch tested?
Added test cases to verify the nullability changes in FilterExec. Also added a test case for verifying the reported incorrect result.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#15523 from gatorsmile/nullabilityFilterExec.
## What changes were proposed in this pull request?
This patch moves the new commit protocol API from sql/core to core module, so we can use it in the future in the RDD API.
As part of this patch, I also moved the specification of the random uuid for the write path out of the commit protocol, and instead pass in a job id.
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#15731 from rxin/SPARK-18219.
## What changes were proposed in this pull request?
In Spark 1.6 and earlier, we can drop the database we are using. In Spark 2.0, the native implementation prevents us from dropping the current database, which may break some old queries. This PR re-enables the feature.
## How was this patch tested?
one new unit test in `SessionCatalogSuite`.
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#15011 from adrian-wang/dropcurrent.
## What changes were proposed in this pull request?
This PR proposes to change the documentation for functions. Please refer the discussion from https://github.com/apache/spark/pull/15513
The changes include
- Re-indent the documentation
- Add examples/arguments in `extended` where the arguments are multiple or take a specific format (e.g. XML/JSON).
For examples, the documentation was updated as below:
### Functions with single line usage
**Before**
- `pow`
``` sql
Usage: pow(x1, x2) - Raise x1 to the power of x2.
Extended Usage:
> SELECT pow(2, 3);
8.0
```
- `current_timestamp`
``` sql
Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation.
Extended Usage:
No example for current_timestamp.
```
**After**
- `pow`
``` sql
Usage: pow(expr1, expr2) - Raises `expr1` to the power of `expr2`.
Extended Usage:
Examples:
> SELECT pow(2, 3);
8.0
```
- `current_timestamp`
``` sql
Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation.
Extended Usage:
No example/argument for current_timestamp.
```
### Functions with (already) multiple line usage
**Before**
- `approx_count_distinct`
``` sql
Usage: approx_count_distinct(expr) - Returns the estimated cardinality by HyperLogLog++.
approx_count_distinct(expr, relativeSD=0.05) - Returns the estimated cardinality by HyperLogLog++
with relativeSD, the maximum estimation error allowed.
Extended Usage:
No example for approx_count_distinct.
```
- `percentile_approx`
``` sql
Usage:
percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
column `col` at the given percentage. The value of percentage must be between 0.0
and 1.0. The `accuracy` parameter (default: 10000) is a positive integer literal which
controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields
better accuracy, `1.0/accuracy` is the relative error of the approximation.
percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy]) - Returns the approximate
percentile array of column `col` at the given percentage array. Each value of the
percentage array must be between 0.0 and 1.0. The `accuracy` parameter (default: 10000) is
a positive integer literal which controls approximation accuracy at the cost of memory.
Higher value of `accuracy` yields better accuracy, `1.0/accuracy` is the relative error of
the approximation.
Extended Usage:
No example for percentile_approx.
```
**After**
- `approx_count_distinct`
``` sql
Usage:
approx_count_distinct(expr[, relativeSD]) - Returns the estimated cardinality by HyperLogLog++.
`relativeSD` defines the maximum estimation error allowed.
Extended Usage:
No example/argument for approx_count_distinct.
```
- `percentile_approx`
``` sql
Usage:
percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
column `col` at the given percentage. The value of percentage must be between 0.0
and 1.0. The `accuracy` parameter (default: 10000) is a positive numeric literal which
controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields
better accuracy, `1.0/accuracy` is the relative error of the approximation.
When `percentage` is an array, each value of the percentage array must be between 0.0 and 1.0.
In this case, returns the approximate percentile array of column `col` at the given
percentage array.
Extended Usage:
Examples:
> SELECT percentile_approx(10.0, array(0.5, 0.4, 0.1), 100);
[10.0,10.0,10.0]
> SELECT percentile_approx(10.0, 0.5, 100);
10.0
```
## How was this patch tested?
Manually tested
**When examples are multiple**
``` sql
spark-sql> describe function extended reflect;
Function: reflect
Class: org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection
Usage: reflect(class, method[, arg1[, arg2 ..]]) - Calls a method with reflection.
Extended Usage:
Examples:
> SELECT reflect('java.util.UUID', 'randomUUID');
c33fb387-8500-4bfa-81d2-6e0e3e930df2
> SELECT reflect('java.util.UUID', 'fromString', 'a5cf6c42-0c85-418f-af6c-3e4e5b1328f2');
a5cf6c42-0c85-418f-af6c-3e4e5b1328f2
```
**When `Usage` is in single line**
``` sql
spark-sql> describe function extended min;
Function: min
Class: org.apache.spark.sql.catalyst.expressions.aggregate.Min
Usage: min(expr) - Returns the minimum value of `expr`.
Extended Usage:
No example/argument for min.
```
**When `Usage` is already in multiple lines**
``` sql
spark-sql> describe function extended percentile_approx;
Function: percentile_approx
Class: org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
Usage:
percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
column `col` at the given percentage. The value of percentage must be between 0.0
and 1.0. The `accuracy` parameter (default: 10000) is a positive numeric literal which
controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields
better accuracy, `1.0/accuracy` is the relative error of the approximation.
When `percentage` is an array, each value of the percentage array must be between 0.0 and 1.0.
In this case, returns the approximate percentile array of column `col` at the given
percentage array.
Extended Usage:
Examples:
> SELECT percentile_approx(10.0, array(0.5, 0.4, 0.1), 100);
[10.0,10.0,10.0]
> SELECT percentile_approx(10.0, 0.5, 100);
10.0
```
**When example/argument is missing**
``` sql
spark-sql> describe function extended rank;
Function: rank
Class: org.apache.spark.sql.catalyst.expressions.Rank
Usage:
rank() - Computes the rank of a value in a group of values. The result is one plus the number
of rows preceding or equal to the current row in the ordering of the partition. The values
will produce gaps in the sequence.
Extended Usage:
No example/argument for rank.
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15677 from HyukjinKwon/SPARK-17963-1.
## What changes were proposed in this pull request?
Due to a limitation of the hive metastore (a table location must be a directory path, not a file path), we always store the `path` for a data source table in storage properties, instead of in the `locationUri` field. However, we should not expose this difference at the `CatalogTable` level, but just treat it as a hack in `HiveExternalCatalog`, like how we store the table schema of a data source table in table properties.
This PR unifies `path` and `locationUri` outside of `HiveExternalCatalog`, both data source table and hive serde table should use the `locationUri` field.
This PR also unifies the way we handle default table location for managed table. Previously, the default table location of hive serde managed table is set by external catalog, but the one of data source table is set by command. After this PR, we follow the hive way and the default table location is always set by external catalog.
For managed non-file-based tables, we will assign a default table location and create an empty directory for it, the table location will be removed when the table is dropped. This is reasonable as metastore doesn't care about whether a table is file-based or not, and an empty table directory has no harm.
For external non-file-based tables, ideally we can omit the table location, but due to a hive metastore issue, we will assign a random location to it, and remove it right after the table is created. See SPARK-15269 for more details. This is fine as it's well isolated in `HiveExternalCatalog`.
To keep the existing behaviour of the `path` option, in this PR we always add the `locationUri` to storage properties using key `path`, before passing storage properties to `DataSource` as data source options.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#15024 from cloud-fan/path.
## What changes were proposed in this pull request?
RuntimeReplaceable is used to create aliases for expressions, but the way it deals with type coercion is pretty weird (each expression is responsible for how to handle type coercion, which does not obey the normal implicit type cast rules).
This patch simplifies its handling by allowing the analyzer to traverse into the actual expression of a RuntimeReplaceable.
## How was this patch tested?
- Correctness should be guaranteed by existing unit tests already
- Removed SQLCompatibilityFunctionSuite and moved its tests to sql-compatibility-functions.sql
- Added a new test case in sql-compatibility-functions.sql for verifying explain behavior.
Author: Reynold Xin <rxin@databricks.com>
Closes#15723 from rxin/SPARK-18214.
## What changes were proposed in this pull request?
When a user appended a column using a "nondeterministic" function to a DataFrame, e.g., `rand`, `randn`, and `monotonically_increasing_id`, the expected semantic is the following:
- The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations.
However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values from nondeterministic columns might change if we call `union` or `coalesce` after. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column.
See the unit tests below or JIRA for examples.
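As a hedged illustration of the expected semantics (this snippet is only illustrative, not taken from the PR's tests):
```scala
import org.apache.spark.sql.functions.rand

// A column defined via a nondeterministic function should behave as if it
// were materialized right away: its per-row values should not change just
// because the DataFrame is later unioned or coalesced.
val df = spark.range(10).withColumn("r", rand(42))
val combined = df.union(df).coalesce(1)
// With the fix, the `r` values contributed by `df` should stay consistent
// with evaluating `df` on its own.
```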
This PR uses the partition index from `RDD.mapPartitionWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and initialized right after object creation in `mapPartitionWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function for proper initialization.
## How was this patch tested?
Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...)
cc: rxin davies
Author: Xiangrui Meng <meng@databricks.com>
Closes#15567 from mengxr/SPARK-14393.
## What changes were proposed in this pull request?
Simplify struct creation, especially the aspect of `CleanupAliases` which missed some aliases when handling trees created by `CreateStruct`.
This PR includes:
1. A failing test (create struct with nested aliases, some of the aliases survive `CleanupAliases`).
2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees.
3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from unresolved `NamedExpression`.
4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved.
5. `CleanupAliases` code was simplified as it no longer has to deal with `CreateStruct`'s top level columns.
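A small, hedged illustration of the effect (assuming a SparkSession `spark` and its implicits are in scope):
```scala
import spark.implicits._
import org.apache.spark.sql.functions.struct

// After this change, the analyzed expression tree for struct() no longer
// contains a CreateStruct node; it resolves into a CreateNamedStruct
// (roughly: named_struct('a', a, 'b', b)) once the child columns resolve.
val df = Seq((1, "x")).toDF("a", "b")
val analyzed = df.select(struct($"a", $"b").as("s")).queryExecution.analyzed
```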
## How was this patch tested?
Ran all test suites in package org.apache.spark.sql, especially the analysis suite, making sure the added test initially fails and that, after applying the suggested fix, the entire analysis package passes.
Modified a few tests that expected `CreateStruct`, which is now transformed into `CreateNamedStruct`.
Author: eyal farago <eyal farago>
Author: Herman van Hovell <hvanhovell@databricks.com>
Author: eyal farago <eyal.farago@gmail.com>
Author: Eyal Farago <eyal.farago@actimize.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Author: eyalfa <eyal.farago@gmail.com>
Closes#15718 from hvanhovell/SPARK-16839-2.
## What changes were proposed in this pull request?
Fix `Locale.US` for all usages of `DateFormat`, `NumberFormat`
## How was this patch tested?
Existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#15610 from srowen/SPARK-18076.
## What changes were proposed in this pull request?
This PR fixes a bug where the QueryStartedEvent is not logged.
The postToAll() call in the original code actually invokes StreamingQueryListenerBus.postToAll(), which has no listeners at all. We should post via sparkListenerBus.postToAll(s) as well as this.postToAll() to trigger the local listeners in addition to the listeners registered in LiveListenerBus.
zsxwing
## How was this patch tested?
The following snapshot shows that QueryStartedEvent has been logged correctly
![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png)
Author: CodingCat <zhunansjtu@gmail.com>
Closes#15675 from CodingCat/SPARK-18144.
## What changes were proposed in this pull request?
This patch adds support for all file formats in structured streaming sinks. This is actually a very small change thanks to all the previous refactoring done using the new internal commit protocol API.
## How was this patch tested?
Updated FileStreamSinkSuite to add test cases for json, text, and parquet.
Author: Reynold Xin <rxin@databricks.com>
Closes#15711 from rxin/SPARK-18192.
## What changes were proposed in this pull request?
There are a couple of issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive.
(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.
This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables or when `manageFilesourcePartitions = false` is unchanged.
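For reference, statements of the following shape are the ones affected by issues (1) and (2) above (table and partition names are made up for illustration):
```scala
// (1) Should overwrite only the specified partition, not the whole table.
spark.sql("INSERT OVERWRITE TABLE logs PARTITION (ds = '2016-11-01') SELECT * FROM staging")
// (2) Should also work when the target partition has a custom location.
spark.sql("INSERT INTO TABLE logs PARTITION (ds = '2016-11-01') SELECT * FROM staging")
```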
There is one other issue in that INSERT OVERWRITE with dynamic partitions will overwrite the entire table instead of just the updated partitions, but this behavior is pretty complicated to implement for Datasource tables. We should address that in a future release.
## How was this patch tested?
Unit tests.
Author: Eric Liang <ekl@databricks.com>
Closes#15705 from ericl/sc-4942.
## What changes were proposed in this pull request?
When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects the use of a filesystem that doesn't use CRC files and removes the CRC files.
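A minimal sketch of the kind of cleanup involved (an assumption-laden illustration, not the actual HDFSMetadataLog code):
```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// On filesystems where Hadoop's checksum layer writes hidden ".<name>.crc"
// files next to each batch file, delete the stray CRC file after the batch
// file has been written.
def deleteCrcIfExists(fs: FileSystem, batchFile: Path): Unit = {
  val crcFile = new Path(batchFile.getParent, s".${batchFile.getName}.crc")
  if (fs.exists(crcFile)) {
    fs.delete(crcFile, false)
  }
}
```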
## How was this patch tested?
Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem. Ran the entire regression suite.
Author: frreiss <frreiss@us.ibm.com>
Closes#15027 from frreiss/fred-17475.
## What changes were proposed in this pull request?
Column.expr is private[sql], but it's actually a really useful field to have for debugging. We should open it up, similar to how we expose QueryExecution.
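For example, once it is public, the underlying Catalyst expression of a Column can be inspected directly (an illustrative snippet, assuming a SparkSession `spark`):
```scala
val df = spark.range(5).toDF("value")
val c = df("value") + 1
println(c.expr)  // the underlying Catalyst expression tree, handy for debugging
```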
## How was this patch tested?
N/A - this is a simple visibility change.
Author: Reynold Xin <rxin@databricks.com>
Closes#15724 from rxin/SPARK-18216.
## What changes were proposed in this pull request?
This patch adds a new commit protocol implementation ManifestFileCommitProtocol that follows the existing streaming flow, and uses it in FileStreamSink to consolidate the write path in structured streaming with the batch mode write path.
This deletes a lot of code, and would make it trivial to support other functionalities that are currently available in batch but not in streaming, including all file formats and bucketing.
## How was this patch tested?
Should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes#15710 from rxin/SPARK-18025.
## What changes were proposed in this pull request?
This PR proposes to add `to_json` function in contrast with `from_json` in Scala, Java and Python.
It'd be useful if we could convert the same column to/from JSON. Also, some data sources do not support nested types; if we are forced to save a dataframe into those data sources, we might be able to work around that with this function.
The usage is as below:
``` scala
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
df.select(to_json($"a").as("json")).show()
```
``` bash
+--------+
| json|
+--------+
|{"_1":1}|
+--------+
```
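Continuing the snippet above, a hedged sketch of a round trip through JSON (the schema is spelled out manually here for illustration):
```scala
import org.apache.spark.sql.functions.{from_json, to_json}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Convert the struct column to JSON and back again.
val json = df.select(to_json($"a").as("json"))
val back = json.select(from_json($"json", new StructType().add("_1", IntegerType)).as("a"))
```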
## How was this patch tested?
Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15354 from HyukjinKwon/SPARK-17764.
## What changes were proposed in this pull request?
Aggregation without window/group-by expressions fails in `checkAnalysis`, and the error message is a bit misleading; we should generate a more specific error message for this case.
For example,
```
spark.read.load("/some-data")
  .withColumn("date_dt", to_date($"date"))
  .withColumn("year", year($"date_dt"))
  .withColumn("week", weekofyear($"date_dt"))
  .withColumn("user_count", count($"userId"))
  .withColumn("daily_max_in_week", max($"user_count").over(weeklyWindow))
```
creates the following output:
```
org.apache.spark.sql.AnalysisException: expression '`randomColumn`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
```
In the error message above, `randomColumn` doesn't appear in the query (actually it's added by the `withColumn` call), so the message is not enough for the user to address the problem.
## How was this patch tested?
Manually test
Before:
```
scala> spark.sql("select col, count(col) from tbl")
org.apache.spark.sql.AnalysisException: expression 'tbl.`col`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
```
After:
```
scala> spark.sql("select col, count(col) from tbl")
org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and 'tbl.`col`' is not an aggregate function. Wrap '(count(col#231L) AS count(col)#239L)' in windowing function(s) or wrap 'tbl.`col`' in first() (or first_value) if you don't care which value you get.;;
```
Also add new test sqls in `group-by.sql`.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#15672 from jiangxb1987/groupBy-empty.
## What changes were proposed in this pull request?
As in [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), KeyValueGroupedDataset should mark the queryExecution as transient.
As mentioned in the Jira ticket, without transient we saw serialization issues like
```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```
## How was this patch tested?
Run the query which is specified in the Jira ticket before and after:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1, 2), (3, 4)))).as[(Int, Int)]
val grouped = a.groupByKey { x: (Int, Int) => x._1 }
val mappedGroups = grouped.mapGroups { (k, x) => (k, 1) }
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map { xx =>
  val simpley = yyy.value
  1
}
```
Author: Ergin Seyfe <eseyfe@fb.com>
Closes#15706 from seyfe/keyvaluegrouped_serialization.
## What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/15634.
## How was this patch tested?
N/A
Author: Liwei Lin <lwlin7@gmail.com>
Closes#15712 from lw-lin/18103.
## What changes were proposed in this pull request?
Simplify struct creation, especially the aspect of `CleanupAliases` which missed some aliases when handling trees created by `CreateStruct`.
This PR includes:
1. A failing test (create struct with nested aliases, some of the aliases survive `CleanupAliases`).
2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees.
3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from unresolved `NamedExpression`.
4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved.
5. `CleanupAliases` code was simplified as it no longer has to deal with `CreateStruct`'s top level columns.
## How was this patch tested?
Ran all test suites in package org.apache.spark.sql, especially the analysis suite, making sure the added test initially fails and that, after applying the suggested fix, the entire analysis package passes.
Modified a few tests that expected `CreateStruct`, which is now transformed into `CreateNamedStruct`.
Credit goes to hvanhovell for assisting with this PR.
Author: eyal farago <eyal farago>
Author: eyal farago <eyal.farago@gmail.com>
Author: Herman van Hovell <hvanhovell@databricks.com>
Author: Eyal Farago <eyal.farago@actimize.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Author: eyalfa <eyal.farago@gmail.com>
Closes#14444 from eyalfa/SPARK-16839_redundant_aliases_after_cleanupAliases.
## What changes were proposed in this pull request?
Currently an unqualified `getFunction(..)` call returns a wrong result: the returned function is shown as a temporary function without a database. For example:
```
scala> sql("create function fn1 as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs'")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.catalog.getFunction("fn1")
res1: org.apache.spark.sql.catalog.Function = Function[name='fn1', className='org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs', isTemporary='true']
```
This PR fixes this by adding database information to ExpressionInfo (which is used to store the function information).
## How was this patch tested?
Added more thorough tests to `CatalogSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#15542 from hvanhovell/SPARK-17996.
## What changes were proposed in this pull request?
When multiple records have the minimum value, the answer of ApproximatePercentile is wrong.
## How was this patch tested?
add a test case
Author: wangzhenhua <wangzhenhua@huawei.com>
Closes#15641 from wzhfy/percentile.
## What changes were proposed in this pull request?
This patch introduces an internal commit protocol API that is used by the batch data source to do write commits. It currently has only one implementation that uses Hadoop MapReduce's OutputCommitter API. In the future, this commit API can be used to unify streaming and batch commits.
## How was this patch tested?
Should be covered by existing write tests.
Author: Reynold Xin <rxin@databricks.com>
Author: Eric Liang <ekl@databricks.com>
Closes#15707 from rxin/SPARK-18024-2.
## What changes were proposed in this pull request?
When inserting into datasource tables with partitions managed by the hive metastore, we need to notify the metastore of newly added partitions. Previously this was implemented via `msck repair table`, but this is more expensive than needed.
This optimizes the insertion path to add only the updated partitions.
## How was this patch tested?
Existing tests (I verified manually that tests fail if the repair operation is omitted).
Author: Eric Liang <ekl@databricks.com>
Closes#15633 from ericl/spark-18087.
## What changes were proposed in this pull request?
The test `when schema inference is turned on, should read partition data` should not delete files because the source may be listing files. This PR just removes the delete actions since they are not necessary.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15699 from zsxwing/SPARK-18030.
## What changes were proposed in this pull request?
### Problem
Iterative ML code may easily create query plans that grow exponentially. We found that query planning time also increases exponentially even when all the sub-plan trees are cached.
The following snippet illustrates the problem:
``` scala
(0 until 6).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
println(s"== Iteration $iteration ==")
val time0 = System.currentTimeMillis()
val joined = plan.join(plan, "value").join(plan, "value").join(plan, "value").join(plan, "value")
joined.cache()
println(s"Query planning takes ${System.currentTimeMillis() - time0} ms")
joined.as[Int]
}
// == Iteration 0 ==
// Query planning takes 9 ms
// == Iteration 1 ==
// Query planning takes 26 ms
// == Iteration 2 ==
// Query planning takes 53 ms
// == Iteration 3 ==
// Query planning takes 163 ms
// == Iteration 4 ==
// Query planning takes 700 ms
// == Iteration 5 ==
// Query planning takes 3418 ms
```
This is because when building a new Dataset, the new plan is always built upon `QueryExecution.analyzed`, which doesn't leverage existing cached plans.
On the other hand, doing caching every few iterations may not be the right direction for this problem since caching is too memory consuming (imagine computing connected components over a graph with 50 billion nodes). What we really need here is to truncate both the query plan (to minimize query planning time) and the lineage of the underlying RDD (to avoid stack overflow).
### Changes introduced in this PR
This PR tries to fix this issue by introducing a `checkpoint()` method into `Dataset[T]`, which does exactly the things described above. The micro benchmark below, which is essentially the same as the snippet above but invokes `checkpoint()` instead of `cache()`, shows the effect of this PR.
One key point is that the checkpointed Dataset should preserve the original partitioning and ordering information of the original Dataset, so that we can avoid unnecessary shuffling (similar to reading from a pre-bucketed table). This is done by adding `outputPartitioning` and `outputOrdering` to `LogicalRDD` and `RDDScanExec`.
### Micro benchmark
``` scala
spark.sparkContext.setCheckpointDir("/tmp/cp")
(0 until 100).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
println(s"== Iteration $iteration ==")
val time0 = System.currentTimeMillis()
val cp = plan.checkpoint()
cp.count()
System.out.println(s"Checkpointing takes ${System.currentTimeMillis() - time0} ms")
val time1 = System.currentTimeMillis()
val joined = cp.join(cp, "value").join(cp, "value").join(cp, "value").join(cp, "value")
val result = joined.as[Int]
println(s"Query planning takes ${System.currentTimeMillis() - time1} ms")
result
}
// == Iteration 0 ==
// Checkpointing takes 591 ms
// Query planning takes 13 ms
// == Iteration 1 ==
// Checkpointing takes 1605 ms
// Query planning takes 16 ms
// == Iteration 2 ==
// Checkpointing takes 782 ms
// Query planning takes 8 ms
// == Iteration 3 ==
// Checkpointing takes 729 ms
// Query planning takes 10 ms
// == Iteration 4 ==
// Checkpointing takes 734 ms
// Query planning takes 9 ms
// == Iteration 5 ==
// ...
// == Iteration 50 ==
// Checkpointing takes 571 ms
// Query planning takes 7 ms
// == Iteration 51 ==
// Checkpointing takes 548 ms
// Query planning takes 7 ms
// == Iteration 52 ==
// Checkpointing takes 596 ms
// Query planning takes 8 ms
// == Iteration 53 ==
// Checkpointing takes 568 ms
// Query planning takes 7 ms
// ...
```
You may see that although checkpointing is a more heavyweight operation, it always takes roughly the same amount of time to perform both checkpointing and query planning across iterations.
### Open question
mengxr mentioned that it would be more convenient if we could make `Dataset.checkpoint()` eager, i.e., always perform an `RDD.count()` after calling `RDD.checkpoint()`. I'm not quite sure whether this is a universal requirement; maybe we can add an `eager: Boolean` argument for `Dataset.checkpoint()` to support that.
## How was this patch tested?
Unit test added in `DatasetSuite`.
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#15651 from liancheng/ds-checkpoint.
## What changes were proposed in this pull request?
Because of the refactoring work in Structured Streaming, the event logs generated by Structured Streaming in Spark 2.0.0 and 2.0.1 cannot be parsed.
This PR just ignores these logs in ReplayListenerBus because no places use them.
## How was this patch tested?
- Generated events logs using Spark 2.0.0 and 2.0.1, and saved them as `structured-streaming-query-event-logs-2.0.0.txt` and `structured-streaming-query-event-logs-2.0.1.txt`
- The new added test makes sure ReplayListenerBus will skip these bad jsons.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15663 from zsxwing/fix-event-log.
## What changes were proposed in this pull request?
Currently, the `ANALYZE TABLE` command accepts any `identifier` where the `NOSCAN` option is expected. This PR raises a ParseException for unknown options.
**Before**
```scala
scala> sql("create table test(a int)")
res0: org.apache.spark.sql.DataFrame = []
scala> sql("analyze table test compute statistics blah")
res1: org.apache.spark.sql.DataFrame = []
```
**After**
```scala
scala> sql("create table test(a int)")
res0: org.apache.spark.sql.DataFrame = []
scala> sql("analyze table test compute statistics blah")
org.apache.spark.sql.catalyst.parser.ParseException:
Expected `NOSCAN` instead of `blah`(line 1, pos 0)
```
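The forms that remain accepted are, for example:
```scala
// Illustrative: the option, when present, must be exactly NOSCAN.
sql("ANALYZE TABLE test COMPUTE STATISTICS")
sql("ANALYZE TABLE test COMPUTE STATISTICS NOSCAN")
```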
## How was this patch tested?
Pass the Jenkins test with a new test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#15640 from dongjoon-hyun/SPARK-18106.
## What changes were proposed in this pull request?
To reduce the number of components in SQL named *Catalog, rename *FileCatalog to *FileIndex. A FileIndex is responsible for returning the list of partitions / files to scan given a filtering expression.
```
TableFileCatalog => CatalogFileIndex
FileCatalog => FileIndex
ListingFileCatalog => InMemoryFileIndex
MetadataLogFileCatalog => MetadataLogFileIndex
PrunedTableFileCatalog => PrunedInMemoryFileIndex
```
cc yhuai marmbrus
## How was this patch tested?
N/A
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>
Closes#15634 from ericl/rename-file-provider.
## What changes were proposed in this pull request?
The behavior of union is not well defined here. It is safer to explicitly execute these commands in order. The other use of `Union` in this way will be removed by https://github.com/apache/spark/pull/15633
## How was this patch tested?
Existing tests.
cc yhuai cloud-fan
Author: Eric Liang <ekhliang@gmail.com>
Author: Eric Liang <ekl@databricks.com>
Closes#15665 from ericl/spark-18146.
## What changes were proposed in this pull request?
Fixed the issue that ForeachSink didn't rethrow the exception.
## How was this patch tested?
The fixed unit test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15674 from zsxwing/foreach-sink-error.
## What changes were proposed in this pull request?
We should follow hive table and also store partition spec in metastore for data source table.
This brings 2 benefits:
1. It's more flexible to manage the table data files, as users can use `ADD PARTITION`, `DROP PARTITION` and `RENAME PARTITION`
2. We don't need to cache all file status for data source table anymore.
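For example, partition management DDL of the following shape should now apply to data source tables as well (the table and partition values are illustrative):
```scala
spark.sql("ALTER TABLE logs ADD PARTITION (ds = '2016-11-01')")
spark.sql("ALTER TABLE logs PARTITION (ds = '2016-11-01') RENAME TO PARTITION (ds = '2016-11-02')")
spark.sql("ALTER TABLE logs DROP PARTITION (ds = '2016-11-02')")
```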
## How was this patch tested?
existing tests.
Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#15515 from cloud-fan/partition.
## What changes were proposed in this pull request?
A follow-up PR for #14553 to fix the flaky test. It's flaky because the file listing API doesn't guarantee any order in the returned list.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15661 from zsxwing/fix-StreamingQuerySuite.
## What changes were proposed in this pull request?
This PR is an enhancement of the PR with commit ID 57dc326bd00cf0a49da971e9c573c48ae28acaa2.
NaN is a special value that is commonly treated as invalid, but there are certain cases where NaN values are meaningful and thus need special handling. Users now have three options when dealing with NaN values: reserve an extra bucket for NaN values, remove the NaN values, or report an error, by setting handleNaN to "keep", "skip", or "error" (default), respectively.
**Before:**
```scala
val bucketizer: Bucketizer = new Bucketizer()
  .setInputCol("feature")
  .setOutputCol("result")
  .setSplits(splits)
```
**After:**
```scala
val bucketizer: Bucketizer = new Bucketizer()
  .setInputCol("feature")
  .setOutputCol("result")
  .setSplits(splits)
  .setHandleNaN("keep")
```
## How was this patch tested?
Tests added in QuantileDiscretizerSuite, BucketizerSuite and DataFrameStatSuite
Signed-off-by: VinceShieh <vincent.xie@intel.com>
Author: VinceShieh <vincent.xie@intel.com>
Author: Vincent Xie <vincent.xie@intel.com>
Author: Joseph K. Bradley <joseph@databricks.com>
Closes#15428 from VinceShieh/spark-17219_followup.
## What changes were proposed in this pull request?
API and programming guide doc changes for Scala, Python and R.
## How was this patch tested?
manual test
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes#15629 from felixcheung/jsondoc.
## What changes were proposed in this pull request?
A short code snippet that uses toLocalIterator() on a DataFrame produced by a RunnableCommand reproduces the problem. toLocalIterator() is called by the thrift server when `spark.sql.thriftServer.incrementalCollect` is set, to handle queries producing large result sets.
**Before**
```SQL
scala> spark.sql("show databases")
res0: org.apache.spark.sql.DataFrame = [databaseName: string]
scala> res0.toLocalIterator()
16/10/26 03:00:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
```
**After**
```SQL
scala> spark.sql("drop database databases")
res30: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show databases")
res31: org.apache.spark.sql.DataFrame = [databaseName: string]
scala> res31.toLocalIterator().asScala foreach println
[default]
[parquet]
```
## How was this patch tested?
Added a test in DDLSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#15642 from dilipbiswal/SPARK-18009.
## What changes were proposed in this pull request?
This PR contains changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Summary of changes:
* Added a method `commit(end: Offset)` that tells the Source that is OK to discard all offsets up `end`, inclusive.
* Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream"; as opposed to "all data present in the Source's buffer".
* Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`.
* Added a `lastCommittedOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code.
* The scheduler in `StreamExecution.scala` now calls `commit` on its stream sources after marking each batch as complete in its checkpoint.
* `MemoryStream` now cleans committed batches out of its internal buffer.
* `TextSocketSource` now cleans committed batches from its internal buffer.
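A minimal, heavily simplified sketch of how a buffering source might honor `commit` (offsets are modeled as plain `Long`s here; this is not the actual MemoryStream/TextSocketSource code):
```scala
// Toy illustration only: keep (offset, rows) pairs in memory and drop
// everything up to the committed offset, inclusive.
class BufferingSource {
  private var buffer = Vector.empty[(Long, Seq[String])]

  def append(offset: Long, rows: Seq[String]): Unit = synchronized {
    buffer :+= (offset -> rows)
  }

  def commit(end: Long): Unit = synchronized {
    buffer = buffer.filter { case (offset, _) => offset > end }
  }
}
```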
## How was this patch tested?
Existing regression tests already exercise the new code.
Author: frreiss <frreiss@us.ibm.com>
Closes#14553 from frreiss/fred-16963.
## What changes were proposed in this pull request?
Currently we have several test cases for group analytics (ROLLUP/CUBE/GROUPING SETS) in `SQLQuerySuite`; we should move them into a query test file.
The following test cases are moved to `group-analytics.sql`:
```
test("rollup")
test("grouping sets when aggregate functions containing groupBy columns")
test("cube")
test("grouping sets")
test("grouping and grouping_id")
test("grouping and grouping_id in having")
test("grouping and grouping_id in sort")
```
This is followup work of #15582
## How was this patch tested?
Modified query file `group-analytics.sql`, which will be tested by `SQLQueryTestSuite`.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes#15624 from jiangxb1987/group-analytics-test.
## What changes were proposed in this pull request?
Calling `Await.result` will allow other tasks to be run on the same thread when using ForkJoinPool. However, SQL uses a `ThreadLocal` execution id to trace Spark jobs launched by a query, which doesn't work perfectly in ForkJoinPool.
This PR just uses `Awaitable.result` instead to prevent ForkJoinPool from running other tasks in the current waiting thread.
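Roughly, the difference looks like the following sketch of the waiting helper (a simplified illustration, not necessarily the exact Spark code):
```scala
import scala.concurrent.{Awaitable, CanAwait}
import scala.concurrent.duration.Duration

object ThreadUtilsSketch {
  // Calling Awaitable.result directly bypasses Await's managed-blocking hook,
  // so a ForkJoinPool worker thread will not pick up other tasks while it waits.
  private val awaitPermission = null.asInstanceOf[CanAwait]

  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T =
    awaitable.result(atMost)(awaitPermission)
}
```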
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#15520 from zsxwing/SPARK-13747.