## What changes were proposed in this pull request?
[SPARK-21595](https://issues.apache.org/jira/browse/SPARK-21595) reported that there is excessive spilling to disk due to default spill threshold for `ExternalAppendOnlyUnsafeRowArray` being quite small for WINDOW operator. Old behaviour of WINDOW operator (pre https://github.com/apache/spark/pull/16909) would hold data in an array for first 4096 records post which it would switch to `UnsafeExternalSorter` and start spilling to disk after reaching `spark.shuffle.spill.numElementsForceSpillThreshold` (or earlier if there was paucity of memory due to excessive consumers).
Currently the (switch from in-memory to `UnsafeExternalSorter`) and (`UnsafeExternalSorter` spilling to disk) for `ExternalAppendOnlyUnsafeRowArray` is controlled by a single threshold. This PR aims to separate that to have more granular control.
## How was this patch tested?
Added unit tests
Author: Tejas Patil <tejasp@fb.com>
Closes#18843 from tejasapatil/SPARK-21595.
Add an option to the JDBC data source to initialize the environment of the remote database session
## What changes were proposed in this pull request?
This proposes an option to the JDBC datasource, tentatively called " sessionInitStatement" to implement the functionality of session initialization present for example in the Sqoop connector for Oracle (see https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_oraoop_oracle_session_initialization_statements ) . After each database session is opened to the remote DB, and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block in the case of Oracle).
See also https://issues.apache.org/jira/browse/SPARK-21519
## How was this patch tested?
Manually tested using Spark SQL data source and Oracle JDBC
Author: LucaCanali <luca.canali@cern.ch>
Closes#18724 from LucaCanali/JDBC_datasource_sessionInitStatement.
## What changes were proposed in this pull request?
This patch introduces an internal interface for tracking metrics and/or statistics on data on the fly, as it is being written to disk during a `FileFormatWriter` job and partially reimplements SPARK-20703 in terms of it.
The interface basically consists of 3 traits:
- `WriteTaskStats`: just a tag for classes that represent statistics collected during a `WriteTask`
The only constraint it adds is that the class should be `Serializable`, as instances of it will be collected on the driver from all executors at the end of the `WriteJob`.
- `WriteTaskStatsTracker`: a trait for classes that can actually compute statistics based on tuples that are processed by a given `WriteTask` and eventually produce a `WriteTaskStats` instance.
- `WriteJobStatsTracker`: a trait for classes that act as containers of `Serializable` state that's necessary for instantiating `WriteTaskStatsTracker` on executors and finally process the resulting collection of `WriteTaskStats`, once they're gathered back on the driver.
Potential future use of this interface is e.g. CBO stats maintenance during `INSERT INTO table ... ` operations.
## How was this patch tested?
Existing tests for SPARK-20703 exercise the new code: `hive/SQLMetricsSuite`, `sql/JavaDataFrameReaderWriterSuite`, etc.
Author: Adrian Ionescu <adrian@databricks.com>
Closes#18884 from adrian-ionescu/write-stats-tracker-api.
## What changes were proposed in this pull request?
Currently `df.na.replace("*", Map[String, String]("NULL" -> null))` will produce exception.
This PR enables passing null/None as value in the replacement map in DataFrame.replace().
Note that the replacement map keys and values should still be the same type, while the values can have a mix of null/None and that type.
This PR enables following operations for example:
`df.na.replace("*", Map[String, String]("NULL" -> null))`(scala)
`df.na.replace("*", Map[Any, Any](60 -> null, 70 -> 80))`(scala)
`df.na.replace('Alice', None)`(python)
`df.na.replace([10, 20])`(python, replacing with None is by default)
One use case could be: I want to replace all the empty strings with null/None because they were incorrectly generated and then drop all null/None data
`df.na.replace("*", Map("" -> null)).na.drop()`(scala)
`df.replace(u'', None).dropna()`(python)
## How was this patch tested?
Scala unit test.
Python doctest and unit test.
Author: bravo-zhang <mzhang1230@gmail.com>
Closes#18820 from bravo-zhang/spark-14932.
## What changes were proposed in this pull request?
This PR is to add the spark version info in the table metadata. When creating the table, this value is assigned. It can help users find which version of Spark was used to create the table.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#18709 from gatorsmile/addVersion.
## What changes were proposed in this pull request?
Window rangeBetween() API should allow literal boundary, that means, the window range frame can calculate frame of double/date/timestamp.
Example of the use case can be:
```
SELECT
val_timestamp,
cate,
avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING)
FROM testData
```
This PR refactors the Window `rangeBetween` and `rowsBetween` API, while the legacy user code should still be valid.
## How was this patch tested?
Add new test cases both in `DataFrameWindowFunctionsSuite` and in `window.sql`.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#18814 from jiangxb1987/literal-boundary.
## What changes were proposed in this pull request?
When I was investigating a flaky test, I realized that many places don't check the return value of `HDFSMetadataLog.get(batchId: Long): Option[T]`. When a batch is supposed to be there, the caller just ignores None rather than throwing an error. If some bug causes a query doesn't generate a batch metadata file, this behavior will hide it and allow the query continuing to run and finally delete metadata logs and make it hard to debug.
This PR ensures that places calling HDFSMetadataLog.get always check the return value.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#18799 from zsxwing/SPARK-21596.
## What changes were proposed in this pull request?
Taking over https://github.com/apache/spark/pull/18789 ; Closes#18789
Update Jackson to 2.6.7 uniformly, and some components to 2.6.7.1, to get some fixes and prep for Scala 2.12
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#18881 from srowen/SPARK-20433.
## What changes were proposed in this pull request?
If we create a type alias for a type workable with Dataset, the type alias doesn't work with Dataset.
A reproducible case looks like:
object C {
type TwoInt = (Int, Int)
def tupleTypeAlias: TwoInt = (1, 1)
}
Seq(1).toDS().map(_ => ("", C.tupleTypeAlias))
It throws an exception like:
type T1 is not a class
scala.ScalaReflectionException: type T1 is not a class
at scala.reflect.api.Symbols$SymbolApi$class.asClass(Symbols.scala:275)
...
This patch accesses the dealias of type in many places in `ScalaReflection` to fix it.
## How was this patch tested?
Added test case.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#18813 from viirya/SPARK-21567.
## What changes were proposed in this pull request?
This commit adds a new argument for IllegalArgumentException message. This recent commit added the argument:
[dcac1d57f0)
## How was this patch tested?
Unit test have been passed
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Marcos P. Sanchez <mpenate@stratio.com>
Closes#18862 from mpenate/feature/exception-errorifexists.
### What changes were proposed in this pull request?
```SQL
CREATE TABLE mytesttable1
USING org.apache.spark.sql.jdbc
OPTIONS (
url 'jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}',
dbtable 'mytesttable1',
paritionColumn 'state_id',
lowerBound '0',
upperBound '52',
numPartitions '53',
fetchSize '10000'
)
```
The above option name `paritionColumn` is wrong. That mean, users did not provide the value for `partitionColumn`. In such case, users hit a confusing error.
```
AssertionError: assertion failed
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:39)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:312)
```
### How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#18864 from gatorsmile/jdbcPartCol.
## What changes were proposed in this pull request?
Propagate metadata in attribute replacement during streaming execution. This is necessary for EventTimeWatermarks consuming replaced attributes.
## How was this patch tested?
new unit test, which was verified to fail before the fix
Author: Jose Torres <joseph-torres@databricks.com>
Closes#18840 from joseph-torres/SPARK-21565.
## What changes were proposed in this pull request?
Enhanced some existing documentation
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Mac <maclockard@gmail.com>
Closes#18710 from maclockard/maclockard-patch-1.
### What changes were proposed in this pull request?
author: BoleynSu
closes https://github.com/apache/spark/pull/18836
```Scala
val df = Seq((1, 1)).toDF("i", "j")
df.createOrReplaceTempView("T")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
sql("select * from (select a.i from T a cross join T t where t.i = a.i) as t1 " +
"cross join T t2 where t2.i = t1.i").explain(true)
}
```
The above code could cause the following exception:
```
SortMergeJoinExec should not take Cross as the JoinType
java.lang.IllegalArgumentException: SortMergeJoinExec should not take Cross as the JoinType
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputOrdering(SortMergeJoinExec.scala:100)
```
Our SortMergeJoinExec supports CROSS. We should not hit such an exception. This PR is to fix the issue.
### How was this patch tested?
Modified the two existing test cases.
Author: Xiao Li <gatorsmile@gmail.com>
Author: Boleyn Su <boleyn.su@gmail.com>
Closes#18863 from gatorsmile/pr-18836.
## What changes were proposed in this pull request?
**For moudle below:**
common/network-common
streaming
sql/core
sql/catalyst
**tests.jar will install or deploy twice.Like:**
`[DEBUG] Installing org.apache.spark:spark-streaming_2.11/maven-metadata.xml to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/maven-metadata-local.xml
[INFO] Installing /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar
[DEBUG] Skipped re-installing /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar, seems unchanged`
**The reason is below:**
`[DEBUG] (f) artifact = org.apache.spark:spark-streaming_2.11🫙2.1.0-mdh2.1.0.1-SNAPSHOT
[DEBUG] (f) attachedArtifacts = [org.apache.spark:spark-streaming_2.11:test-jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11🫙tests:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark
-streaming_2.11:java-source:sources:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:java-source:test-sources:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:javadoc:javadoc:2.1.0
-mdh2.1.0.1-SNAPSHOT]`
when executing 'mvn deploy' to nexus during release.I will fail since release nexus can not be overrided.
## How was this patch tested?
Execute 'mvn clean install -Pyarn -Phadoop-2.6 -Phadoop-provided -DskipTests'
Author: zhoukang <zhoukang199191@gmail.com>
Closes#18745 from caneGuy/zhoukang/fix-installtwice.
## What changes were proposed in this pull request?
This pr (follow-up of #18772) used `UnresolvedSubqueryColumnAliases` for `visitTableName` in `AstBuilder`, which is a new unresolved `LogicalPlan` implemented in #18185.
## How was this patch tested?
Existing tests
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#18857 from maropu/SPARK-20963-FOLLOWUP.
## What changes were proposed in this pull request?
Since Spark 2.0.0, SET hive config commands do not pass the values to HiveClient, this PR point out user to set hive config before SparkSession is initialized when they try to set hive config.
## How was this patch tested?
manual tests
<img width="1637" alt="spark-set" src="https://user-images.githubusercontent.com/5399861/29001141-03f943ee-7ab3-11e7-8584-ba5a5e81f6ad.png">
Author: Yuming Wang <wgyumg@gmail.com>
Closes#18769 from wangyum/SPARK-21574.
## What changes were proposed in this pull request?
In SQLContext.get(key,null) for a key that is not defined in the conf, and doesn't have a default value defined, throws a NPE. Int happens only when conf has a value converter
Added null check on defaultValue inside SQLConf.getConfString to avoid calling entry.valueConverter(defaultValue)
## How was this patch tested?
Added unit test
Author: vinodkc <vinod.kc.in@gmail.com>
Closes#18852 from vinodkc/br_Fix_SPARK-21588.
## What changes were proposed in this pull request?
This pr added parsing rules to support column aliases for join relations in FROM clause.
This pr is a sub-task of #18079.
## How was this patch tested?
Added tests in `AnalysisSuite`, `PlanParserSuite,` and `SQLQueryTestSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#18772 from maropu/SPARK-20963-2.
## What changes were proposed in this pull request?
This PR includes the changes to make the string "errorifexists" also valid for ErrorIfExists save mode.
## How was this patch tested?
Unit tests and manual tests
Author: arodriguez <arodriguez@arodriguez.stratio>
Closes#18844 from ardlema/SPARK-21640.
## What changes were proposed in this pull request?
This PR proposes to separate `extended` into `examples` and `arguments` internally so that both can be separately documented and add `since` and `note` for additional information.
For `since`, it looks users sometimes get confused by, up to my knowledge, missing version information. For example, see https://www.mail-archive.com/userspark.apache.org/msg64798.html
For few good examples to check the built documentation, please see both:
`from_json` - https://spark-test.github.io/sparksqldoc/#from_json
`like` - https://spark-test.github.io/sparksqldoc/#like
For `DESCRIBE FUNCTION`, `note` and `since` are added as below:
```
> DESCRIBE FUNCTION EXTENDED rlike;
...
Extended Usage:
Arguments:
...
Examples:
...
Note:
Use LIKE to match with simple string pattern
```
```
> DESCRIBE FUNCTION EXTENDED to_json;
...
Examples:
...
Since: 2.2.0
```
For the complete documentation, see https://spark-test.github.io/sparksqldoc/
## How was this patch tested?
Manual tests and existing tests. Please see https://spark-test.github.io/sparksqldoc
Jenkins tests are needed to double check
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18749 from HyukjinKwon/followup-sql-doc-gen.
## What changes were proposed in this pull request?
create temporary view data as select * from values
(1, 1),
(1, 2),
(2, 1),
(2, 2),
(3, 1),
(3, 2)
as data(a, b);
`select 3, 4, sum(b) from data group by 1, 2;`
`select 3 as c, 4 as d, sum(b) from data group by c, d;`
When running these two cases, the following exception occurred:
`Error in query: GROUP BY position 4 is not in select list (valid range is [1, 3]); line 1 pos 10`
The cause of this failure:
If an aggregateExpression is integer, after replaced with this aggregateExpression, the
groupExpression still considered as an ordinal.
The solution:
This bug is due to re-entrance of an analyzed plan. We can solve it by using `resolveOperators` in `SubstituteUnresolvedOrdinals`.
## How was this patch tested?
Added unit test case
Author: liuxian <liu.xian3@zte.com.cn>
Closes#18779 from 10110346/groupby.
## What changes were proposed in this pull request?
This PR replaces #18623 to do some clean up.
Closes#18623
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Author: Andrey Taptunov <taptunov@amazon.com>
Closes#18848 from zsxwing/review-pr18623.
## What changes were proposed in this pull request?
OneRowRelation is the only plan that is a case object, which causes some issues with makeCopy using a 0-arg constructor. This patch changes it from a case object to a case class.
This blocks SPARK-21619.
## How was this patch tested?
Should be covered by existing test cases.
Author: Reynold Xin <rxin@databricks.com>
Closes#18839 from rxin/SPARK-21634.
## What changes were proposed in this pull request?
Hive `pmod(3.13, 0)`:
```:sql
hive> select pmod(3.13, 0);
OK
NULL
Time taken: 2.514 seconds, Fetched: 1 row(s)
hive>
```
Spark `mod(3.13, 0)`:
```:sql
spark-sql> select mod(3.13, 0);
NULL
spark-sql>
```
But the Spark `pmod(3.13, 0)`:
```:sql
spark-sql> select pmod(3.13, 0);
17/06/25 09:35:58 ERROR SparkSQLDriver: Failed in [select pmod(3.13, 0)]
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.Pmod.pmod(arithmetic.scala:504)
at org.apache.spark.sql.catalyst.expressions.Pmod.nullSafeEval(arithmetic.scala:432)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:419)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:323)
...
```
This PR make `pmod(number, 0)` to null.
## How was this patch tested?
unit tests
Author: Yuming Wang <wgyumg@gmail.com>
Closes#18413 from wangyum/SPARK-21205.
## What changes were proposed in this pull request?
An overflow of the difference of bounds on the partitioning column leads to no data being read. This
patch checks for this overflow.
## How was this patch tested?
New unit test.
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#18800 from aray/SPARK-21330.
## What changes were proposed in this pull request?
When the watermark is not a column of `dropDuplicates`, right now it will crash. This PR fixed this issue.
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#18822 from zsxwing/SPARK-21546.
## What changes were proposed in this pull request?
This PR fixed a potential overflow issue in EventTimeStats.
## How was this patch tested?
The new unit tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#18803 from zsxwing/avg.
## What changes were proposed in this pull request?
When BytesToBytesMap spills, its longArray should be released. Otherwise, it may not released until the task complete. This array may take a significant amount of memory, which cannot be used by later operator, such as UnsafeShuffleExternalSorter, resulting in more frequent spill in sorter. This patch release the array as destructive iterator will not use this array anymore.
## How was this patch tested?
Manual test in production
Author: Zhan Zhang <zhanzhang@fb.com>
Closes#17180 from zhzhan/memory.
## What changes were proposed in this pull request?
The format of none should be consistent with other compression codec(\`snappy\`, \`lz4\`) as \`none\`.
## How was this patch tested?
This is a typo.
Author: GuoChenzhao <chenzhao.guo@intel.com>
Closes#18758 from gczsjdy/typo.
## What changes were proposed in this pull request?
This pr added parsing rules to support subquery column aliases in FROM clause.
This pr is a sub-task of #18079.
## How was this patch tested?
Added tests in `PlanParserSuite` and `SQLQueryTestSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#18185 from maropu/SPARK-20962.
## What changes were proposed in this pull request?
Long values can be passed to `rangeBetween` as range frame boundaries, but we silently convert it to Int values, this can cause wrong results and we should fix this.
Further more, we should accept any legal literal values as range frame boundaries. In this PR, we make it possible for Long values, and make accepting other DataTypes really easy to add.
This PR is mostly based on Herman's previous amazing work: 596f53c339
After this been merged, we can close#16818 .
## How was this patch tested?
Add new tests in `DataFrameWindowFunctionsSuite` and `TypeCoercionSuite`.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#18540 from jiangxb1987/rangeFrame.
## What changes were proposed in this pull request?
When there are aliases (these aliases were added for nested fields) as parameters in `RuntimeReplaceable`, as they are not in the children expression, those aliases can't be cleaned up in analyzer rule `CleanupAliases`.
An expression `nvl(foo.foo1, "value")` can be resolved to two semantically different expressions in a group by query because they contain different aliases.
Because those aliases are not children of `RuntimeReplaceable` which is an `UnaryExpression`. So we can't trim the aliases out by simple transforming the expressions in `CleanupAliases`.
If we want to replace the non-children aliases in `RuntimeReplaceable`, we need to add more codes to `RuntimeReplaceable` and modify all expressions of `RuntimeReplaceable`. It makes the interface ugly IMO.
Consider those aliases will be replaced later at optimization and so they're no harm, this patch chooses to simply override `canonicalized` of `RuntimeReplaceable`.
One concern is about `CleanupAliases`. Because it actually cannot clean up ALL aliases inside a plan. To make caller of this rule notice that, this patch adds a comment to `CleanupAliases`.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#18761 from viirya/SPARK-21555.
## What changes were proposed in this pull request?
Fixes current failures in dev/lint-java
## How was this patch tested?
Existing linter, tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#18757 from srowen/LintJava.
## What changes were proposed in this pull request?
This PR contains a tiny update that removes an attribute resolution inconsistency in the Dataset API. The following example is taken from the ticket description:
```
spark.range(1).withColumnRenamed("id", "x").sort(col("id")) // works
spark.range(1).withColumnRenamed("id", "x").sort($"id") // works
spark.range(1).withColumnRenamed("id", "x").sort('id) // works
spark.range(1).withColumnRenamed("id", "x").sort("id") // fails with:
org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among (x);
```
The above `AnalysisException` happens because the last case calls `Dataset.apply()` to convert strings into columns, which triggers attribute resolution. To make the API consistent between overloaded methods, this PR defers the resolution and constructs columns directly.
Author: aokolnychyi <anton.okolnychyi@sap.com>
Closes#18740 from aokolnychyi/spark-21538.
## What changes were proposed in this pull request?
`UnsafeExternalSorter.recordComparator` can be either `KVComparator` or `RowComparator`, and both of them will keep the reference to the input rows they compared last time.
After sorting, we return the sorted iterator to upstream operators. However, the upstream operators may take a while to consume up the sorted iterator, and `UnsafeExternalSorter` is registered to `TaskContext` at [here](https://github.com/apache/spark/blob/v2.2.0/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L159-L161), which means we will keep the `UnsafeExternalSorter` instance and keep the last compared input rows in memory until the sorted iterator is consumed up.
Things get worse if we sort within partitions of a dataset and coalesce all partitions into one, as we will keep a lot of input rows in memory and the time to consume up all the sorted iterators is long.
This PR takes over https://github.com/apache/spark/pull/18543 , the idea is that, we do not keep the record comparator instance in `UnsafeExternalSorter`, but a generator of record comparator.
close#18543
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18679 from cloud-fan/memory-leak.
## What changes were proposed in this pull request?
This is a refactoring of `ArrowConverters` and related classes.
1. Refactor `ColumnWriter` as `ArrowWriter`.
2. Add `ArrayType` and `StructType` support.
3. Refactor `ArrowConverters` to skip intermediate `ArrowRecordBatch` creation.
## How was this patch tested?
Added some tests and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18655 from ueshin/issues/SPARK-21440.
## What changes were proposed in this pull request?
This PR ensures that `Unsafe.sizeInBytes` must be a multiple of 8. It it is not satisfied. `Unsafe.hashCode` causes the assertion violation.
## How was this patch tested?
Will add test cases
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18503 from kiszk/SPARK-21271.
## What changes were proposed in this pull request?
This generates a documentation for Spark SQL built-in functions.
One drawback is, this requires a proper build to generate built-in function list.
Once it is built, it only takes few seconds by `sql/create-docs.sh`.
Please see https://spark-test.github.io/sparksqldoc/ that I hosted to show the output documentation.
There are few more works to be done in order to make the documentation pretty, for example, separating `Arguments:` and `Examples:` but I guess this should be done within `ExpressionDescription` and `ExpressionInfo` rather than manually parsing it. I will fix these in a follow up.
This requires `pip install mkdocs` to generate HTMLs from markdown files.
## How was this patch tested?
Manually tested:
```
cd docs
jekyll build
```
,
```
cd docs
jekyll serve
```
and
```
cd sql
create-docs.sh
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18702 from HyukjinKwon/SPARK-21485.
### What changes were proposed in this pull request?
Like [Hive UDFType](https://hive.apache.org/javadocs/r2.0.1/api/org/apache/hadoop/hive/ql/udf/UDFType.html), we should allow users to add the extra flags for ScalaUDF and JavaUDF too. _stateful_/_impliesOrder_ are not applicable to our Scala UDF. Thus, we only add the following two flags.
- deterministic: Certain optimizations should not be applied if UDF is not deterministic. Deterministic UDF returns same result each time it is invoked with a particular input. This determinism just needs to hold within the context of a query.
When the deterministic flag is not correctly set, the results could be wrong.
For ScalaUDF in Dataset APIs, users can call the following extra APIs for `UserDefinedFunction` to make the corresponding changes.
- `nonDeterministic`: Updates UserDefinedFunction to non-deterministic.
Also fixed the Java UDF name loss issue.
Will submit a separate PR for `distinctLike` for UDAF
### How was this patch tested?
Added test cases for both ScalaUDF
Author: gatorsmile <gatorsmile@gmail.com>
Author: Wenchen Fan <cloud0fan@gmail.com>
Closes#17848 from gatorsmile/udfRegister.
## What changes were proposed in this pull request?
This PR ensures to call `super.afterEach()` in overriding `afterEach()` method in `DatasetCacheSuite`. When we override `afterEach()` method in Testsuite, we have to call `super.afterEach()`.
This is a follow-up of #18719 and SPARK-21512.
## How was this patch tested?
Used the existing test suite
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18721 from kiszk/SPARK-21516.
## What changes were proposed in this pull request?
In #18483 , we fixed the data copy bug when saving into `InternalRow`, and removed all workarounds for this bug in the aggregate code path. However, the object hash aggregate was missed, this PR fixes it.
This patch is also a requirement for #17419 , which shows that DataFrame version is slower than RDD version because of this issue.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18712 from cloud-fan/minor.
## What changes were proposed in this pull request?
This PR avoids to reuse unpersistent dataset among test cases by making dataset unpersistent at the end of each test case.
In `DatasetCacheSuite`, the test case `"get storage level"` does not make dataset unpersisit after make the dataset persisitent. The same dataset will be made persistent by the test case `"persist and then rebind right encoder when join 2 datasets"` Thus, we run these test cases, the second case does not perform to make dataset persistent. This is because in
When we run only the second case, it performs to make dataset persistent. It is not good to change behavior of the second test suite. The first test case should correctly make dataset unpersistent.
```
Testing started at 17:52 ...
01:52:15.053 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
01:52:48.595 WARN org.apache.spark.sql.execution.CacheManager: Asked to cache already cached data.
01:52:48.692 WARN org.apache.spark.sql.execution.CacheManager: Asked to cache already cached data.
01:52:50.864 WARN org.apache.spark.storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
01:52:50.864 WARN org.apache.spark.storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
01:52:50.868 WARN org.apache.spark.storage.BlockManager: Block rdd_8_1 replicated to only 0 peer(s) instead of 1 peers
01:52:50.868 WARN org.apache.spark.storage.BlockManager: Block rdd_8_0 replicated to only 0 peer(s) instead of 1 peers
```
After this PR, these messages do not appear
```
Testing started at 18:14 ...
02:15:05.329 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Process finished with exit code 0
```
## How was this patch tested?
Used the existing test
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18719 from kiszk/SPARK-21512.
## What changes were proposed in this pull request?
This patch removes the `****` string from test names in FlatMapGroupsWithStateSuite. `***` is a common string developers grep for when using Scala test (because it immediately shows the failing test cases). The existence of the `****` in test names disrupts that workflow.
## How was this patch tested?
N/A - test only change.
Author: Reynold Xin <rxin@databricks.com>
Closes#18715 from rxin/FlatMapGroupsWithStateStar.
## What changes were proposed in this pull request?
This is a follow-up of #18680.
In some environment, a compile error happens saying:
```
.../sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243:
error: not found: type Array
public void loadBytes(Array array) {
^
```
This pr fixes it.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18701 from ueshin/issues/SPARK-21472_fup1.
## What changes were proposed in this pull request?
DirectParquetOutputCommitter was removed from Spark as it was deemed unsafe to use. We however still have some code to generate warning. This patch removes those code as well.
This is kind of a follow-up of https://github.com/apache/spark/pull/16796
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18689 from cloud-fan/minor.
## What changes were proposed in this pull request?
Introducing `ArrowColumnVector` as a reader for Arrow vectors.
It extends `ColumnVector`, so we will be able to use it with `ColumnarBatch` and its functionalities.
Currently it supports primitive types and `StringType`, `ArrayType` and `StructType`.
## How was this patch tested?
Added tests for `ArrowColumnVector` and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18680 from ueshin/issues/SPARK-21472.
## What changes were proposed in this pull request?
This PR is to mark the parameter `rows` and `unsafeRow` of LocalTableScanExec transient. It can avoid serializing the unneeded objects.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#18686 from gatorsmile/LocalTableScanExec.
## What changes were proposed in this pull request?
This is the reopen of https://github.com/apache/spark/pull/14198, with merge conflicts resolved.
ueshin Could you please take a look at my code?
Fix bugs about types that result an array of null when creating DataFrame using python.
Python's array.array have richer type than python itself, e.g. we can have `array('f',[1,2,3])` and `array('d',[1,2,3])`. Codes in spark-sql and pyspark didn't take this into consideration which might cause a problem that you get an array of null values when you have `array('f')` in your rows.
A simple code to reproduce this bug is:
```
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row,DataFrame
from array import array
sc = SparkContext()
sqlContext = SQLContext(sc)
row1 = Row(floatarray=array('f',[1,2,3]), doublearray=array('d',[1,2,3]))
rows = sc.parallelize([ row1 ])
df = sqlContext.createDataFrame(rows)
df.show()
```
which have output
```
+---------------+------------------+
| doublearray| floatarray|
+---------------+------------------+
|[1.0, 2.0, 3.0]|[null, null, null]|
+---------------+------------------+
```
## How was this patch tested?
New test case added
Author: Xiang Gao <qasdfgtyuiop@gmail.com>
Author: Gao, Xiang <qasdfgtyuiop@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18444 from zasdfgbnm/fix_array_infer.
## What changes were proposed in this pull request?
When using the MetadataLogFileIndex to read back a table, we don't respect the user provided schema as the proper column types. This can lead to issues when trying to read strings that look like dates that get truncated to DateType, or longs being truncated to IntegerType, just because a long value doesn't exist.
## How was this patch tested?
Unit tests and manual tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#18676 from brkyvz/stream-partitioning.
## What changes were proposed in this pull request?
Two invalid join types were mistakenly listed in the javadoc for joinWith, in the Dataset class. I presume these were copied from the javadoc of join, but since joinWith returns a Dataset\<Tuple2\>, left_semi and left_anti are invalid, as they only return values from one of the datasets, instead of from both
## How was this patch tested?
I ran the following code :
```
public static void main(String[] args) {
SparkSession spark = new SparkSession(new SparkContext("local[*]", "Test"));
Dataset<Row> one = spark.createDataFrame(Arrays.asList(new Bean(1), new Bean(2), new Bean(3), new Bean(4), new Bean(5)), Bean.class);
Dataset<Row> two = spark.createDataFrame(Arrays.asList(new Bean(4), new Bean(5), new Bean(6), new Bean(7), new Bean(8), new Bean(9)), Bean.class);
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "inner").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "cross").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "outer").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "full").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "full_outer").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_outer").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "right").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "right_outer").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_semi").show();} catch (Exception e) {e.printStackTrace();}
try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_anti").show();} catch (Exception e) {e.printStackTrace();}
}
```
which tests all the different join types, and the last two (left_semi and left_anti) threw exceptions. The same code using join instead of joinWith did fine. The Bean class was just a java bean with a single int field, x.
Author: Corey Woodfield <coreywoodfield@gmail.com>
Closes#18462 from coreywoodfield/master.
## What changes were proposed in this pull request?
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-21446
options.asConnectionProperties can not have fetchsize,because fetchsize belongs to Spark-only options, and Spark-only options have been excluded in connection properities.
So change properties of beforeFetch from options.asConnectionProperties.asScala.toMap to options.asProperties.asScala.toMap
## How was this patch tested?
Author: DFFuture <albert.zhang23@gmail.com>
Closes#18665 from DFFuture/sparksql_pg.
## What changes were proposed in this pull request?
Use of `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2. However interval uses to ProcessingTime causes deprecation warnings during compilation. This cannot be avoided entirely as even though it is deprecated as a public API, ProcessingTime instances are used internally in TriggerExecutor. This PR is to minimize the warning by removing its uses from tests as much as possible.
## How was this patch tested?
Existing tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#18678 from tdas/SPARK-21464.
## What changes were proposed in this pull request?
https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21441
This issue can be reproduced by the following example:
```
val spark = SparkSession
.builder()
.appName("smj-codegen")
.master("local")
.config("spark.sql.autoBroadcastJoinThreshold", "1")
.getOrCreate()
val df1 = spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("key", "int")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "3"))).toDF("key", "str")
val df = df1.join(df2, df1("key") === df2("key"))
.filter("int = 2 or reflect('java.lang.Integer', 'valueOf', str) = 1")
.select("int")
df.show()
```
To conclude, the issue happens when:
(1) SortMergeJoin condition contains CodegenFallback expressions.
(2) In PhysicalPlan tree, SortMergeJoin node is the child of root node, e.g., the Project in above example.
This patch fixes the logic in `CollapseCodegenStages` rule.
## How was this patch tested?
Unit test and manual verification in our cluster.
Author: donnyzone <wellfengzhu@gmail.com>
Closes#18656 from DonnyZone/Fix_SortMergeJoinExec.
## What changes were proposed in this pull request?
In `SlidingWindowFunctionFrame`, it is now adding all rows to the buffer for which the input row value is equal to or less than the output row upper bound, then drop all rows from the buffer for which the input row value is smaller than the output row lower bound.
This could result in the buffer is very big though the window is small.
For example:
```
select a, b, sum(a)
over (partition by b order by a range between 1000000 following and 1000001 following)
from table
```
We can refine the logic and just add the qualified rows into buffer.
## How was this patch tested?
Manual test:
Run sql
`select shop, shopInfo, district, sum(revenue) over(partition by district order by revenue range between 100 following and 200 following) from revenueList limit 10`
against a table with 4 columns(shop: String, shopInfo: String, district: String, revenue: Int). The biggest partition is around 2G bytes, containing 200k lines.
Configure the executor with 2G bytes memory.
With the change in this pr, it works find. Without this change, below exception will be thrown.
```
MemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:62)
at org.apache.spark.sql.execution.window.SlidingWindowFunctionFrame.write(WindowFunctionFrame.scala:201)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:365)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:289)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```
Author: jinxing <jinxing6042@126.com>
Closes#18634 from jinxing64/SPARK-21414.
## What changes were proposed in this pull request?
Add EmptyDirectoryWriteTask for empty task while writing files. Fix the empty result for parquet format by leaving the first partition for meta writing.
## How was this patch tested?
Add new test in `FileFormatWriterSuite `
Author: xuanyuanking <xyliyuanjian@gmail.com>
Closes#18654 from xuanyuanking/SPARK-21435.
## What changes were proposed in this pull request?
- Added batchId to StreamingQueryProgress.json as that was missing from the generated json.
- Also, removed recently added numPartitions from StatefulOperatorProgress as this value does not change through the query run, and there are other ways to find that.
## How was this patch tested?
Updated unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#18675 from tdas/SPARK-21462.
## What changes were proposed in this pull request?
Address scapegoat warnings for:
- BigDecimal double constructor
- Catching NPE
- Finalizer without super
- List.size is O(n)
- Prefer Seq.empty
- Prefer Set.empty
- reverse.map instead of reverseMap
- Type shadowing
- Unnecessary if condition.
- Use .log1p
- Var could be val
In some instances like Seq.empty, I avoided making the change even where valid in test code to keep the scope of the change smaller. Those issues are concerned with performance and it won't matter for tests.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#18635 from srowen/Scapegoat1.
## What changes were proposed in this pull request?
Implementation may expose both timing as well as size metrics. This PR enables that.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#18661 from tdas/SPARK-21409-2.
## What changes were proposed in this pull request?
Currently, there is no tracking of memory usage of state stores. This JIRA is to expose that through SQL metrics and StreamingQueryProgress.
Additionally, added the ability to expose implementation-specific metrics through the StateStore APIs to the SQLMetrics.
## How was this patch tested?
Added unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#18629 from tdas/SPARK-21409.
### What changes were proposed in this pull request?
The build-in functions `input_file_name`, `input_file_block_start`, `input_file_block_length` do not support more than one sources, like what Hive does. Currently, Spark does not block it and the outputs are ambiguous/non-deterministic. It could be from any side.
```
hive> select *, INPUT__FILE__NAME FROM t1, t2;
FAILED: SemanticException Column INPUT__FILE__NAME Found in more than One Tables/Subqueries
```
This PR blocks it and issues an error.
### How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#18580 from gatorsmile/inputFileName.
## What changes were proposed in this pull request?
Follow up to a few comments on https://github.com/apache/spark/pull/17150#issuecomment-315020196 that couldn't be addressed before it was merged.
## How was this patch tested?
Existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#18646 from srowen/SPARK-19810.2.
## What changes were proposed in this pull request?
This PR fixes a wrong comparison for `BinaryType`. This PR enables unsigned comparison and unsigned prefix generation for an array for `BinaryType`. Previous implementations uses signed operations.
## How was this patch tested?
Added a test suite in `OrderingSuite`.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18571 from kiszk/SPARK-21344.
## What changes were proposed in this pull request?
Add the query id as a local property to allow source and sink using it.
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#18638 from zsxwing/SPARK-21421.
## What changes were proposed in this pull request?
- Remove Scala 2.10 build profiles and support
- Replace some 2.10 support in scripts with commented placeholders for 2.12 later
- Remove deprecated API calls from 2.10 support
- Remove usages of deprecated context bounds where possible
- Remove Scala 2.10 workarounds like ScalaReflectionLock
- Other minor Scala warning fixes
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#17150 from srowen/SPARK-19810.
## What changes were proposed in this pull request?
Currently, `RowDataSourceScanExec` and `FileSourceScanExec` rely on a "metadata" string map to implement equality comparison, since the RDDs they depend on cannot be directly compared. This has resulted in a number of correctness bugs around exchange reuse, e.g. SPARK-17673 and SPARK-16818.
To make these comparisons less brittle, we should refactor these classes to compare constructor parameters directly instead of relying on the metadata map.
This PR refactors `RowDataSourceScanExec`, `FileSourceScanExec` will be fixed in the follow-up PR.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18600 from cloud-fan/minor.
## What changes were proposed in this pull request?
During Streaming Aggregation, we have two StateStores per task, one used as read-only in
`StateStoreRestoreExec`, and one read-write used in `StateStoreSaveExec`. `StateStore.abort`
will be called for these StateStores if they haven't committed their results. We need to
make sure that `abort` in read-only store after a `commit` in the read-write store doesn't
accidentally lead to the deletion of state.
This PR adds a test for this condition.
## How was this patch tested?
This PR adds a test.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#18603 from brkyvz/ss-test.
## What changes were proposed in this pull request?
Hive interprets regular expression, e.g., `(a)?+.+` in query specification. This PR enables spark to support this feature when hive.support.quoted.identifiers is set to true.
## How was this patch tested?
- Add unittests in SQLQuerySuite.scala
- Run spark-shell tested the original failed query:
scala> hc.sql("SELECT `(a|b)?+.+` from test1").collect.foreach(println)
Author: Jane Wang <janewang@fb.com>
Closes#18023 from janewangfb/support_select_regex.
### What changes were proposed in this pull request?
This PR is to implement UDF0. `UDF0` is needed when users need to implement a JAVA UDF with no argument.
### How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#18598 from gatorsmile/udf0.
## What changes were proposed in this pull request?
This PR deals with four points as below:
- Reuse existing DDL parser APIs rather than reimplementing within PySpark
- Support DDL formatted string, `field type, field type`.
- Support case-insensitivity for parsing.
- Support nested data types as below:
**Before**
```
>>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
...
ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
```
```
>>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
...
ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
```
```
>>> spark.createDataFrame([[1]], "a int").show()
...
ValueError: Could not parse datatype: a int
```
**After**
```
>>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
+---+
| a|
+---+
|[1]|
+---+
```
```
>>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
+---+
| a|
+---+
|[1]|
+---+
```
```
>>> spark.createDataFrame([[1]], "a int").show()
+---+
| a|
+---+
| 1|
+---+
```
## How was this patch tested?
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18590 from HyukjinKwon/deduplicate-python-ddl.
## What changes were proposed in this pull request?
Add sql test for window functions, also remove uncecessary test cases in `WindowQuerySuite`.
## How was this patch tested?
Added `window.sql` and the corresponding output file.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#18591 from jiangxb1987/window.
## What changes were proposed in this pull request?
This PR proposes to remove `NumberFormat.parse` use to disallow a case of partially parsed data. For example,
```
scala> spark.read.schema("a DOUBLE").option("mode", "FAILFAST").csv(Seq("10u12").toDS).show()
+----+
| a|
+----+
|10.0|
+----+
```
## How was this patch tested?
Unit tests added in `UnivocityParserSuite` and `CSVSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18532 from HyukjinKwon/SPARK-21263.
## What changes were proposed in this pull request?
In current code, it is expensive to use `UnboundedFollowingWindowFunctionFrame`, because it is iterating from the start to lower bound every time calling `write` method. When traverse the iterator, it's possible to skip some spilled files thus to save some time.
## How was this patch tested?
Added unit test
Did a small test for benchmark:
Put 2000200 rows into `UnsafeExternalSorter`-- 2 spill files(each contains 1000000 rows) and inMemSorter contains 200 rows.
Move the iterator forward to index=2000001.
*With this change*:
`getIterator(2000001)`, it will cost almost 0ms~1ms;
*Without this change*:
`for(int i=0; i<2000001; i++)geIterator().loadNext()`, it will cost 300ms.
Author: jinxing <jinxing6042@126.com>
Closes#18541 from jinxing64/SPARK-21315.
### What changes were proposed in this pull request?
Users get a very confusing error when users specify a wrong number of parameters.
```Scala
val df = spark.emptyDataFrame
spark.udf.register("foo", (_: String).length)
df.selectExpr("foo(2, 3, 4)")
```
```
org.apache.spark.sql.UDFSuite$$anonfun$9$$anonfun$apply$mcV$sp$12 cannot be cast to scala.Function3
java.lang.ClassCastException: org.apache.spark.sql.UDFSuite$$anonfun$9$$anonfun$apply$mcV$sp$12 cannot be cast to scala.Function3
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:109)
```
This PR is to capture the exception and issue an error message that is consistent with what we did for built-in functions. After the fix, the error message is improved to
```
Invalid number of arguments for function foo; line 1 pos 0
org.apache.spark.sql.AnalysisException: Invalid number of arguments for function foo; line 1 pos 0
at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:119)
```
### How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#18574 from gatorsmile/statsCheck.
## What changes were proposed in this pull request?
This pr added `unionByName` in `DataSet`.
Here is how to use:
```
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 6| 4| 5|
// +----+----+----+
```
## How was this patch tested?
Added tests in `DataFrameSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#18300 from maropu/SPARK-21043-2.
## What changes were proposed in this pull request?
Integrate Apache Arrow with Spark to increase performance of `DataFrame.toPandas`. This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process. The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame. Data types except complex, date, timestamp, and decimal are currently supported, otherwise an `UnsupportedOperation` exception is thrown.
Additions to Spark include a Scala package private method `Dataset.toArrowPayload` that will convert data partitions in the executor JVM to `ArrowPayload`s as byte arrays so they can be easily served. A package private class/object `ArrowConverters` that provide data type mappings and conversion routines. In Python, a private method `DataFrame._collectAsArrow` is added to collect Arrow payloads and a SQLConf "spark.sql.execution.arrow.enable" can be used in `toPandas()` to enable using Arrow (uses the old conversion by default).
## How was this patch tested?
Added a new test suite `ArrowConvertersSuite` that will run tests on conversion of Datasets to Arrow payloads for supported types. The suite will generate a Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow payload and finally validated against the JSON data. This will ensure that the schema and data has been converted correctly.
Added PySpark tests to verify the `toPandas` method is producing equal DataFrames with and without pyarrow. A roundtrip test to ensure the pandas DataFrame produced by pyspark is equal to a one made directly with pandas.
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Li Jin <li.jin@twosigma.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes#18459 from BryanCutler/toPandas_with_arrow-SPARK-13534.
## What changes were proposed in this pull request?
This PR supports schema in a DDL formatted string for `from_json` in R/Python and `dapply` and `gapply` in R, which are commonly used and/or consistent with Scala APIs.
Additionally, this PR exposes `structType` in R to allow working around in other possible corner cases.
**Python**
`from_json`
```python
from pyspark.sql.functions import from_json
data = [(1, '''{"a": 1}''')]
df = spark.createDataFrame(data, ("key", "value"))
df.select(from_json(df.value, "a INT").alias("json")).show()
```
**R**
`from_json`
```R
df <- sql("SELECT named_struct('name', 'Bob') as people")
df <- mutate(df, people_json = to_json(df$people))
head(select(df, from_json(df$people_json, "name STRING")))
```
`structType.character`
```R
structType("a STRING, b INT")
```
`dapply`
```R
dapply(createDataFrame(list(list(1.0)), "a"), function(x) {x}, "a DOUBLE")
```
`gapply`
```R
gapply(createDataFrame(list(list(1.0)), "a"), "a", function(key, x) { x }, "a DOUBLE")
```
## How was this patch tested?
Doc tests for `from_json` in Python and unit tests `test_sparkSQL.R` in R.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18498 from HyukjinKwon/SPARK-21266.
## What changes were proposed in this pull request?
Updating numOutputRows metric was missing from one return path of LeftAnti SortMergeJoin.
## How was this patch tested?
Non-zero output rows manually seen in metrics.
Author: Juliusz Sompolski <julek@databricks.com>
Closes#18494 from juliuszsompolski/SPARK-21272.
## What changes were proposed in this pull request?
This pr made it more consistent to handle column name duplication. In the current master, error handling is different when hitting column name duplication:
```
// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}"""""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#12, a#13.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Duplicate column(s) : "a" found, cannot save to JSON format;
at org.apache.spark.sql.execution.datasources.json.JsonDataSource.checkConstraints(JsonDataSource.scala:81)
at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:63)
at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:57)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)
// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#41, a#42.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)
// If `inferSchema` is true, a CSV format is duplicate-safe (See SPARK-16896)
scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
| 1| 1|
+---+---+
// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#110, a#111.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
```
When this patch applied, the results change to;
```
// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}"""""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:156)
// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
| 1| 1|
+---+---+
// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
```
## How was this patch tested?
Added tests in `DataFrameReaderWriterSuite` and `SQLQueryTestSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#17758 from maropu/SPARK-20460.
## What changes were proposed in this pull request?
Some code cleanup and adding comments to make the code more readable. Changed the way to generate result rows, to be more clear.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18570 from cloud-fan/summary.
## What changes were proposed in this pull request?
These 3 methods have to be used together, so it makes more sense to merge them into one method and then the caller side only need to call one method.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18579 from cloud-fan/minor.
## What changes were proposed in this pull request?
Since we do not set active sessions when parsing the plan, we are unable to correctly use SQLConf.get to find the correct active session. Since https://github.com/apache/spark/pull/18531 breaks the build, I plan to revert it at first.
## How was this patch tested?
The existing test cases
Author: Xiao Li <gatorsmile@gmail.com>
Closes#18568 from gatorsmile/revert18531.
## What changes were proposed in this pull request?
We should be able to store zero size and row count after analyzing empty table.
This pr also enhances the test cases for re-analyzing tables.
## How was this patch tested?
Added a new test case and enhanced some test cases.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#18292 from wzhfy/analyzeNewColumn.
## What changes were proposed in this pull request?
`SparkSessionBuilderSuite` should clean up stopped sessions. Otherwise, it leaves behind some stopped `SparkContext`s interfereing with other test suites using `ShardSQLContext`.
Recently, master branch fails consequtively.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/
## How was this patch tested?
Pass the Jenkins with a updated suite.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#18567 from dongjoon-hyun/SPARK-SESSION.
## What changes were proposed in this pull request?
This adds documentation to many functions in pyspark.sql.functions.py:
`upper`, `lower`, `reverse`, `unix_timestamp`, `from_unixtime`, `rand`, `randn`, `collect_list`, `collect_set`, `lit`
Add units to the trigonometry functions.
Renames columns in datetime examples to be more informative.
Adds links between some functions.
## How was this patch tested?
`./dev/lint-python`
`python python/pyspark/sql/functions.py`
`./python/run-tests.py --module pyspark-sql`
Author: Michael Patterson <map222@gmail.com>
Closes#17865 from map222/spark-20456.
## What changes were proposed in this pull request?
This pr modified code to use string types by default if `array` and `map` in functions have no argument. This behaviour is the same with Hive one;
```
hive> CREATE TEMPORARY TABLE t1 AS SELECT map();
hive> DESCRIBE t1;
_c0 map<string,string>
hive> CREATE TEMPORARY TABLE t2 AS SELECT array();
hive> DESCRIBE t2;
_c0 array<string>
```
## How was this patch tested?
Added tests in `DataFrameFunctionsSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#18516 from maropu/SPARK-21281.
## What changes were proposed in this pull request?
Adds method `summary` that allows user to specify which statistics and percentiles to calculate. By default it include the existing statistics from `describe` and quartiles (25th, 50th, and 75th percentiles) similar to Pandas. Also changes the implementation of `describe` to delegate to `summary`.
## How was this patch tested?
additional unit test
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#18307 from aray/SPARK-21100.
## What changes were proposed in this pull request?
Revise rand comparison in BatchEvalPythonExecSuite
In BatchEvalPythonExecSuite, there are two cases using the case "rand() > 3"
Rand() generates a random value in [0, 1), it is wired to be compared with 3, use 0.3 instead
## How was this patch tested?
unit test
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#18560 from gengliangwang/revise_BatchEvalPythonExecSuite.
## What changes were proposed in this pull request?
un-aliased subquery is supported by Spark SQL for a long time. Its semantic was not well defined and had confusing behaviors, and it's not a standard SQL syntax, so we disallowed it in https://issues.apache.org/jira/browse/SPARK-20690 .
However, this is a breaking change, and we do have existing queries using un-aliased subquery. We should add the support back and fix its semantic.
This PR fixes the un-aliased subquery by assigning a default alias name.
After this PR, there is no syntax change from branch 2.2 to master, but we invalid a weird use case:
`SELECT v.i from (SELECT i FROM v)`. Now this query will throw analysis exception because users should not be able to use the qualifier inside a subquery.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18559 from cloud-fan/sub-query.
## What changes were proposed in this pull request?
Add `toString` with options for `ConsoleSink` so it shows nicely in query progress.
**BEFORE**
```
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink4b340441"
}
```
**AFTER**
```
"sink" : {
"description" : "ConsoleSink[numRows=10, truncate=false]"
}
```
/cc zsxwing tdas
## How was this patch tested?
Local build
Author: Jacek Laskowski <jacek@japila.pl>
Closes#18539 from jaceklaskowski/SPARK-21313-ConsoleSink-toString.
## What changes were proposed in this pull request?
Remove time metrics since it seems no way to measure it in non per-row tracking.
## How was this patch tested?
Existing tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#18558 from viirya/SPARK-20703-followup.
## What changes were proposed in this pull request?
This PR implements bulk-copy for `ColumnVector.Array.to<type>Array()` methods (e.g. `toIntArray()`) in `ColumnVector.Array` by using `System.arrayCopy()` or `Platform.copyMemory()`.
Before this PR, when one of these method is called, the generic method in `ArrayData` is called. It is not fast since element-wise copy is performed.
This PR can improve performance of a benchmark program by 1.9x and 3.2x.
Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Int Array Best/Avg Time(ms) Rate(M/s) Per Row(ns)
------------------------------------------------------------------------------------------------
ON_HEAP 586 / 628 14.3 69.9
OFF_HEAP 893 / 902 9.4 106.5
```
With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Int Array Best/Avg Time(ms) Rate(M/s) Per Row(ns)
------------------------------------------------------------------------------------------------
ON_HEAP 306 / 331 27.4 36.4
OFF_HEAP 282 / 287 29.8 33.6
```
Source program
```
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val len = 8 * 1024 * 1024
val column = ColumnVector.allocate(len * 2, new ArrayType(IntegerType, false), memMode)
val data = column.arrayData
var i = 0
while (i < len) {
data.putInt(i, i)
i += 1
}
column.putArray(0, 0, len)
val benchmark = new Benchmark("Int Array", len, minNumIters = 20)
benchmark.addCase(s"$memMode") { iter =>
var i = 0
while (i < 50) {
column.getArray(0).toIntArray
i += 1
}
}
benchmark.run
}}
```
## How was this patch tested?
Added test suite
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18425 from kiszk/SPARK-21217.
## What changes were proposed in this pull request?
Making EventTimeWatermarkExec explicitly UnaryExecNode
/cc tdas zsxwing
## How was this patch tested?
Local build.
Author: Jacek Laskowski <jacek@japila.pl>
Closes#18509 from jaceklaskowski/EventTimeWatermarkExec-UnaryExecNode.
## What changes were proposed in this pull request?
SparkContext is shared by all sessions, we should not update its conf for only one session.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18536 from cloud-fan/config.
## What changes were proposed in this pull request?
Few changes to the Structured Streaming documentation
- Clarify that the entire stream input table is not materialized
- Add information for Ganglia
- Add Kafka Sink to the main docs
- Removed a couple of leftover experimental tags
- Added more associated reading material and talk videos.
In addition, https://github.com/apache/spark/pull/16856 broke the link to the RDD programming guide in several places while renaming the page. This PR fixes those sameeragarwal cloud-fan.
- Added a redirection to avoid breaking internal and possible external links.
- Removed unnecessary redirection pages that were there since the separate scala, java, and python programming guides were merged together in 2013 or 2014.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#18485 from tdas/SPARK-21267.
## What changes were proposed in this pull request?
Currently we can't produce a `Dataset` containing `Set` in SparkSQL. This PR tries to support serialization/deserialization of `Set`.
Because there's no corresponding internal data type in SparkSQL for a `Set`, the most proper choice for serializing a set should be an array.
## How was this patch tested?
Added unit tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#18416 from viirya/SPARK-21204.
## What changes were proposed in this pull request?
When data type is struct, InSet now uses TypeUtils.getInterpretedOrdering (similar to EqualTo) to build a TreeSet. In other cases it will use a HashSet as before (which should be faster). Similarly, In.eval uses Ordering.equiv instead of equals.
## How was this patch tested?
New test in SQLQuerySuite.
Author: Bogdan Raducanu <bogdan@databricks.com>
Closes#18455 from bogdanrdc/SPARK-21228.
## What changes were proposed in this pull request?
1. move `StatisticsCollectionTestBase` to a separate file.
2. move some test cases to `StatisticsCollectionSuite` so that `hive/StatisticsSuite` only keeps tests that need hive support.
3. clear up some test cases.
## How was this patch tested?
Existing tests.
Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#18545 from wzhfy/cleanStatSuites.
## What changes were proposed in this pull request?
Right now in the UI, after SPARK-20213, we can show the operations to write data out. However, there is no way to associate metrics with data writes. We should show relative metrics on the operations.
#### Supported commands
This change supports updating metrics for file-based data writing operations, including `InsertIntoHadoopFsRelationCommand`, `InsertIntoHiveTable`.
Supported metrics:
* number of written files
* number of dynamic partitions
* total bytes of written data
* total number of output rows
* average writing data out time (ms)
* (TODO) min/med/max number of output rows per file/partition
* (TODO) min/med/max bytes of written data per file/partition
#### Commands not supported
`InsertIntoDataSourceCommand`, `SaveIntoDataSourceCommand`:
The two commands uses DataSource APIs to write data out, i.e., the logic of writing data out is delegated to the DataSource implementations, such as `InsertableRelation.insert` and `CreatableRelationProvider.createRelation`. So we can't obtain metrics from delegated methods for now.
`CreateHiveTableAsSelectCommand`, `CreateDataSourceTableAsSelectCommand` :
The two commands invokes other commands to write data out. The invoked commands can even write to non file-based data source. We leave them as future TODO.
#### How to update metrics of writing files out
A `RunnableCommand` which wants to update metrics, needs to override its `metrics` and provide the metrics data structure to `ExecutedCommandExec`.
The metrics are prepared during the execution of `FileFormatWriter`. The callback function passed to `FileFormatWriter` will accept the metrics and update accordingly.
There is a metrics updating function in `RunnableCommand`. In runtime, the function will be bound to the spark context and `metrics` of `ExecutedCommandExec` and pass to `FileFormatWriter`.
## How was this patch tested?
Updated unit tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#18159 from viirya/SPARK-20703-2.