## What changes were proposed in this pull request?
Since SPARK-21939, Apache Spark uses `TimeLimits` instead of the deprecated `Timeouts`. This PR fixes the build warning in `BufferHolderSparkSubmitSuite.scala` introduced at [SPARK-22222](https://github.com/apache/spark/pull/19460/files#diff-d8cf6e0c229969db94ec8ffc31a9239cR36) by removing the redundant `Timeouts`.
```scala
trait Timeouts in package concurrent is deprecated: Please use org.scalatest.concurrent.TimeLimits instead
[warn] with Timeouts {
```
## How was this patch tested?
N/A
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19697 from dongjoon-hyun/SPARK-22222.
## What changes were proposed in this pull request?
This PR proposes to add `errorifexists` to SparkR API and fix the rest of them describing the mode, mainly, in API documentations as well.
This PR also replaces `convertToJSaveMode` with `setWriteMode` so that the string is passed as-is to the JVM and executes:
b034f2565f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (L72-L82)
and remove the duplication here:
3f958a9992/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala (L187-L194)
## How was this patch tested?
Manually checked the built documentation. These were mainly found by `` grep -r `error` `` and `grep -r 'error'`.
Also, unit tests added in `test_sparkSQL.R`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19673 from HyukjinKwon/SPARK-21640-followup.
## What changes were proposed in this pull request?
This PR adds support for a new function called `dayofweek` that returns the day of the week of the given argument as an integer value in the range 1-7, where 1 represents Sunday.
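As a rough sketch of the intended semantics (plain Python, not the Catalyst implementation), the 1-7 mapping can be derived from ISO weekday numbering:

```python
from datetime import date

def dayofweek(d: date) -> int:
    """Day of week in the range 1-7, where 1 is Sunday (a sketch of the
    proposed SQL semantics, not the actual Catalyst expression)."""
    # isoweekday(): Monday=1 ... Sunday=7; shift so Sunday becomes 1
    return d.isoweekday() % 7 + 1

print(dayofweek(date(2017, 11, 12)))  # 2017-11-12 was a Sunday -> 1
```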
## How was this patch tested?
Unit tests and manual tests.
Author: ptkool <michael.styles@shopify.com>
Closes#19672 from ptkool/day_of_week_function.
## What changes were proposed in this pull request?
UDFs that can cause a runtime exception on invalid data are not safe to push down, because their behavior depends on their position in the query plan. Pushing such a UDF down risks changing its original behavior.
The example reported in the JIRA and taken as a test case shows this issue. We should declare UDFs that can cause a runtime exception on invalid data as non-deterministic.
This updates the documentation of the `deterministic` property in `Expression` and states clearly that a UDF that can cause a runtime exception on some specific input should be declared as non-deterministic.
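A toy illustration in plain Python (hypothetical names, not Spark code) of why the UDF's position in the plan matters:

```python
def risky_udf(x):
    # a UDF that raises on invalid input, e.g. a division
    return 10 // x

def filter_then_udf(rows):
    # original plan: the filter removes invalid input before the UDF runs
    return [risky_udf(x) for x in rows if x != 0]

def udf_then_filter(rows):
    # after an (unsafe) pushdown the UDF sees the invalid row and raises
    return [y for x, y in [(x, risky_udf(x)) for x in rows] if x != 0]

rows = [5, 0, 2]
print(filter_then_udf(rows))  # [2, 5]
try:
    udf_then_filter(rows)
except ZeroDivisionError:
    print("pushdown changed the query's behavior")
```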
## How was this patch tested?
Added test. Manually test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19662 from viirya/SPARK-22446.
## What changes were proposed in this pull request?
`<=>` is not supported by Hive metastore partition predicate pushdown. We should not push it down to the Hive metastore when it is used in partition predicates.
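For reference, a minimal Python sketch of the `<=>` (null-safe equality) semantics, with `None` modeling SQL NULL:

```python
def eq(a, b):
    # plain SQL '=': NULL when either side is NULL
    if a is None or b is None:
        return None
    return a == b

def null_safe_eq(a, b):
    # '<=>': never NULL; NULL <=> NULL evaluates to true
    if a is None or b is None:
        return a is None and b is None
    return a == b

print(null_safe_eq(None, None))  # True
print(eq(None, None))            # None
```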
## How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19682 from gatorsmile/fixLimitPushDown.
## What changes were proposed in this pull request?
`spark.sql.statistics.autoUpdate.size` should be `spark.sql.statistics.size.autoUpdate.enabled`. The previous name is confusing as users may treat it as a size config.
This config is in master branch only, no backward compatibility issue.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19667 from cloud-fan/minor.
## What changes were proposed in this pull request?
clarify exception behaviors for all data source v2 interfaces.
## How was this patch tested?
document change only
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19623 from cloud-fan/data-source-exception.
## What changes were proposed in this pull request?
`CodegenContext.copyResult` is kind of a global status for whole stage codegen. But the tricky part is, it is only used to transfer information from child to parent when calling the `consume` chain. We have to be super careful in `produce`/`consume` to set it to true when producing multiple result rows, and set it to false in operators that start a new pipeline (like sort).
This PR moves the `copyResult` to `CodegenSupport`, and call it at `WholeStageCodegenExec`. This is much easier to reason about.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19656 from cloud-fan/whole-sage.
## What changes were proposed in this pull request?
override JDBCDialects methods quoteIdentifier, getTableExistsQuery and getSchemaQuery in AggregatedDialect
## How was this patch tested?
Test the new implementation in JDBCSuite test("Aggregated dialects")
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#19658 from huaxingao/spark-22443.
## What changes were proposed in this pull request?
It's not safe in all cases to push down a LIMIT below a FULL OUTER
JOIN. If the limit is pushed to one side of the FOJ, the physical
join operator can not tell if a row in the non-limited side would have a
match in the other side.
*If* the join operator guarantees that unmatched tuples from the limited
side are emitted before any unmatched tuples from the other side,
pushing down the limit is safe. But this is impractical for some join
implementations, e.g. SortMergeJoin.
For now, disable limit pushdown through a FULL OUTER JOIN, and we can
evaluate whether a more complicated solution is necessary in the future.
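A toy Python model (not Spark code) showing how pushing a limit below a full outer join can invent rows:

```python
def full_outer_join(left, right):
    """Toy full outer join on (key, value) pairs."""
    left_keys = {k for k, _ in left}
    out = []
    for lk, lv in left:
        matches = [rv for rk, rv in right if rk == lk]
        out += [(lk, lv, rv) for rv in matches] or [(lk, lv, None)]
    out += [(rk, None, rv) for rk, rv in right if rk not in left_keys]
    return out

left = [(1, "a"), (2, "b")]
right = [(2, "x"), (3, "y")]

correct = full_outer_join(left, right)
# pushing LIMIT 1 into the left side before the join:
pushed = full_outer_join(left[:1], right)

# the pushed-down plan invents an unmatched row for key 2,
# even though key 2 does have a match in the full left side
assert (2, None, "x") in pushed
assert (2, None, "x") not in correct
```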
## How was this patch tested?
Ran org.apache.spark.sql.* tests. Altered full outer join tests in
LimitPushdownSuite.
Author: Henry Robinson <henry@cloudera.com>
Closes#19647 from henryr/spark-22211.
## What changes were proposed in this pull request?
Next fit decreasing bin packing algorithm is used to combine splits in DataSourceScanExec but the comment incorrectly states that first fit decreasing algorithm is used. The current implementation doesn't go back to a previously used bin other than the bin that the last element was put into.
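For illustration, minimal Python sketches of the two algorithms (hypothetical helper names; the actual split combining lives in DataSourceScanExec):

```python
def next_fit_decreasing(sizes, capacity):
    # only the current (last) bin is ever considered
    bins, current, free = [], [], capacity
    for s in sorted(sizes, reverse=True):
        if s > free and current:
            bins.append(current)
            current, free = [], capacity
        current.append(s)
        free -= s
    return bins + [current] if current else bins

def first_fit_decreasing(sizes, capacity):
    # every earlier bin is reconsidered for each item
    bins = []  # each entry: [free_space, items]
    for s in sorted(sizes, reverse=True):
        for b in bins:
            if b[0] >= s:
                b[0] -= s
                b[1].append(s)
                break
        else:
            bins.append([capacity - s, [s]])
    return [items for _, items in bins]

# first fit can go back to an earlier bin, next fit cannot:
print(next_fit_decreasing([7, 6, 3, 2], 10))   # [[7], [6, 3], [2]]
print(first_fit_decreasing([7, 6, 3, 2], 10))  # [[7, 3], [6, 2]]
```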
Author: Vinitha Gankidi <vgankidi@netflix.com>
Closes#19634 from vgankidi/SPARK-22412.
## What changes were proposed in this pull request?
When we insert `BatchEvalPython` for Python UDFs into a query plan, if its child has some outputs that are not used by the original parent node, `BatchEvalPython` will still take those outputs and save them into the queue. When the data for those outputs is big, it can easily generate a big spill on disk.
For example, the following reproducible code is from the JIRA ticket.
```python
from pyspark.sql.functions import *
from pyspark.sql.types import *
lines_of_file = ["this is a line" for x in range(10000)]
file_obj = ["this_is_a_foldername/this_is_a_filename", lines_of_file]
data = [file_obj for x in range(5)]
small_df = spark.sparkContext.parallelize(data).map(lambda x: (x[0], x[1])).toDF(["file", "lines"])
exploded = small_df.select("file", explode("lines"))

def split_key(s):
    return s.split("/")[1]

split_key_udf = udf(split_key, StringType())
with_filename = exploded.withColumn("filename", split_key_udf("file"))
with_filename.explain(True)
```
The physical plan before/after this change:
Before:
```
*Project [file#0, col#5, pythonUDF0#14 AS filename#9]
+- BatchEvalPython [split_key(file#0)], [file#0, lines#1, col#5, pythonUDF0#14]
   +- Generate explode(lines#1), true, false, [col#5]
      +- Scan ExistingRDD[file#0,lines#1]
```
After:
```
*Project [file#0, col#5, pythonUDF0#14 AS filename#9]
+- BatchEvalPython [split_key(file#0)], [col#5, file#0, pythonUDF0#14]
   +- *Project [col#5, file#0]
      +- Generate explode(lines#1), true, false, [col#5]
         +- Scan ExistingRDD[file#0,lines#1]
```
Before this change, `lines#1` is a redundant input to `BatchEvalPython`. This patch removes it by adding a Project.
## How was this patch tested?
Manually test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19642 from viirya/SPARK-22410.
## What changes were proposed in this pull request?
Scala test source files like TestHiveSingleton.scala should be in scala source root
## How was this patch tested?
Just move scala file from java directory to scala directory
No new test case in this PR.
```
renamed: mllib/src/test/java/org/apache/spark/ml/util/IdentifiableSuite.scala -> mllib/src/test/scala/org/apache/spark/ml/util/IdentifiableSuite.scala
renamed: streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala -> streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala
renamed: streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala -> streaming/src/test/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
renamed: sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala -> sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
```
Author: xubo245 <601450868@qq.com>
Closes#19639 from xubo245/scalaDirectory.
## What changes were proposed in this pull request?
Added a test class to check NULL handling behavior.
The expected behavior is defined as the one of the most well-known databases as specified here: https://sqlite.org/nulls.html.
SparkSQL behaves like other DBs:
- Adding anything to null gives null -> YES
- Multiplying null by zero gives null -> YES
- nulls are distinct in SELECT DISTINCT -> NO
- nulls are distinct in a UNION -> NO
- "CASE WHEN null THEN 1 ELSE 0 END" is 0? -> YES
- "null OR true" is true -> YES
- "not (null AND false)" is true -> YES
- null in aggregation are skipped -> YES
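These behaviors can be mimicked with a small Python sketch of SQL three-valued logic, using `None` for NULL:

```python
# None models SQL NULL in three-valued (true/false/unknown) logic
def sql_and(a, b):
    if a is False or b is False:
        return False
    return None if (a is None or b is None) else True

def sql_or(a, b):
    if a is True or b is True:
        return True
    return None if (a is None or b is None) else False

def sql_not(a):
    return None if a is None else not a

print(sql_or(None, True))             # True  -- "null OR true" is true
print(sql_not(sql_and(None, False)))  # True  -- "not (null AND false)" is true
# CASE WHEN null THEN 1 ELSE 0 END: a NULL condition is not true -> 0
cond = None
print(1 if cond is True else 0)       # 0
```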
## How was this patch tested?
Added test class
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19653 from mgaido91/SPARK-22418.
forward-port https://github.com/apache/spark/pull/19622 to master branch.
This bug doesn't exist in master because we've added hive bucketing support and the hive bucketing metadata can be recognized by Spark, but we should still port it to master: 1) there may be other unsupported hive metadata removed by Spark. 2) reduce the code difference between master and 2.2 to ease backports in the future.
***
When we alter a table's schema, we set the new schema to the Spark `CatalogTable`, convert it to a hive table, and finally call `hive.alterTable`. This causes a problem in Spark 2.2, because hive bucketing metadata is not recognized by Spark, which means a Spark `CatalogTable` representing a hive table is always non-bucketed, and when we convert it to a hive table and call `hive.alterTable`, the original hive bucketing metadata will be removed.
To fix this bug, we should read out the raw hive table metadata, update its schema, and call `hive.alterTable`. By doing this we can guarantee only the schema is changed, and nothing else.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19644 from cloud-fan/infer.
## What changes were proposed in this pull request?
According to the [discussion](https://github.com/apache/spark/pull/19571#issuecomment-339472976) on SPARK-15474, we will add new OrcFileFormat in `sql/core` module and allow users to use both old and new OrcFileFormat.
To do that, `OrcOptions` should be visible in `sql/core` module, too. Previously, it was `private[orc]` in `sql/hive`. This PR removes `private[orc]` because we don't use `private[sql]` in `sql/execution` package after [SPARK-16964](https://github.com/apache/spark/pull/14554).
## How was this patch tested?
Pass the Jenkins with the existing tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19636 from dongjoon-hyun/SPARK-22416.
## What changes were proposed in this pull request?
Adding a global limit on top of the distinct values before sorting and collecting will reduce the overall work in the case where we have more distinct values than needed. We will also eagerly perform a collect rather than a take, because we know we have at most (maxValues + 1) rows.
## How was this patch tested?
Existing tests cover sorted order
Author: Patrick Woody <pwoody@palantir.com>
Closes#19629 from pwoody/SPARK-22408.
## What changes were proposed in this pull request?
This patch includes some doc updates for data source API v2. I was reading the code and noticed some minor issues.
## How was this patch tested?
This is a doc only change.
Author: Reynold Xin <rxin@databricks.com>
Closes#19626 from rxin/dsv2-update.
## What changes were proposed in this pull request?
Write HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents stack overflow if too many deltas stack up in a low memory environment.
## How was this patch tested?
existing unit tests for functional equivalence, new unit test to check for stack overflow
Author: Jose Torres <jose@databricks.com>
Closes#19611 from joseph-torres/SPARK-22305.
## What changes were proposed in this pull request?
We made a mistake in https://github.com/apache/spark/pull/16944 . In `HiveMetastoreCatalog#inferIfNeeded` we infer the data schema, merge with full schema, and return the new full schema. At caller side we treat the full schema as data schema and set it to `HadoopFsRelation`.
This doesn't cause any problem because both parquet and orc can work with a wrong data schema that has extra columns, but it's better to fix this mistake.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19615 from cloud-fan/infer.
## What changes were proposed in this pull request?
The current join estimation logic is only based on basic column statistics (such as ndv, etc). If we want to add estimation for other kinds of statistics (such as histograms), it's not easy to incorporate into the current algorithm:
1. When we have multiple pairs of join keys, the current algorithm computes cardinality in a single formula. But if different join keys have different kinds of stats, the computation logic for each pair of join keys becomes different, so the previous formula does not apply.
2. Currently it computes cardinality and updates join keys' column stats separately. It's better to do these two steps together, since both computation and update logic are different for different kinds of stats.
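As a hedged sketch, this is the kind of basic-stats formula involved for a single equi-join key pair (textbook form, not the exact Spark code):

```python
def estimate_equi_join_card(card_a, card_b, ndv_a, ndv_b):
    """Textbook equi-join cardinality estimate from basic column stats:
    |A join B| ~= |A| * |B| / max(ndv(A.k), ndv(B.k)).
    A sketch of the per-key-pair formula the estimator applies; with
    histograms available, this step would use a different computation."""
    return card_a * card_b / max(ndv_a, ndv_b)

print(estimate_equi_join_card(1000, 500, 100, 50))  # 5000.0
```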
## How was this patch tested?
Only refactor, covered by existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19531 from wzhfy/join_est_refactor.
## What changes were proposed in this pull request?
Both `ReadSupport` and `ReadTask` have a method called `createReader`, but they create different things. This could cause some confusion for data source developers. The same issue exists between `WriteSupport` and `DataWriterFactory`, both of which have a method called `createWriter`. This PR renames the method of `ReadTask`/`DataWriterFactory` to `createDataReader`/`createDataWriter`.
Besides, the name of `RowToInternalRowDataWriterFactory` is not correct, because it actually converts `InternalRow`s to `Row`s. It should be renamed `InternalRowDataWriterFactory`.
## How was this patch tested?
Only renaming, should be covered by existing tests.
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#19610 from wzhfy/rename.
## What changes were proposed in this pull request?
When Hive support is not on, users can hit unresolved plan node when trying to call `INSERT OVERWRITE DIRECTORY` using Hive format.
```
"unresolved operator 'InsertIntoDir true, Storage(Location: /private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/spark-b4227606-9311-46a8-8c02-56355bf0e2bc, Serde Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde, InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat), hive, true;;
```
This PR is to issue a better error message.
## How was this patch tested?
Added a test case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19608 from gatorsmile/hivesupportInsertOverwrite.
## What changes were proposed in this pull request?
In `UnsafeInMemorySorter`, one record may take 32 bytes: 1 `long` for the pointer, 1 `long` for the key prefix, and another 2 `long`s as the temporary buffer for radix sort.
In `UnsafeExternalSorter`, we set `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to `1024 * 1024 * 1024 / 2`, hoping to cap the pointer array at 8 GB. However this is wrong: `1024 * 1024 * 1024 / 2 * 32` is actually 16 GB, and if we grow the pointer array before reaching this limit, we may hit the max-page-size error.
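The arithmetic can be checked directly (constants taken from the text above):

```python
BYTES_PER_RECORD = 32  # pointer (8) + key prefix (8) + radix-sort buffer (16)
DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 // 2

max_array_bytes = DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD * BYTES_PER_RECORD
print(max_array_bytes / 1024 ** 3)  # 16.0 GB, double the intended 8 GB
```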
Users may see exception like this on large dataset:
```
Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
...
```
Setting `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to a smaller number is not enough, users can still set the config to a big number and trigger the too large page size issue. This PR fixes it by explicitly handling the too large page size exception in the sorter and spill.
This PR also changes the type of `spark.shuffle.spill.numElementsForceSpillThreshold` to int, because it's only compared with `numRecords`, which is an int. This is an internal conf, so we don't have a serious compatibility issue.
## How was this patch tested?
TODO
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18251 from cloud-fan/sort.
## What changes were proposed in this pull request?
This issue was discovered and investigated by Ohad Raviv and Sean Owen in https://issues.apache.org/jira/browse/SPARK-21657. The input data of `MapObjects` may be a `List`, which has O(n) complexity for access by index. When converting input data to a catalyst array, `MapObjects` gets each element by index in every loop iteration, resulting in bad performance.
This PR fixes this issue by accessing elements via an Iterator.
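A toy Python model of the access-pattern difference (a hand-rolled linked list, not Spark's `MapObjects`):

```python
class LinkedSeq:
    """Toy singly linked list that counts node hops, to show why
    index-based access in a loop is O(n^2) while iteration is O(n)."""
    def __init__(self, values):
        self.head = None
        for v in reversed(list(values)):
            self.head = (v, self.head)
        self.hops = 0

    def get(self, i):          # O(i): walk from the head on every call
        node = self.head
        for _ in range(i):
            self.hops += 1
            node = node[1]
        return node[0]

    def __iter__(self):        # O(1) per element
        node = self.head
        while node is not None:
            self.hops += 1
            yield node[0]
            node = node[1]

n = 100
by_index = LinkedSeq(range(n))
_ = [by_index.get(i) for i in range(n)]
by_iter = LinkedSeq(range(n))
_ = list(by_iter)
print(by_index.hops, by_iter.hops)  # 4950 vs 100
```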
## How was this patch tested?
using the test script in https://issues.apache.org/jira/browse/SPARK-21657
```
val BASE = 100000000
val N = 100000
val df = sc.parallelize(List(("1234567890", (BASE to (BASE+N)).map(x => (x.toString, (x+1).toString, (x+2).toString, (x+3).toString)).toList ))).toDF("c1", "c_arr")
spark.time(df.queryExecution.toRdd.foreach(_ => ()))
```
We can see 50x speed up.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19603 from cloud-fan/map-objects.
## What changes were proposed in this pull request?
Fix three deprecation warnings introduced by move to ANTLR 4.7:
* Use ParserRuleContext.addChild(TerminalNode) in preference to
deprecated ParserRuleContext.addChild(Token) interface.
* TokenStream.reset() is deprecated in favour of seek(0)
* Replace use of deprecated ANTLRInputStream with stream returned by
CharStreams.fromString()
The last item changed the way we construct ANTLR's input stream (from
direct instantiation to factory construction), so necessitated a change
to how we override the LA() method to always return an upper-case
char. The ANTLR object is now wrapped, rather than inherited-from.
* Also fix incorrect usage of CharStream.getText() which expects the rhs
of the supplied interval to be the last char to be returned, i.e. the
interval is inclusive, and work around bug in ANTLR 4.7 where empty
streams or intervals may cause getText() to throw an error.
## How was this patch tested?
Ran all the sql tests. Confirmed that LA() override has coverage by
breaking it, and noting that tests failed.
Author: Henry Robinson <henry@apache.org>
Closes#19578 from henryr/spark-21983.
## What changes were proposed in this pull request?
This PR fixes the conversion error when reads data from a PostgreSQL table that contains columns of `uuid[]`, `inet[]` and `cidr[]` data types.
For example, create a table with the uuid[] data type, and insert the test data.
```SQL
CREATE TABLE users
(
    id smallint NOT NULL,
    name character varying(50),
    user_ids uuid[],
    PRIMARY KEY (id)
);

INSERT INTO users ("id", "name", "user_ids")
VALUES (1, 'foo', ARRAY
    ['7be8aaf8-650e-4dbb-8186-0a749840ecf2',
     '205f9bfc-018c-4452-a605-609c0cfad228']::UUID[]
)
```
Then it will throw the following exception when trying to load the data.
```
java.lang.ClassCastException: [Ljava.util.UUID; cannot be cast to [Ljava.lang.String;
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:459)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:458)
...
```
## How was this patch tested?
Added test in `PostgresIntegrationSuite`.
Author: Jen-Ming Chung <jenmingisme@gmail.com>
Closes#19567 from jmchung/SPARK-22291.
## What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/17075 , to fix the bug in codegen path.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19576 from cloud-fan/bug.
## What changes were proposed in this pull request?
AggUtils.planStreamingAggregation has some comments about DISTINCT aggregates,
while streaming aggregation does not support DISTINCT.
This seems to have been wrongly copy-pasted over.
## How was this patch tested?
Only a comment change.
Author: Juliusz Sompolski <julek@databricks.com>
Closes#18937 from juliuszsompolski/streaming-agg-doc.
## What changes were proposed in this pull request?
`ArrowEvalPythonExec` and `FlatMapGroupsInPandasExec` are referring to config values of `SQLConf` in functions for `mapPartitions`/`mapPartitionsInternal`, but we should capture them on the Driver.
## How was this patch tested?
Added a test and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19587 from ueshin/issues/SPARK-22370.
## What changes were proposed in this pull request?
Seems that end users can be confused by `union`'s behavior on Datasets of typed objects. We can clarify it more in the documentation of the `union` function.
## How was this patch tested?
Only document change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19570 from viirya/SPARK-22335.
## What changes were proposed in this pull request?
Canonicalized plans are not supposed to be executed. I ran into a case in which there's some code that accidentally calls execute on a canonicalized plan. This patch throws a more explicit exception when that happens.
## How was this patch tested?
Added a test case in SparkPlanSuite.
Author: Reynold Xin <rxin@databricks.com>
Closes#18828 from rxin/SPARK-21619.
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-22333
In current version, users can use CURRENT_DATE() and CURRENT_TIMESTAMP() without specifying braces.
However, when a table has columns named as "current_date" or "current_timestamp", it will still be parsed as function call.
There are many such cases in our production cluster. We get wrong answers due to this inappropriate behavior. In general, ColumnReference should get higher priority than timeFunctionCall.
## How was this patch tested?
unit test
manual test
Author: donnyzone <wellfengzhu@gmail.com>
Closes#19559 from DonnyZone/master.
## What changes were proposed in this pull request?
Adds a new optimisation rule 'ReplaceExceptWithNotFilter' that replaces the Except logical operator with a Filter operator, and schedules it before the 'ReplaceExceptWithAntiJoin' rule. This way we can avoid an expensive join operation if one or both of the datasets of the Except operation are fully derived out of Filters from the same parent.
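Ignoring NULL handling (the real rule also has to guard the second predicate against NULLs), the rewrite can be sketched in plain Python:

```python
def except_naive(table, p1, p2):
    # Except(Filter(p1, t), Filter(p2, t)) as a set difference
    # (roughly what the anti-join rewrite computes)
    return sorted(set(filter(p1, table)) - set(filter(p2, table)))

def except_with_not_filter(table, p1, p2):
    # the same result as a single filter over the shared parent:
    # Distinct(Filter(p1(r) and not p2(r), t)) -- no join needed
    return sorted({r for r in table if p1(r) and not p2(r)})

table = list(range(10))
p1 = lambda x: x < 7        # left dataset: Filter(p1, table)
p2 = lambda x: x % 2 == 0   # right dataset: Filter(p2, table)
print(except_naive(table, p1, p2))            # [1, 3, 5]
print(except_with_not_filter(table, p1, p2))  # [1, 3, 5]
```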
## How was this patch tested?
The patch is tested locally using spark-shell + unit test.
Author: Sathiya <sathiya.kumar@polytechnique.edu>
Closes#19451 from sathiyapk/SPARK-22181-optimize-exceptWithFilter.
## What changes were proposed in this pull request?
SPARK-18016 introduced `NestedClass` to avoid having the many methods generated by `splitExpressions` contribute to the outer class' constant pool, making it grow too much. Unfortunately, although their definitions are stored in the `NestedClass`, they are all invoked in the outer class, and each method invocation adds two entries to the constant pool: a `Methodref` and a `Utf8` entry (you can easily check this by compiling a simple sample class with `janinoc` and looking at its Constant Pool). This limits the scalability of the solution with very large methods which are split into a lot of small ones. This means that currently we are generating classes like this one:
```
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
  ...
  public UnsafeRow apply(InternalRow i) {
    rowWriter.zeroOutNullBytes();
    apply_0(i);
    apply_1(i);
    ...
    nestedClassInstance.apply_862(i);
    nestedClassInstance.apply_863(i);
    ...
    nestedClassInstance1.apply_1612(i);
    nestedClassInstance1.apply_1613(i);
    ...
  }
  ...
  private class NestedClass {
    private void apply_862(InternalRow i) { ... }
    private void apply_863(InternalRow i) { ... }
    ...
  }
  private class NestedClass1 {
    private void apply_1612(InternalRow i) { ... }
    private void apply_1613(InternalRow i) { ... }
    ...
  }
}
```
This PR reduces the Constant Pool size of the outer class by adding a new method to each nested class: in this method we invoke all the small methods generated by `splitExpressions` in that nested class. In this way, the outer class contains only one method invocation per nested class, reducing the entries in its constant pool due to method invocations by orders of magnitude. This means that after the patch the generated code becomes:
```
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
  ...
  public UnsafeRow apply(InternalRow i) {
    rowWriter.zeroOutNullBytes();
    apply_0(i);
    apply_1(i);
    ...
    nestedClassInstance.apply(i);
    nestedClassInstance1.apply(i);
    ...
  }
  ...
  private class NestedClass {
    private void apply_862(InternalRow i) { ... }
    private void apply_863(InternalRow i) { ... }
    ...
    private void apply(InternalRow i) {
      apply_862(i);
      apply_863(i);
      ...
    }
  }
  private class NestedClass1 {
    private void apply_1612(InternalRow i) { ... }
    private void apply_1613(InternalRow i) { ... }
    ...
    private void apply(InternalRow i) {
      apply_1612(i);
      apply_1613(i);
      ...
    }
  }
}
```
## How was this patch tested?
Added UT and existing UTs
Author: Marco Gaido <mgaido@hortonworks.com>
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#19480 from mgaido91/SPARK-22226.
## What changes were proposed in this pull request?
This PR is to clean the related codes majorly based on the today's code review on https://github.com/apache/spark/pull/19559
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19585 from gatorsmile/trivialFixes.
## What changes were proposed in this pull request?
Adding date and timestamp support with Arrow for `toPandas()` and `pandas_udf`s. Timestamps are stored in Arrow as UTC and manifested to the user as timezone-naive localized to the Python system timezone.
## How was this patch tested?
Added Scala tests for date and timestamp types under ArrowConverters, ArrowUtils, and ArrowWriter suites. Added Python tests for `toPandas()` and `pandas_udf`s with date and timestamp types.
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18664 from BryanCutler/arrow-date-timestamp-SPARK-21375.
## What changes were proposed in this pull request?
It's possible that users create a `Dataset` and call `collect` on it from many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type T, and this encoder is per-dataset. This means `Dataset#collect` is not thread-safe, because the encoder uses a projection to output the object into a re-usable row.
This PR fixes the problem by creating a new projection when calling `Dataset#collect`, so that we have the re-usable row for each method call instead of for each Dataset.
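A toy Python model of the hazard (hypothetical `Converter` class, not Spark's encoder API):

```python
class Converter:
    """Toy stand-in for an encoder whose projection writes every result
    into one shared, reusable buffer (hypothetical names)."""
    def __init__(self):
        self._reusable = [None]

    def from_row(self, value):
        self._reusable[0] = value
        return self._reusable      # the same buffer is returned every call

shared = Converter()
a = shared.from_row(1)
b = shared.from_row(2)
assert a is b and a[0] == 2        # the first result was silently clobbered

def collect(values):
    # the fix, in spirit: build a fresh converter (projection) per
    # collect() call and copy each value out of the reusable buffer
    conv = Converter()
    return [conv.from_row(v)[0] for v in values]

print(collect([1, 2, 3]))  # [1, 2, 3]
```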
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19577 from cloud-fan/encoder.
## What changes were proposed in this pull request?
This is a regression introduced by #14207. After Spark 2.1, we store the inferred schema when creating the table, to avoid inferring the schema again at the read path. However, there is one special case: overlapping columns between data and partition. For this case, it breaks the assumption about table schemas that there is no overlap between the data and partition schema, and that partition columns should be at the end. The result is that, for Spark 2.1, the table scan has an incorrect schema that puts partition columns at the end. For Spark 2.2, we added a check in CatalogTable to validate table schemas, which fails in this case.
To fix this issue, a simple and safe approach is to fall back to the old behavior when overlapping columns are detected, i.e. store an empty schema in the metastore.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19579 from cloud-fan/bug2.
## What changes were proposed in this pull request?
Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing flag "spark.sql.files.ignoreCorruptFiles".
## How was this patch tested?
new unit test
Author: Jose Torres <jose@databricks.com>
Closes#19581 from joseph-torres/SPARK-22366.
## What changes were proposed in this pull request?
Support unit tests of external code (i.e., applications that use spark) using scalatest that don't want to use FunSuite. SharedSparkContext already supports this, but SharedSQLContext does not.
I've introduced SharedSparkSession as a parent to SharedSQLContext, written in a way that it does support all scalatest styles.
## How was this patch tested?
There are three new unit test suites added that just test using FunSpec, FlatSpec, and WordSpec.
Author: Nathan Kronenfeld <nicole.oresme@gmail.com>
Closes#19529 from nkronenfeld/alternative-style-tests-2.
## What changes were proposed in this pull request?
Removed one unused method.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19508 from viirya/SPARK-20783-followup.
## What changes were proposed in this pull request?
Scala 2.12's `Future` defines two new methods to implement, `transform` and `transformWith`. These can be implemented naturally in Spark's `FutureAction` extension and subclasses, but, only in terms of the new methods that don't exist in Scala 2.11. To support both at the same time, reflection is used to implement these.
## How was this patch tested?
Existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#19561 from srowen/SPARK-22322.
## What changes were proposed in this pull request?
Rewritten error message for clarity. Added extra information in case of attribute name collision, hinting the user to double-check referencing two different tables
## How was this patch tested?
No functional changes, only final message has changed. It has been tested manually against the situation proposed in the JIRA ticket. Automated tests in repository pass.
This PR is original work from me and I license this work to the Spark project
Author: Ruben Berenguel Montoro <ruben@mostlymaths.net>
Author: Ruben Berenguel Montoro <ruben@dreamattic.com>
Author: Ruben Berenguel <ruben@mostlymaths.net>
Closes#17100 from rberenguel/SPARK-13947-error-message.
## What changes were proposed in this pull request?
We enable table cache `InMemoryTableScanExec` to provide `ColumnarBatch` now. But the cached batches are retrieved without pruning. In this case, we still need to do partition batch pruning.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19569 from viirya/SPARK-22348.
## What changes were proposed in this pull request?
For performance reasons, we should resolve an `IN` operation on an empty list to false in the optimization phase, as discussed in #19522.
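A sketch of the intended semantics in plain Python, with `None` modeling NULL; the empty-list case is the one folded to `false`:

```python
def sql_in(value, candidates):
    """Sketch of SQL three-valued IN semantics (None models NULL)."""
    if not candidates:
        return False   # the folded case: nothing is in the empty list,
                       # so we never need to evaluate the value expression
    if value is None:
        return None
    if value in [c for c in candidates if c is not None]:
        return True
    return None if any(c is None for c in candidates) else False

print(sql_in(1, []))         # False -- even before looking at the value
print(sql_in(None, []))      # False
print(sql_in(3, [1, None]))  # None (unknown)
```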
## How was this patch tested?
Added UT
cc gatorsmile
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19523 from mgaido91/SPARK-22301.
## What changes were proposed in this pull request?
Adjust Spark download in test to use Apache mirrors and respect its load balancer, and use Spark 2.1.2. This follows on a recent PMC list thread about removing the cloudfront download rather than update it further.
## How was this patch tested?
Existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#19564 from srowen/SPARK-21936.2.
## What changes were proposed in this pull request?
During [SPARK-21912](https://issues.apache.org/jira/browse/SPARK-21912), we skipped testing 'ADD COLUMNS' on ORC tables due to ORC limitation. Since [SPARK-21929](https://issues.apache.org/jira/browse/SPARK-21929) is resolved now, we can test both `ORC` and `PARQUET` completely.
## How was this patch tested?
Pass the updated test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19562 from dongjoon-hyun/SPARK-21912-2.
## What changes were proposed in this pull request?
The current implementation of `ApproxCountDistinctForIntervals` is an `ImperativeAggregate`. The number of `aggBufferAttributes` is the total number of words in the hllppHelper array; each hllppHelper has 52 words with the default relativeSD.
Since this aggregate function is used in equi-height histogram generation, and the number of buckets in histogram is usually hundreds, the number of `aggBufferAttributes` can easily reach tens of thousands or even more.
This leads to a huge method in codegen and causes an error:
```
org.codehaus.janino.JaninoRuntimeException: Code of method "apply(Lorg/apache/spark/sql/catalyst/InternalRow;)Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB.
```
Besides, huge generated methods also result in performance regression.
In this PR, we change its implementation to `TypedImperativeAggregate`. After the fix, `ApproxCountDistinctForIntervals` can deal with thousands of endpoints without throwing a codegen error, and performance improves from `20 sec` to `2 sec` in a test case of 500 endpoints.
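A back-of-the-envelope check of the buffer size described above (treating each histogram bucket as one hllppHelper of 52 words; the exact helper count is an assumption for illustration):

```python
# Rough estimate of how many aggregate buffer attributes codegen must
# handle with the ImperativeAggregate implementation.
words_per_hllpp = 52      # default relativeSD
num_buckets = 500
total_buffer_attributes = words_per_hllpp * num_buckets
print(total_buffer_attributes)  # 26000
```

Tens of thousands of buffer attributes in a single generated method easily exceeds the 64 KB JVM method limit.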
## How was this patch tested?
Test by an added test case and existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19506 from wzhfy/change_forIntervals_typedAgg.
TIMESTAMP (-101), BINARY_DOUBLE (101) and BINARY_FLOAT (100) are handled in OracleDialect
## What changes were proposed in this pull request?
When an Oracle table contains columns whose type is BINARY_FLOAT or BINARY_DOUBLE, Spark SQL fails to load the table with a SQLException:
```
java.sql.SQLException: Unsupported type 101
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:235)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:292)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:292)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:291)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:64)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:113)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:306)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
```
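The dialect-level handling can be sketched as a small lookup over the Oracle-specific JDBC type codes named in the title; the Catalyst type names below are illustrative stand-ins for the real `DataType`s returned by `OracleDialect`:

```python
# Oracle-specific JDBC type codes fall outside java.sql.Types, so the
# generic JdbcUtils mapping rejects them; a dialect can map them.
ORACLE_TYPE_CODES = {
    -101: "TimestampType",  # TIMESTAMP WITH LOCAL TIME ZONE
    100: "FloatType",       # BINARY_FLOAT
    101: "DoubleType",      # BINARY_DOUBLE
}

def get_catalyst_type(sql_type):
    if sql_type in ORACLE_TYPE_CODES:
        return ORACLE_TYPE_CODES[sql_type]
    raise ValueError(f"Unsupported type {sql_type}")

print(get_catalyst_type(101))  # DoubleType
```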
## How was this patch tested?
I updated a UT to cover type conversion for types -101, 100, and 101. On top of that, I tested this change against an actual table with those columns and was able to read from and write to the table.
Author: Kohki Nishio <taroplus@me.com>
Closes#19548 from taroplus/oracle_sql_types_101.
## What changes were proposed in this pull request?
When [SPARK-19261](https://issues.apache.org/jira/browse/SPARK-19261) implements `ALTER TABLE ADD COLUMNS`, ORC data source is omitted due to SPARK-14387, SPARK-16628, and SPARK-18355. Now, those issues are fixed and Spark 2.3 is [using Spark schema to read ORC table instead of ORC file schema](e6e36004af). This PR enables `ALTER TABLE ADD COLUMNS` for ORC data source.
## How was this patch tested?
Pass the updated and added test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19545 from dongjoon-hyun/SPARK-21929.
## What changes were proposed in this pull request?
Simplifies the test cases that were added in the PR https://github.com/apache/spark/pull/18270.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19546 from gatorsmile/backportSPARK-21055.
## What changes were proposed in this pull request?
This is a follow-up PR of https://github.com/apache/spark/pull/17633.
This PR is to add a conf `spark.sql.hive.advancedPartitionPredicatePushdown.enabled`, which can be used to turn the enhancement off.
## How was this patch tested?
Add a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19547 from gatorsmile/Spark20331FollowUp.
## What changes were proposed in this pull request?
Plan equality should be computed by `canonicalized`, so we can remove unnecessary `hashCode` and `equals` methods.
## How was this patch tested?
Existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19539 from wzhfy/remove_equals.
## What changes were proposed in this pull request?
This is a follow-up of #18732.
This pr modifies `GroupedData.apply()` method to convert pandas udf to grouped udf implicitly.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19517 from ueshin/issues/SPARK-20396/fup2.
## What changes were proposed in this pull request?
Spark does not support `grouping__id`; it has `grouping_id()` instead.
This is inconvenient for Hive users migrating to Spark SQL,
so this PR replaces `grouping__id` with `grouping_id()`,
and Hive users need not alter their scripts.
## How was this patch tested?
Tested with SQLQuerySuite.scala.
Author: CenYuhai <yuhai.cen@ele.me>
Closes#18270 from cenyuhai/SPARK-21055.
## What changes were proposed in this pull request?
This is a very trivial PR, simply marking `strategies` in `SparkPlanner` with the `override` keyword for clarity since it is overriding `strategies` in `QueryPlanner` two levels up in the class hierarchy. I was reading through the code to learn a bit and got stuck on this fact for a little while, so I figured this may be helpful so that another developer new to the project doesn't get stuck where I was.
I did not make a JIRA ticket for this because it is so trivial, but I'm happy to do so to adhere to the contribution guidelines if required.
## How was this patch tested?
N/A (trivial change).
Author: Eric Perry <eric@ericjperry.com>
Closes#19537 from ericjperry/override-strategies.
## What changes were proposed in this pull request?
A working prototype for data source v2 write path.
The writing framework is similar to the reading framework. i.e. `WriteSupport` -> `DataSourceV2Writer` -> `DataWriterFactory` -> `DataWriter`.
Similar to `FileCommitProtocol`, the writing API has job- and task-level commit/abort to support transactions.
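The commit protocol can be modeled as a toy, in-memory sketch; class and method names below mirror the prose, not the real Java interfaces:

```python
# Task-level writer: buffers rows, and either commits (producing a
# "commit message" for the driver) or aborts (discarding its output).
class DataWriter:
    def __init__(self):
        self.rows = []

    def write(self, row):
        self.rows.append(row)

    def commit(self):
        return list(self.rows)

    def abort(self):
        self.rows = []

# Job-level writer: collects task commit messages and makes the
# output visible only on job commit.
class DataSourceV2Writer:
    def __init__(self):
        self.committed = []

    def create_writer_factory(self):
        return DataWriter  # stands in for DataWriterFactory

    def commit(self, messages):
        for message in messages:
            self.committed.extend(message)

    def abort(self, messages):
        self.committed = []

writer = DataSourceV2Writer()
task_writer = writer.create_writer_factory()()
task_writer.write(("a", 1))
writer.commit([task_writer.commit()])
print(writer.committed)  # [('a', 1)]
```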
## How was this patch tested?
new tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19269 from cloud-fan/data-source-v2-write.
## What changes were proposed in this pull request?
Fix java style issues
## How was this patch tested?
Run `./dev/lint-java` locally since it's not run on Jenkins
Author: Andrew Ash <andrew@andrewash.com>
Closes#19486 from ash211/aash/fix-lint-java.
Hive delegation tokens are only needed when the Spark driver has no access
to the kerberos TGT. That happens only in two situations:
- when using a proxy user
- when using cluster mode without a keytab
This change modifies the Hive provider so that it only generates delegation
tokens in those situations, and tweaks the YARN AM so that it makes the proper
user visible to the Hive code when running with keytabs, so that the TGT
can be used instead of a delegation token.
The effect of this change is that now it's possible to initialize multiple,
non-concurrent SparkContext instances in the same JVM. Before, the second
invocation would fail to fetch a new Hive delegation token, which then could
make the second (or third or...) application fail once the token expired.
With this change, the TGT will be used to authenticate to the HMS instead.
This change also avoids polluting the current logged in user's credentials
when launching applications. The credentials are copied only when running
applications as a proxy user. This makes it possible to implement SPARK-11035
later, where multiple threads might be launching applications, and each app
should have its own set of credentials.
Tested by verifying HDFS and Hive access in following scenarios:
- client and cluster mode
- client and cluster mode with proxy user
- client and cluster mode with principal / keytab
- long-running cluster app with principal / keytab
- pyspark app that creates (and stops) multiple SparkContext instances
through its lifetime
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19509 from vanzin/SPARK-22290.
## What changes were proposed in this pull request?
This PR addresses the comments by gatorsmile on [the previous PR](https://github.com/apache/spark/pull/19494).
## How was this patch tested?
Previous UT and added UT.
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#19522 from mgaido91/SPARK-22249_FOLLOWUP.
## What changes were proposed in this pull request?
To let the same aggregate function that appears multiple times in an Aggregate be evaluated only once, we need to deduplicate the aggregate expressions. The original code tried to use a `distinct` call to get a set of aggregate expressions, but this did not work, since `distinct` does not compare semantic equality. And even if it did, further work would be needed when rewriting the result expressions.
In this PR, I changed the set to a map from the semantic identity of an aggregate expression to the expression itself. Thus, later on, when rewriting result expressions (i.e., output expressions), the aggregate expression references can be fixed.
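A minimal sketch of the map-based deduplication; `canonicalize()` here just normalizes case, standing in for Spark's real semantic canonicalization:

```python
# Deduplicate aggregate expressions by semantic identity rather than
# object/textual identity.
def canonicalize(expr):
    return expr.lower()

def dedup(agg_exprs):
    seen = {}
    for e in agg_exprs:
        # map: semantic identity -> first representative expression
        seen.setdefault(canonicalize(e), e)
    return seen

m = dedup(["sum(a)", "SUM(A)", "count(b)"])
print(list(m.values()))  # ['sum(a)', 'count(b)']
```

Result-expression rewriting can then look up each aggregate reference in the map and point it at the single representative.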
## How was this patch tested?
Added a new test in SQLQuerySuite
Author: maryannxue <maryann.xue@gmail.com>
Closes#19488 from maryannxue/spark-22266.
## What changes were proposed in this pull request?
Complex state-updating and/or timeout-handling logic in mapGroupsWithState functions may require taking decisions based on the current event-time watermark and/or processing time. Currently, you can use the SQL function `current_timestamp` to get the current processing time, but it needs to be inserted into every row with a select and then passed through the encoder, which isn't efficient. Furthermore, there is no way to get the current watermark.
This PR exposes both of them through the GroupState API.
Additionally, it also cleans up some of the GroupState docs.
## How was this patch tested?
New unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19495 from tdas/SPARK-22278.
## What changes were proposed in this pull request?
In Average.scala, it has
```
override lazy val evaluateExpression = child.dataType match {
  case DecimalType.Fixed(p, s) =>
    // increase the precision and scale to prevent precision loss
    val dt = DecimalType.bounded(p + 14, s + 4)
    Cast(Cast(sum, dt) / Cast(count, dt), resultType)
  case _ =>
    Cast(sum, resultType) / Cast(count, resultType)
}

def setChild(newchild: Expression) = {
  child = newchild
}
```
It is possible that `Cast(Cast(sum, dt) / Cast(count, dt), resultType)` will make the precision of the decimal number bigger than 38, which causes overflow. Since count is an integer and doesn't need a scale, I will cast it using `DecimalType.bounded(38, 0)`.
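The widening arithmetic can be checked numerically; a sketch assuming `bounded()` clamps precision and scale to the 38-digit maximum, as `DecimalType.bounded` does:

```python
# Numeric check of the precision widening above.
MAX_PRECISION = 38

def bounded(precision, scale):
    return (min(precision, MAX_PRECISION), min(scale, MAX_PRECISION))

p, s = 32, 4                   # a Decimal(32, 4) child
print(bounded(p + 14, s + 4))  # (38, 8): precision 46 was clipped
print(bounded(38, 0))          # (38, 0): safe for an integral count
```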
## How was this patch tested?
Added a test case in DataFrameSuite.
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#19496 from huaxingao/spark-22271.
## What changes were proposed in this pull request?
Evaluate one-sided conditions early in stream-stream joins.
This is in addition to normal filter pushdown, because integrating it with the join logic allows it to take place in outer join scenarios. This means that rows which can never satisfy the join condition won't clog up the state.
## How was this patch tested?
new unit tests
Author: Jose Torres <jose@databricks.com>
Closes#19452 from joseph-torres/SPARK-22136.
## What changes were proposed in this pull request?
#### before
```scala
scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = org.apache.spark.sql.KeyValueGroupedDataset@65214862
```
#### after
```scala
scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = [key: [value: string], value: [value: string]]
```
## How was this patch tested?
existing ut
cc gatorsmile cloud-fan
Author: Kent Yao <yaooqinn@hotmail.com>
Closes#19363 from yaooqinn/minor-dataset-tostring.
## What changes were proposed in this pull request?
As pointed out in the JIRA, there is a bug which causes an exception to be thrown if `isin` is called with an empty list on a cached DataFrame. The PR fixes it.
## How was this patch tested?
Added UT.
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#19494 from mgaido91/SPARK-22249.
## What changes were proposed in this pull request?
This PR aims to improve **StatisticsSuite** to test the `convertMetastore` configuration properly. Currently, some test logic in `test statistics of LogicalRelation converted from Hive serde tables` depends on the default configuration. The new test case is shorter and covers both (true/false) cases explicitly.
This test case was previously modified by SPARK-17410 and SPARK-17284 in Spark 2.3.0.
- a2460be9c3 (diff-1c464c86b68c2d0b07e73b7354e74ce7R443)
## How was this patch tested?
Pass the Jenkins with the improved test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19500 from dongjoon-hyun/SPARK-22280.
## What changes were proposed in this pull request?
This PR aims to
- Rename `OrcRelation` to `OrcFileFormat` object.
- Replace `OrcRelation.ORC_COMPRESSION` with `org.apache.orc.OrcConf.COMPRESS`. Since [SPARK-21422](https://issues.apache.org/jira/browse/SPARK-21422), we can use `OrcConf.COMPRESS` instead of Hive's.
```scala
// The references of Hive's classes will be minimized.
val ORC_COMPRESSION = "orc.compress"
```
## How was this patch tested?
Pass the Jenkins with the existing and updated test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19502 from dongjoon-hyun/SPARK-22282.
## What changes were proposed in this pull request?
`ObjectHashAggregateExec` should override `outputPartitioning` in order to avoid unnecessary shuffle.
## How was this patch tested?
Added Jenkins test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19501 from viirya/SPARK-22223.
## What changes were proposed in this pull request?
In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan has the expected partitioning for Streaming Stateful Operators. The problem is that we are not allowed to access this information during planning.
The reason we added that check was that CoalesceExec could actually create RDDs with 0 partitions. We should fix it so that when CoalesceExec says there is a SinglePartition, there is in fact an inputRDD of 1 partition instead of 0 partitions.
## How was this patch tested?
Regression test in StreamingQuerySuite
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#19467 from brkyvz/stateful-op.
## What changes were proposed in this pull request?
When fixing schema field names using escape characters with `addReferenceMinorObj()` at [SPARK-18952](https://issues.apache.org/jira/browse/SPARK-18952) (#16361), double-quotes around the names remained and the names became something like `"((java.lang.String) references[1])"`.
```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[1])", org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[2])", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```
We should remove the double-quotes to refer to the values in `references` properly:
```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1]), org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2]), org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19491 from ueshin/issues/SPARK-22273.
## What changes were proposed in this pull request?
`BasicWriteTaskStatsTracker.getFileSize()` now catches `FileNotFoundException`, logs it at info level, and returns 0 as the file size.
This ensures that if a newly created file isn't visible due to the store not always having create consistency, the metric collection doesn't cause the failure.
## How was this patch tested?
New test suite included, `BasicWriteTaskStatsTrackerSuite`. This not only checks the resilience to missing files, but verifies the existing logic as to how file statistics are gathered.
Note that in the current implementation:
1. If you call `Tracker.getFinalStats()` more than once, the file size count will increase by the size of the last file. This could be fixed by clearing the filename field inside `getFinalStats()` itself.
2. If you pass an empty or null string to `Tracker.newFile(path)`, an IllegalArgumentException is raised, but only in `getFinalStats()` rather than in `newFile()`. There's a test for this behaviour in the new suite, as it verifies that only FNFEs get swallowed.
Author: Steve Loughran <stevel@hortonworks.com>
Closes#18979 from steveloughran/cloud/SPARK-21762-missing-files-in-metrics.
## What changes were proposed in this pull request?
This PR changes `keyWithIndexToNumValues` to `keyWithIndexToValue`.
There will be directories on HDFS named with this `keyWithIndexToNumValues`. So if we ever want to fix this, let's fix it now.
## How was this patch tested?
existing unit test cases.
Author: Liwei Lin <lwlin7@gmail.com>
Closes#19435 from lw-lin/keyWithIndex.
## What changes were proposed in this pull request?
This is a minor follow-up of #19474.
#19474 partially reverted #18064 but accidentally introduced a behavior change. `Command` extended `LogicalPlan` before #18064, but #19474 made it extend `LeafNode`. This is an internal behavior change, as now all `Command` subclasses can't define children and have to implement the `computeStats` method.
This PR fixes this by making `Command` extend `LogicalPlan` again.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19493 from cloud-fan/minor.
## What changes were proposed in this pull request?
This is an effort to reduce the difference between Hive and Spark. Spark supports case-sensitivity in columns. Especially, for Struct types, with `spark.sql.caseSensitive=true`, the following is supported.
```scala
scala> sql("select named_struct('a', 1, 'A', 2).a").show
+--------------------------+
|named_struct(a, 1, A, 2).a|
+--------------------------+
| 1|
+--------------------------+
scala> sql("select named_struct('a', 1, 'A', 2).A").show
+--------------------------+
|named_struct(a, 1, A, 2).A|
+--------------------------+
| 2|
+--------------------------+
```
And vice versa, with `spark.sql.caseSensitive=false`, the following is supported.
```scala
scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show
+--------------------+--------------------+
|named_struct(a, 1).A|named_struct(A, 1).a|
+--------------------+--------------------+
| 1| 1|
+--------------------+--------------------+
```
However, types are considered different. For example, SET operations fail.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:int> <> struct<a:int> at the first column of the second table;;
'Union
:- Project [named_struct(a, 1) AS named_struct(a, 1)#57]
: +- OneRowRelation$
+- Project [named_struct(A, 2) AS named_struct(A, 2)#58]
+- OneRowRelation$
```
This PR aims to support case-insensitive type equality. For example, in Set operation, the above operation succeed when `spark.sql.caseSensitive=false`.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
+------------------+
|named_struct(a, 1)|
+------------------+
| [1]|
| [2]|
+------------------+
```
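A minimal sketch of case-insensitive struct type equality: field names compare ignoring case, nested types recurse, and non-struct types must still match exactly. Types are modeled as plain dicts purely for illustration:

```python
# Case-(in)sensitive structural type comparison sketch.
def same_type(a, b, case_sensitive=False):
    if a["type"] != b["type"]:
        return False
    if a["type"] != "struct":
        return True
    if len(a["fields"]) != len(b["fields"]):
        return False
    for (name_a, type_a), (name_b, type_b) in zip(a["fields"], b["fields"]):
        names_match = (name_a == name_b if case_sensitive
                       else name_a.lower() == name_b.lower())
        if not names_match or not same_type(type_a, type_b, case_sensitive):
            return False
    return True

s1 = {"type": "struct", "fields": [("a", {"type": "int"})]}
s2 = {"type": "struct", "fields": [("A", {"type": "int"})]}
print(same_type(s1, s2))                       # True
print(same_type(s1, s2, case_sensitive=True))  # False
```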
## How was this patch tested?
Pass the Jenkins with a newly add test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#18460 from dongjoon-hyun/SPARK-21247.
## What changes were proposed in this pull request?
Before Hive 2.0, ORC File schema has invalid column names like `_col1` and `_col2`. This is a well-known limitation and there are several Apache Spark issues with `spark.sql.hive.convertMetastoreOrc=true`. This PR ignores ORC File schema and use Spark schema.
## How was this patch tested?
Pass the newly added test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19470 from dongjoon-hyun/SPARK-18355.
## What changes were proposed in this pull request?
Non-deterministic expressions should be considered as not contained in an `ExpressionSet`.
This is consistent with how we define `semanticEquals` between two expressions.
Otherwise, combining expressions will remove non-deterministic expressions which should be preserved.
E.g.
Combine filters of
```scala
testRelation.where(Rand(0) > 0.1).where(Rand(0) > 0.1)
```
should result in
```scala
testRelation.where(Rand(0) > 0.1 && Rand(0) > 0.1)
```
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19475 from gengliangwang/non-deterministic-expressionSet.
## What changes were proposed in this pull request?
Due to optimizer removing some unnecessary aliases, the logical and physical plan may have different output attribute ids. FileFormatWriter should handle this when creating the physical sort node.
## How was this patch tested?
new regression test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19483 from cloud-fan/bug2.
## What changes were proposed in this pull request?
The method `deterministic` is frequently called in optimizer.
Refactor `deterministic` as lazy value, in order to avoid redundant computations.
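The lazy-val memoization can be mimicked in a short Python sketch (Spark's actual change is a Scala `lazy val` on `Expression`):

```python
from functools import cached_property

# `deterministic` is computed over the whole child tree once, then
# cached, so repeated optimizer queries skip the traversal.
class Expression:
    def __init__(self, children=()):
        self.children = children

    @cached_property
    def deterministic(self):
        return all(c.deterministic for c in self.children)

leaf = Expression()
root = Expression([Expression([leaf]), leaf])
print(root.deterministic)  # True; subsequent reads return the cached value
```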
## How was this patch tested?
Simple benchmark test over TPC-DS queries, measuring run time from query string to optimized plan (20 continuous runs, averaging the last 5 results):
Before changes: 12601 ms
After changes: 11993 ms
This is a 4.8% performance improvement.
Also run test with Unit test.
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19478 from gengliangwang/deterministicAsLazyVal.
## What changes were proposed in this pull request?
`ParquetFileFormat` relaxes its requirement on the output committer class from `org.apache.parquet.hadoop.ParquetOutputCommitter` or a subclass thereof (and so implicitly Hadoop `FileOutputCommitter`) to any committer implementing `org.apache.hadoop.mapreduce.OutputCommitter`.
This enables output committers which don't write to the filesystem the way `FileOutputCommitter` does to save Parquet data from a DataFrame; at present you cannot do this.
Before using a committer which isn't a subclass of `ParquetOutputCommitter`, the code checks whether the context has requested summary metadata by setting `parquet.enable.summary-metadata`. If true, and the committer class isn't a Parquet committer, it raises a RuntimeException with an error message.
(It could downgrade, of course, but raising an exception makes it clear there won't be a summary. It also makes the behaviour testable.)
Note that `SQLConf` already states that any `OutputCommitter` can be used, but that typically it's a subclass of `ParquetOutputCommitter`. That's not currently true. This patch makes the code consistent with the docs, adding tests to verify.
## How was this patch tested?
The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, `MarkingFileOutputCommitter` which extends `FileOutputCommitter` and writes a marker file in the destination directory. The presence of the marker file can be used to verify the new committer was used. The tests then try the combinations of Parquet committer summary/no-summary and marking committer summary/no-summary.
| committer | summary | outcome |
|-----------|---------|---------|
| parquet | true | success |
| parquet | false | success |
| marking | false | success with marker |
| marking | true | exception |
All tests are happy.
Author: Steve Loughran <stevel@hortonworks.com>
Closes#19448 from steveloughran/cloud/SPARK-22217-committer.
## What changes were proposed in this pull request?
Adds code for setting the 'aggregate time' metric on the non-codegen path in `HashAggregateExec` and in `ObjectHashAggregateExec`.
## How was this patch tested?
Tested manually.
Author: Ala Luszczak <ala@databricks.com>
Closes#19473 from ala/fix-agg-time.
## What changes were proposed in this pull request?
As we discussed in https://github.com/apache/spark/pull/19136#discussion_r137023744, we should push down operators to the data source before planning, so that the data source can report more accurate statistics.
This PR also includes some cleanup for the read path.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19424 from cloud-fan/follow.
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/18064, we allowed `RunnableCommand` to have children in order to fix some UI issues. Then we made `InsertIntoXXX` commands take the input `query` as a child, when we do the actual writing, we just pass the physical plan to the writer(`FileFormatWriter.write`).
However, this is problematic. In Spark SQL, the optimizer and planner are allowed to change schema names slightly. e.g. the `ColumnPruning` rule removes no-op `Project`s, like `Project("A", Scan("a"))`, and thus changes the output schema from `<A: int>` to `<a: int>`. When it comes to writing, especially for self-describing data formats like Parquet, we may write the wrong schema to the file and cause null values at the read path.
Fortunately, in https://github.com/apache/spark/pull/18450 , we decided to allow nested execution and one query can map to multiple executions in the UI. This releases the major restriction in #18604 , and now we don't have to take the input `query` as child of `InsertIntoXXX` commands.
So the fix is simple: this PR partially reverts #18064 and makes `InsertIntoXXX` commands leaf nodes again.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19474 from cloud-fan/bug.
## What changes were proposed in this pull request?
Implement StreamingRelation.computeStats to fix explain
## How was this patch tested?
- unit tests: `StreamingRelation.computeStats` and `StreamingExecutionRelation.computeStats`.
- regression tests: `explain join with a normal source` and `explain join with MemoryStream`.
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19465 from zsxwing/SPARK-21988.
## What changes were proposed in this pull request?
Currently percentile_approx never returns the first element when the percentile is in (relativeError, 1/N], where relativeError defaults to 1/10000 and N is the total number of elements. But ideally, percentiles in [0, 1/N] should all return the first element as the answer.
For example, given input data 1 to 10, if a user queries 10% (or even less) percentile, it should return 1, because the first value 1 already reaches 10%. Currently it returns 2.
Based on the paper, targetError should not be rounded up, and the search index should start from 0 instead of 1. By following the paper, we can fix the cases mentioned above.
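A much-simplified sketch of the boundary behaviour: the real implementation queries a quantile summary rather than scanning, but the indexing idea is the same, since the target rank is not rounded up and the scan starts at index 0:

```python
# With the target rank left unrounded, any percentile <= 1/N resolves
# to the first element.
def percentile_index(percentile, n):
    target = percentile * n   # not rounded up
    rank = 0.0
    for i in range(n):        # search from index 0
        rank += 1
        if rank >= target:
            return i
    return n - 1

data = list(range(1, 11))     # 1..10
print(data[percentile_index(0.10, len(data))])  # 1, not 2
```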
## How was this patch tested?
Added a new test case and fix existing test cases.
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#19438 from wzhfy/improve_percentile_approx.
## What changes were proposed in this pull request?
Currently `CodeGenerator.splitExpressions` splits statements into methods if the total length of the statements exceeds 1024 characters. This length may include comments or empty lines.
This PR excludes comments and empty lines from the length, using the `CodeFormatter.stripExtraNewLinesAndComments()` method, to reduce the number of generated methods in a class.
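A rough approximation of the idea in Python; the real `CodeFormatter.stripExtraNewLinesAndComments()` handles more cases (e.g. comment markers inside string literals):

```python
import re

# Measure generated-code length with comments and blank lines removed,
# so splitting decisions reflect actual statements.
def stripped_length(code):
    code = re.sub(r"/\*.*?\*/", "", code, flags=re.S)  # block comments
    code = re.sub(r"//[^\n]*", "", code)               # line comments
    lines = [line for line in code.splitlines() if line.strip()]
    return sum(len(line) for line in lines)

snippet = "int a = 1;\n/* comment */\n\nint b = 2;\n"
print(stripped_length(snippet))  # 20: only the two statements count
```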
## How was this patch tested?
Existing tests
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18966 from kiszk/SPARK-21751.
This change adds a new SQL config key that is equivalent to SparkContext's
"spark.extraListeners", allowing users to register QueryExecutionListener
instances through the Spark configuration system instead of having to
explicitly do it in code.
The code used by SparkContext to implement the feature was refactored into
a helper method in the Utils class, and SQL's ExecutionListenerManager was
modified to use it to initialize listeners declared in the configuration.
Unit tests were added to verify all the new functionality.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19309 from vanzin/SPARK-19558.
## What changes were proposed in this pull request?
This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.
Static schema
-------------------
```
schema = df.schema

@pandas_udf(schema)
def normalize(df):
    df = df.assign(v1=(df.v1 - df.v1.mean()) / df.v1.std())
    return df

df.groupBy('id').apply(normalize)
```
Dynamic schema
-----------------------
**This use case is removed from the PR and we will discuss this as a follow up. See discussion https://github.com/apache/spark/pull/18732#pullrequestreview-66583248**
Another example to use pd.DataFrame dtypes as output schema of the udf:
```
sample_df = df.filter(df.id == 1).toPandas()

def foo(df):
    ret = ...  # Some transformation on the input pd.DataFrame
    return ret

foo_udf = pandas_udf(foo, foo(sample_df).dtypes)
df.groupBy('id').apply(foo_udf)
```
In the interactive use case, users usually have a sample pd.DataFrame to test the function `foo` in their notebook. Being able to use `foo(sample_df).dtypes` frees users from specifying the output schema of `foo`.
Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md
## How was this patch tested?
* Added GroupbyApplyTest
Author: Li Jin <ice.xelloss@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#18732 from icexelloss/groupby-apply-SPARK-20396.
## What changes were proposed in this pull request?
This is a follow-up of #19384.
In the previous PR, only the definitions of the config names were modified, but we also need to modify the names specified as string literals at runtime or in tests.
## How was this patch tested?
Existing tests but modified the config names.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19462 from ueshin/issues/SPARK-22159/fup1.
## What changes were proposed in this pull request?
We should not break the assumption that the length of the allocated byte array is word rounded:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170
So we want to use `Integer.MAX_VALUE - 15` instead of `Integer.MAX_VALUE - 8` as the upper bound of an allocated byte array.
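The arithmetic behind the tighter bound can be sketched in a few lines of plain Python (`JVM_ARRAY_LIMIT` reflects the common assumption that JVMs cap array lengths near `Integer.MAX_VALUE - 8`):

```python
JAVA_INT_MAX = 2**31 - 1            # Integer.MAX_VALUE
JVM_ARRAY_LIMIT = JAVA_INT_MAX - 8  # practical max array length on many JVMs

def round_up_to_word(n):
    """Round a byte length up to the next multiple of 8 (one word)."""
    return (n + 7) & ~7

# With the old bound, the word-rounded length can exceed the JVM array limit:
assert round_up_to_word(JAVA_INT_MAX - 8) > JVM_ARRAY_LIMIT
# With the new bound, it cannot:
assert round_up_to_word(JAVA_INT_MAX - 15) <= JVM_ARRAY_LIMIT
```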
cc: srowen gatorsmile
## How was this patch tested?
Since the Spark unit test JVM has less than 1GB of heap, here we run the test code as a submit job, so it can run on a JVM that has 4GB of memory.
Author: Feng Liu <fengliu@databricks.com>
Closes#19460 from liufengdb/fix_array_max.
## What changes were proposed in this pull request?
In state store restore, for each row, put the saved state before the row in the iterator instead of after.
This fixes an issue where agg(last('attr)) will forever return the last value of 'attr from the first microbatch.
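The fix can be illustrated with a plain-Python generator (a toy sketch, not the actual restore operator): yielding the saved state *before* the new rows lets a downstream `last` aggregate see the newest value rather than the saved one.

```python
def restore(saved_state, rows):
    """Yield the previously saved state before the new rows, so that
    aggregates like last() pick up the newest value, not the saved one."""
    for s in saved_state:
        yield s
    for r in rows:
        yield r

# last() over the restored iterator now returns the newest row:
out = list(restore(saved_state=[("k", 1)], rows=[("k", 2)]))
assert out[-1] == ("k", 2)
```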
## How was this patch tested?
new unit test
Author: Jose Torres <jose@databricks.com>
Closes#19461 from joseph-torres/SPARK-22230.
## What changes were proposed in this pull request?
This updates the broadcast join code path to lazily decompress pages and
iterate through UnsafeRows to prevent all rows from being held in memory
while the broadcast table is being built.
## How was this patch tested?
Existing tests.
Author: Ryan Blue <blue@apache.org>
Closes#19394 from rdblue/broadcast-driver-memory.
## What changes were proposed in this pull request?
`monotonically_increasing_id` doesn't work in Structured Streaming. We should throw an exception if a streaming query uses it.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19336 from viirya/SPARK-21947.
## What changes were proposed in this pull request?
In this PR we make a few changes to the list hive partitions code, to make the code more extensible.
The following changes are made:
1. In `HiveClientImpl.getPartitions()`, call `client.getPartitions` instead of `shim.getAllPartitions` when `spec` is empty;
2. In `HiveTableScanExec`, previously we always call `listPartitionsByFilter` if the config `metastorePartitionPruning` is enabled, but actually, we'd better call `listPartitions` if `partitionPruningPred` is empty;
3. We should use sessionCatalog instead of SharedState.externalCatalog in `HiveTableScanExec`.
## How was this patch tested?
Tested by existing test cases since this is code refactor, no regression or behavior change is expected.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#19444 from jiangxb1987/hivePartitions.
## What changes were proposed in this pull request?
When exceeding `spark.sql.codegen.hugeMethodLimit`, the runtime falls back to the Volcano iterator solution. This could cause an infinite loop when `FileSourceScanExec` can use the columnar batch to read the data. This PR is to fix the issue.
## How was this patch tested?
Added a test
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19440 from gatorsmile/testt.
## What changes were proposed in this pull request?
Looks like `FlatMapGroupsInRExec.requiredChildDistribution` didn't consider empty grouping attributes. It should be a problem when running `EnsureRequirements` and `gapply` in R can't work on empty grouping columns.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19436 from viirya/fix-flatmapinr-distribution.
## What changes were proposed in this pull request?
Currently, the group state of a user-defined type is encoded as top-level columns in the UnsafeRows stored in the state store. The timeout timestamp is also saved (when needed) as the last top-level column. Since the group state is serialized to top-level columns, you cannot save "null" as a value of the state (setting null in all the top-level columns is not equivalent). So we don't let the user set the timeout without initializing the state for a key. Based on user experience, this leads to confusion.
This PR is to change the row format such that the state is saved as nested columns. This would allow the state to be set to null, and avoid these confusing corner cases.
## How was this patch tested?
Refactored tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19416 from tdas/SPARK-22187.
## What changes were proposed in this pull request?
By definition the table name in Spark can be something like `123x`, `25a`, etc., with exceptions for literals like `12L`, `23BD`, etc. However, Spark SQL has a special byte length literal, which prevents users from using digits followed by `b`, `k`, `m`, `g` as identifiers.
The byte length literal is not a standard SQL literal and is only used in the `tableSample` parser rule. This PR moves the parsing of the byte length literal from the lexer to the parser, so that users can use such tokens as identifiers.
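The effect of moving the rule into the parser can be sketched as context-sensitive classification (a hypothetical helper, not Spark's actual ANTLR grammar): the same token is a size literal only inside `tableSample`, and a plain identifier everywhere else.

```python
import re

# Tokens like "10g" or "123k" that look like byte-length literals.
BYTE_LENGTH = re.compile(r"\d+[bkmg]", re.IGNORECASE)

def classify(token, in_table_sample):
    """A byte-length-looking token is only a size literal inside
    tableSample; elsewhere it is a plain identifier."""
    if in_table_sample and BYTE_LENGTH.fullmatch(token):
        return "byteLengthLiteral"
    return "identifier"

assert classify("10g", in_table_sample=True) == "byteLengthLiteral"
assert classify("10g", in_table_sample=False) == "identifier"
```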
## How was this patch tested?
regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19392 from cloud-fan/parser-bug.
## What changes were proposed in this pull request?
This PR added code to check the actual bytecode size when compiling generated code. In #18810, we added code to give up code compilation and use interpreter execution in `SparkPlan` if the line number of generated functions goes over `maxLinesPerFunction`. But, we already have code to collect metrics for compiled bytecode size in the `CodeGenerator` object. So, we could easily reuse that code for this purpose.
## How was this patch tested?
Added tests in `WholeStageCodegenSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19083 from maropu/SPARK-21871.
## What changes were proposed in this pull request?
This PR abstracts data compressed by `CompressibleColumnAccessor` using `ColumnVector` in batch method. When `ColumnAccessor.decompress` is called, `ColumnVector` will have uncompressed data. This batch decompress does not use `InternalRow` to reduce the number of memory accesses.
As the first step of this implementation, this JIRA supports primitive data types. Another PR will support arrays and other data types.
This implementation decompress data in batch into uncompressed column batch, as rxin suggested at [here](https://github.com/apache/spark/pull/18468#issuecomment-316914076). Another implementation uses adapter approach [as cloud-fan suggested](https://github.com/apache/spark/pull/18468).
## How was this patch tested?
Added test suites
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18704 from kiszk/SPARK-20783a.
## What changes were proposed in this pull request?
[SPARK-22193][SQL] Minor typo fix in SortMergeJoinExec. Nothing major, but it bothered me while reading the code, hence fixing it.
## How was this patch tested?
existing tests
Author: Rekha Joshi <rekhajoshm@gmail.com>
Author: rjoshi2 <rekhajoshm@gmail.com>
Closes#19422 from rekhajoshm/SPARK-22193.
## What changes were proposed in this pull request?
Allow one-sided outer joins between two streams when a watermark is defined.
## How was this patch tested?
new unit tests
Author: Jose Torres <jose@databricks.com>
Closes#19327 from joseph-torres/outerjoin.
## What changes were proposed in this pull request?
Users could hit a `java.lang.NullPointerException` when a table was created by Hive and the table's owner fetched from the Hive metastore is `null`. `DESC EXTENDED` failed with the error:
> SQLExecutionException: java.lang.NullPointerException at scala.collection.immutable.StringOps$.length$extension(StringOps.scala:47) at scala.collection.immutable.StringOps.length(StringOps.scala:47) at scala.collection.IndexedSeqOptimized$class.isEmpty(IndexedSeqOptimized.scala:27) at scala.collection.immutable.StringOps.isEmpty(StringOps.scala:29) at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111) at scala.collection.immutable.StringOps.nonEmpty(StringOps.scala:29) at org.apache.spark.sql.catalyst.catalog.CatalogTable.toLinkedHashMap(interface.scala:300) at org.apache.spark.sql.execution.command.DescribeTableCommand.describeFormattedTableInfo(tables.scala:565) at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:543) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:66) at
## How was this patch tested?
Added a unit test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19395 from gatorsmile/desc.
## What changes were proposed in this pull request?
The underlying tables of persistent views are not refreshed when users issue the REFRESH TABLE command against the persistent views.
## How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19405 from gatorsmile/refreshView.
## What changes were proposed in this pull request?
The definition of `maxRows` in `LocalLimit` operator was simply wrong. This patch introduces a new `maxRowsPerPartition` method and uses that in pruning. The patch also adds more documentation on why we need local limit vs global limit.
Note that this has never previously been a bug because of the way the code is structured, but future use of `maxRows` could lead to bugs.
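The distinction between the two limits can be sketched with toy Python stand-ins for the operators (hypothetical helpers, not the Spark code): a local limit bounds each partition, so only `maxRowsPerPartition`, not `maxRows`, is known for it.

```python
def local_limit(partitions, n):
    """LocalLimit caps each partition at n rows; it does NOT bound the
    total output at n, which is why maxRows was wrong for it."""
    return [p[:n] for p in partitions]

def global_limit(partitions, n):
    """GlobalLimit bounds the total output at n rows."""
    flat = [row for p in partitions for row in p]
    return flat[:n]

parts = [[1, 2, 3], [4, 5], [6]]
assert sum(len(p) for p in local_limit(parts, 2)) == 5   # up to n per partition
assert len(global_limit(parts, 2)) == 2                  # n overall
```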
## How was this patch tested?
Should be covered by existing test cases.
Closes#18851
Author: gatorsmile <gatorsmile@gmail.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#19393 from gatorsmile/pr-18851.
## What changes were proposed in this pull request?
This PR fixed an overflow issue below in `Dataset.show`:
```
scala> Seq((1, 2), (3, 4)).toDF("a", "b").show(Int.MaxValue)
org.apache.spark.sql.AnalysisException: The limit expression must be equal to or greater than 0, but got -2147483648;;
GlobalLimit -2147483648
+- LocalLimit -2147483648
+- Project [_1#27218 AS a#27221, _2#27219 AS b#27222]
+- LocalRelation [_1#27218, _2#27219]
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$checkLimitClause(CheckAnalysis.scala:70)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:234)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
```
## How was this patch tested?
Added tests in `DataFrameSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19401 from maropu/MaxValueInShowString.
## What changes were proposed in this pull request?
SPARK-21690 makes one-pass `Imputer` by parallelizing the computation of all input columns. When we transform dataset with `ImputerModel`, we do `withColumn` on all input columns sequentially. We can also do this on all input columns at once by adding a `withColumns` API to `Dataset`.
The new `withColumns` API is for internal use only now.
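The idea behind batching can be sketched with a toy `with_columns` over plain dicts (a hypothetical helper, not the `Dataset` API): all derived columns are added in a single pass, instead of one projection per column as repeated `withColumn` calls would do.

```python
def with_columns(rows, new_cols):
    """Add several derived columns in one pass over the rows, instead of
    one projection per column as repeated withColumn calls would do."""
    return [{**row, **{name: f(row) for name, f in new_cols.items()}}
            for row in rows]

rows = [{"v": 1.0}, {"v": 3.0}]
out = with_columns(rows, {"v2": lambda r: r["v"] * 2,
                          "v3": lambda r: r["v"] + 1})
assert out[0] == {"v": 1.0, "v2": 2.0, "v3": 2.0}
```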
## How was this patch tested?
Existing tests for `ImputerModel`'s change. Added tests for `withColumns` API.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19229 from viirya/SPARK-22001.
## What changes were proposed in this pull request?
Since the current code ignores WITH clauses when collecting input relations in TPC-DS queries, this leads to inaccurate per-row processing time for benchmark results. For example, in `q2`, this fix could catch all the input relations: `web_sales`, `date_dim`, and `catalog_sales` (the current code catches `date_dim` only). About one-third of the TPC-DS queries use WITH clauses, so I think it is worth fixing this.
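A minimal sketch of resolving CTE names to their underlying input relations (toy query structures, not the benchmark code): names bound by WITH are followed into their definitions rather than counted as inputs themselves.

```python
def input_relations(query, ctes):
    """Collect base input relations, following CTE definitions instead of
    treating the CTE names themselves as inputs."""
    found = set()
    for name in query["from"]:
        if name in ctes:
            found |= input_relations(ctes[name], ctes)
        else:
            found.add(name)
    return found

# Rough shape of q2: the main query reads a CTE that reads base tables.
ctes = {"wscs": {"from": ["web_sales", "catalog_sales"]}}
q2 = {"from": ["wscs", "date_dim"]}
assert input_relations(q2, ctes) == {"web_sales", "catalog_sales", "date_dim"}
```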
## How was this patch tested?
Manually checked.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19344 from maropu/RespectWithInTPCDSBench.
### What changes were proposed in this pull request?
`tempTables` is not right. To be consistent, we need to rename the internal variable names/comments to tempViews in SessionCatalog too.
### How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19117 from gatorsmile/renameTempTablesToTempViews.
## What changes were proposed in this pull request?
Added IMPALA-modified TPCDS queries to TPC-DS query suites.
- Ref: https://github.com/cloudera/impala-tpcds-kit/tree/master/queries
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19386 from gatorsmile/addImpalaQueries.
## What changes were proposed in this pull request?
Add comments for specifying the position of batch "Check Cartesian Products", as rxin suggested in https://github.com/apache/spark/pull/19362 .
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19379 from gengliangwang/SPARK-22141-followup.
## What changes were proposed in this pull request?
Reading ORC files containing special characters like '%' fails with a FileNotFoundException.
This PR aims to fix the problem.
## How was this patch tested?
Added UT.
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19368 from mgaido91/SPARK-22146.
## What changes were proposed in this pull request?
Spark's RangePartitioner hard codes the number of sampling points per partition to be 20. This is sometimes too low. This ticket makes it configurable, via spark.sql.execution.rangeExchange.sampleSizePerPartition, and raises the default in Spark SQL to be 100.
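The sampling scheme can be sketched in plain Python (a simplified model of picking range boundaries from per-partition samples; the real implementation also re-samples imbalanced partitions):

```python
import random

def range_bounds(partition_data, num_partitions, sample_size_per_partition=100):
    """Sample each partition, then pick evenly spaced split points from the
    sorted sample. A larger sample per partition gives tighter bounds."""
    sample = []
    for part in partition_data:
        k = min(sample_size_per_partition, len(part))
        sample.extend(random.sample(part, k))
    sample.sort()
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]

random.seed(0)
parts = [list(range(0, 1000)), list(range(1000, 2000))]
bounds = range_bounds(parts, num_partitions=4)
assert len(bounds) == 3 and bounds == sorted(bounds)
```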
## How was this patch tested?
Added a pretty sophisticated test based on chi square test ...
Author: Reynold Xin <rxin@databricks.com>
Closes#19387 from rxin/SPARK-22160.
## What changes were proposed in this pull request?
Rename `spark.sql.execution.arrow.enable` and `spark.sql.codegen.aggregate.map.twolevel.enable` to end in `enabled`.
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#19384 from rxin/SPARK-22159.
## What changes were proposed in this pull request?
For some reason when we added the Exec suffix to all physical operators, we missed this one. I was looking for this physical operator today and couldn't find it, because I was looking for ExchangeExec.
## How was this patch tested?
This is a simple rename and should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes#19376 from rxin/SPARK-22153.
## What changes were proposed in this pull request?
Now, we are not running TPC-DS queries as regular test cases. Thus, we need to add a test suite using empty tables for ensuring the new code changes will not break them. For example, optimizer/analyzer batches should not exceed the max iteration.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19361 from gatorsmile/tpcdsQuerySuite.
## What changes were proposed in this pull request?
`WriteableColumnVector` does not close its child column vectors. This can create memory leaks for `OffHeapColumnVector`, where we do not clean up the memory allocated by a vector's children. This can be especially bad for string columns (which use a child byte column vector).
## How was this patch tested?
I have updated the existing tests to always use both on-heap and off-heap vectors. Testing and diagnoses was done locally.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#19367 from hvanhovell/SPARK-22143.
## What changes were proposed in this pull request?
Currently we use Arrow File format to communicate with Python worker when invoking vectorized UDF but we can use Arrow Stream format.
This pr replaces the Arrow File format with the Arrow Stream format.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19349 from ueshin/issues/SPARK-22125.
## What changes were proposed in this pull request?
When inferring constraints from children, Join's condition can be simplified as None.
For example,
```
val testRelation = LocalRelation('a.int)
val x = testRelation.as("x")
val y = testRelation.where($"a" === 2 && !($"a" === 2)).as("y")
x.join(y).where($"x.a" === $"y.a")
```
The plan will become
```
Join Inner
:- LocalRelation <empty>, [a#23]
+- LocalRelation <empty>, [a#224]
```
And the Cartesian products check will throw exception for above plan.
Propagating the empty relation before checking Cartesian products resolves the issue.
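The interaction of the two rules can be sketched with toy stand-ins (hypothetical functions, not the optimizer code): once the contradictory filter makes one side empty, the join collapses before the Cartesian-product check ever sees the simplified condition.

```python
def propagate_empty(join):
    """If either side of an inner join is empty, replace the whole join
    with an empty relation (return []); otherwise keep the node (None)."""
    left, right = join
    if not left or not right:
        return []
    return None

def check_cartesian(condition):
    """The later batch rejects inner joins whose condition simplified away."""
    if condition is None:
        raise ValueError("Detected implicit cartesian product")

# y's filter (a === 2 && !(a === 2)) is always false, so y is empty and the
# join collapses; check_cartesian never runs on the simplified plan.
plan = propagate_empty(([("x", 2)], []))
assert plan == []
```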
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19362 from gengliangwang/MoveCheckCartesianProducts.
## What changes were proposed in this pull request?
Address PR comments that appeared post-merge, to rename `addExtraCode` to `addInnerClass`,
and not count the size of the inner class to the size of the outer class.
## How was this patch tested?
YOLO.
Author: Juliusz Sompolski <julek@databricks.com>
Closes#19353 from juliuszsompolski/SPARK-22103followup.
## What changes were proposed in this pull request?
We can override `usedInputs` to claim that an operator defers input evaluation. `Sample` and `Limit` are two operators which should claim it but don't. We should do it.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19345 from viirya/SPARK-22124.
## What changes were proposed in this pull request?
This change disables the use of 0-parameter pandas_udfs, because the API was overly complex and awkward, and the need can easily be worked around by using an index column as an input argument. Also added doctests for pandas_udfs, which revealed bugs in handling empty partitions and in using the pandas_udf decorator.
## How was this patch tested?
Reworked existing 0-parameter test to verify error is raised, added doctest for pandas_udf, added new tests for empty partition and decorator usage.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19325 from BryanCutler/arrow-pandas_udf-0-param-remove-SPARK-22106.
## What changes were proposed in this pull request?
During TestHiveSparkSession.reset(), which is called after each TestHiveSingleton suite, we now delete and recreate the Hive warehouse directory.
## How was this patch tested?
Ran full suite of tests locally, verified that they pass.
Author: Greg Owen <greg@databricks.com>
Closes#19341 from GregOwen/SPARK-22120.
## What changes were proposed in this pull request?
HashAggregateExec codegen uses two paths for fast hash table and a generic one.
It generates code paths for iterating over both, and both code paths generate the consume code of the parent operator, resulting in that code being expanded twice.
This leads to a long generated function that might be an issue for the compiler (see e.g. SPARK-21603).
I propose to remove the double expansion by generating the consume code in a helper function that can just be called from both iterating loops.
An issue with separating the `consume` code into a helper function was that a number of places relied on the assumption of being in the scope of an outer `produce` loop and e.g. used `continue` to jump out.
I replaced such code flows with nested scopes. This is code that should be handled the same way by the compiler, while getting rid of dependence on assumptions that are outside the `consume` code's own scope.
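The deduplication can be sketched with toy string codegen (a sketch of the idea, not `HashAggregateExec`'s actual codegen): the parent's consume body is emitted once as a helper and merely *called* from both iteration loops.

```python
def gen_consume(row_var):
    """Generate the parent's consume code once, as a helper function."""
    return f"private void consumeRow(Row {row_var}) {{ /* parent ops */ }}"

def gen_produce():
    # Both the fast-hash-map loop and the generic-map loop call the helper
    # instead of inlining the (potentially large) consume body twice.
    helper = gen_consume("row")
    fast_loop = "while (fastMapIter.next()) { consumeRow(row); }"
    generic_loop = "while (mapIter.next()) { consumeRow(row); }"
    return "\n".join([helper, fast_loop, generic_loop])

code = gen_produce()
assert code.count("/* parent ops */") == 1     # consume body emitted once
assert code.count("consumeRow(row);") == 2     # called from both loops
```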
## How was this patch tested?
Existing test coverage.
Author: Juliusz Sompolski <julek@databricks.com>
Closes#19324 from juliuszsompolski/aggrconsumecodegen.
## What changes were proposed in this pull request?
The `percentile_approx` function previously accepted numeric type input and output double type results.
But since all numeric types, date and timestamp types are represented as numerics internally, `percentile_approx` can support them easily.
After this PR, it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
This change is also required when we generate equi-height histograms for these types.
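The type-preserving behavior can be sketched with a simple nearest-rank percentile in Python (an exact stand-in for the approximate algorithm; the date-to-days mapping mirrors how dates are represented as numerics internally):

```python
import datetime
import math

def percentile(values, p, to_internal=lambda v: v, from_internal=lambda v: v):
    """Compute a nearest-rank percentile over the internal numeric
    representation and cast the result back to the input type."""
    nums = sorted(to_internal(v) for v in values)
    idx = max(0, math.ceil(p * len(nums)) - 1)
    return from_internal(nums[idx])

dates = [datetime.date(2017, 1, d) for d in (1, 5, 9)]
epoch = datetime.date(1970, 1, 1)
median = percentile(dates, 0.5,
                    to_internal=lambda d: (d - epoch).days,
                    from_internal=lambda n: epoch + datetime.timedelta(days=n))
assert median == datetime.date(2017, 1, 5)   # result has the input type
```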
## How was this patch tested?
Added a new test and modified some existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19321 from wzhfy/approx_percentile_support_types.
## What changes were proposed in this pull request?
Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:
- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19307 from srowen/Scala212.
## What changes were proposed in this pull request?
This PR proposes to remove `assume` in `Utils.resolveURIs` and replace `assume` to `assert` in `Utils.resolveURI` in the test cases in `UtilsSuite`.
It looks like `Utils.resolveURIs` supports both multiple paths and a single path as input, so it does not seem meaningful to check whether the input contains `,`.
For the test of `Utils.resolveURI`, I replaced it with `assert` because it takes a single path, and in order to prevent future mistakes when adding more tests here.
For the `assume` in `HiveDDLSuite`, it looks like it should be `assert`, since it is the final check of the test.
## How was this patch tested?
Fixed unit tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19332 from HyukjinKwon/SPARK-22093.
## What changes were proposed in this pull request?
The implemented `isCascadingTruncateTable` in `AggregatedDialect` is wrong. When no dialect claims cascading but some dialect's cascading behavior is unknown, we should return unknown instead of false.
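The corrected three-valued combination can be sketched in Python, with `None` standing in for an unknown `Option[Boolean]` answer:

```python
def is_cascading_truncate(dialect_answers):
    """Combine per-dialect answers (True / False / None-for-unknown):
    any True wins; otherwise any unknown makes the result unknown;
    only all-False yields False."""
    if any(a is True for a in dialect_answers):
        return True
    if any(a is None for a in dialect_answers):
        return None
    return False

assert is_cascading_truncate([True, None]) is True
assert is_cascading_truncate([False, None]) is None   # was wrongly False before
assert is_cascading_truncate([False, False]) is False
```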
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19286 from viirya/SPARK-21338-followup.
## What changes were proposed in this pull request?
This PR proposes to enhance the documentation for the `trim` functions in the function description section.
- Add more `usage`, `arguments` and `examples` for the trim function
- Adjust spacing in the `usage` section
After the changes, the trim function documentation will look like this:
- `trim`
```trim(str) - Removes the leading and trailing space characters from str.
trim(BOTH trimStr FROM str) - Remove the leading and trailing trimStr characters from str
trim(LEADING trimStr FROM str) - Remove the leading trimStr characters from str
trim(TRAILING trimStr FROM str) - Remove the trailing trimStr characters from str
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
BOTH, FROM - these are keywords to specify trimming string characters from both ends of the string
LEADING, FROM - these are keywords to specify trimming string characters from the left end of the string
TRAILING, FROM - these are keywords to specify trimming string characters from the right end of the string
Examples:
> SELECT trim(' SparkSQL ');
SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
SSparkSQ
```
- `ltrim`
```ltrim
ltrim(str) - Removes the leading space characters from str.
ltrim(trimStr, str) - Removes the leading string contains the characters from the trim string
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT ltrim(' SparkSQL ');
SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
arkSQLS
```
- `rtrim`
```rtrim
rtrim(str) - Removes the trailing space characters from str.
rtrim(trimStr, str) - Removes the trailing string which contains the characters from the trim string from the str
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT rtrim(' SparkSQL ');
SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
SSpark
```
This is the trim characters function jira: [trim function](https://issues.apache.org/jira/browse/SPARK-14878)
## How was this patch tested?
Manually tested
```
spark-sql> describe function extended trim;
17/09/22 17:03:04 INFO CodeGenerator: Code generated in 153.026533 ms
Function: trim
Class: org.apache.spark.sql.catalyst.expressions.StringTrim
Usage:
trim(str) - Removes the leading and trailing space characters from `str`.
trim(BOTH trimStr FROM str) - Remove the leading and trailing `trimStr` characters from `str`
trim(LEADING trimStr FROM str) - Remove the leading `trimStr` characters from `str`
trim(TRAILING trimStr FROM str) - Remove the trailing `trimStr` characters from `str`
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
* BOTH, FROM - these are keywords to specify trimming string characters from both ends of
the string
* LEADING, FROM - these are keywords to specify trimming string characters from the left
end of the string
* TRAILING, FROM - these are keywords to specify trimming string characters from the right
end of the string
Examples:
> SELECT trim(' SparkSQL ');
SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
SSparkSQ
```
```
spark-sql> describe function extended ltrim;
Function: ltrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimLeft
Usage:
ltrim(str) - Removes the leading space characters from `str`.
ltrim(trimStr, str) - Removes the leading string contains the characters from the trim string
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT ltrim(' SparkSQL ');
SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
arkSQLS
```
```
spark-sql> describe function extended rtrim;
Function: rtrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimRight
Usage:
rtrim(str) - Removes the trailing space characters from `str`.
rtrim(trimStr, str) - Removes the trailing string which contains the characters from the trim string from the `str`
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT rtrim(' SparkSQL ');
SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
SSpark
```
Author: Kevin Yu <qyu@us.ibm.com>
Closes#19329 from kevinyu98/spark-14878-5.
## What changes were proposed in this pull request?
This PR proposes to resolve the type conflicts in strings and timestamps in partition column values.
It looks like we need to set the timezone, as it needs a cast between strings and timestamps.
```scala
val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, "blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)
spark.read.parquet(path).show()
```
**Before**
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46)
at org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172)
at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207)
at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
```
**After**
```
+---+-------------------+
| i| str|
+---+-------------------+
| 2|2014-01-01 00:00:00|
| 1|2015-01-01 00:00:00|
| 3| blah|
+---+-------------------+
```
## How was this patch tested?
Unit tests added in `ParquetPartitionDiscoverySuite` and manual tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19331 from HyukjinKwon/SPARK-22109.
## What changes were proposed in this pull request?
Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual max size on some JVMs, in several places
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19266 from srowen/SPARK-22033.
## What changes were proposed in this pull request?
`OffHeapColumnVector.reserveInternal()` will only copy already-inserted values during reallocation if `data != null`. For vectors containing arrays or structs this is incorrect, since the field `data` is not used there at all. We need to check `nulls` instead.
## How was this patch tested?
Adds new tests to `ColumnVectorSuite` that reproduce the errors.
Author: Ala Luszczak <ala@databricks.com>
Closes#19308 from ala/vector-realloc.
This PR adds vectorized UDFs to the Python API
**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:
```
@pandas_udf(DoubleType())
def plus(a, b):
    return a + b
```
or
```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs
0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output. For example:
```
@pandas_udf(LongType())
def f0(**kwargs):
    return pd.Series(1).repeat(kwargs["size"])

df.select(f0())
```
Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.
- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
## What changes were proposed in this pull request?
Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we ended up not getting the correct outputOrdering during the physical planning stage, before Sort nodes are added to the children.
For example, J = {A join B on key1 = key2}
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"
So to fix this I changed the behavior of `getKeyOrdering(keys, childOutputOrdering)` to:
1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering
2. Otherwise => required child ordering
In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.
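The new rule can be sketched in Python (a toy model where an ordering is a list of `(expression, direction)` pairs; not the actual `SparkPlan` code): keep the child's richer ordering when it already satisfies the join keys, otherwise fall back to the required ordering.

```python
def satisfies(child_ordering, required):
    """child_ordering satisfies required if it starts with all required keys."""
    return child_ordering[:len(required)] == required

def get_key_ordering(keys, child_output_ordering):
    # Keep the child's ordering (it may carry extra same-order expressions)
    # when it already satisfies the join keys; otherwise use the required
    # ordering on the keys themselves.
    required = [(k, "ASC") for k in keys]
    if satisfies(child_output_ordering, required):
        return child_output_ordering
    return required

# Case 3: child ordered on key1 ASC with an extra same-order expression c1.
assert get_key_ordering(["key1"], [("key1", "ASC"), ("c1", "ASC")]) == \
    [("key1", "ASC"), ("c1", "ASC")]
# Case 1: child not ordered on the key => required ordering.
assert get_key_ordering(["key1"], []) == [("key1", "ASC")]
```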
## How was this patch tested?
Added new test cases.
Passed all integration tests.
Author: maryannxue <maryann.xue@gmail.com>
Closes#19281 from maryannxue/spark-21998.
## What changes were proposed in this pull request?
`processAllAvailable` should also check the query state and return if the query has been stopped.
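The shape of the fix can be sketched without Spark (the `ToyQuery` class below is a hypothetical stand-in, not the actual `StreamExecution` code): the wait loop must also observe the stopped flag, otherwise `processAllAvailable` can block forever on a stopped query.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch: a query whose processAllAvailable loop also
// checks whether the query has been stopped.
class ToyQuery {
  private val stopped = new AtomicBoolean(false)
  @volatile private var pendingBatches = 2

  def pending: Int = pendingBatches
  def stop(): Unit = stopped.set(true)
  private def processOneBatch(): Unit =
    if (pendingBatches > 0) pendingBatches -= 1

  def processAllAvailable(): Unit = {
    while (pendingBatches > 0) {
      if (stopped.get()) return  // the fix: bail out once the query is stopped
      processOneBatch()
    }
  }
}
```

On a stopped query the call returns immediately instead of spinning; on a live query it drains the pending batches as before.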
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19314 from zsxwing/SPARK-22094.
## What changes were proposed in this pull request?
#### Architecture
This PR implements stream-stream inner join using a two-way symmetric hash join. At a high level, we want to do the following.
1. For each stream, we maintain the past rows as state in State Store.
- For each joining key, there can be multiple rows that have been received.
- So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream
- Look up the other stream's state for matching rows, and output them if they satisfy the join condition
- Add the input row to the corresponding stream's state.
- If the data has a timestamp/window column with a watermark, then we will use that to calculate the threshold for keys that are required to be buffered for future matches, and drop the rest from the state.
Cleaning up old, unnecessary state rows depends entirely on whether a watermark has been defined and what the join conditions are. We definitely want to support state cleanup for two types of queries that are likely to be common.
- Queries with time range conditions - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")` (pseudo-SQL)
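The core loop of steps 1–2 can be sketched as a Spark-free toy (ignoring watermarks, state cleanup, and fault tolerance; the names below are illustrative, not the actual StateStore-backed implementation): each side keeps a key-to-list-of-values multimap, and each input row first probes the other side's state, then is appended to its own side's state.

```scala
import scala.collection.mutable

// Illustrative symmetric hash join sketch: one buffered-rows multimap per side.
case class Row(key: Int, value: String)

val leftState  = mutable.Map.empty[Int, mutable.Buffer[Row]]
val rightState = mutable.Map.empty[Int, mutable.Buffer[Row]]

def processBatch(leftRows: Seq[Row], rightRows: Seq[Row]): Seq[(Row, Row)] = {
  val out = mutable.Buffer.empty[(Row, Row)]
  // For each input row: probe the other side's state, then add to own state.
  for (l <- leftRows) {
    out ++= rightState.getOrElse(l.key, Nil).map(r => (l, r))
    leftState.getOrElseUpdate(l.key, mutable.Buffer.empty) += l
  }
  for (r <- rightRows) {
    out ++= leftState.getOrElse(r.key, Nil).map(l => (l, r))
    rightState.getOrElseUpdate(r.key, mutable.Buffer.empty) += r
  }
  out.toSeq
}
```

Because the left side is processed first, a left and a right row arriving with the same key in the same batch still join exactly once, via the right-side probe.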
#### Implementation
The stream-stream join is primarily implemented in three classes
- `StreamingSymmetricHashJoinExec` implements the above symmetric join algorithm.
- `SymmetricHashJoinStateManager` manages the streaming state for the join. This essentially is a fault-tolerant key-to-list-of-values multimap built on the StateStore APIs. `StreamingSymmetricHashJoinExec` instantiates two such managers, one for each join side.
- `StreamingSymmetricHashJoinExecHelper` is a helper class to extract threshold for the state based on the join conditions and the event watermark.
Refer to the class scaladocs for more implementation details.
Besides the implementation of the stream-stream inner join SparkPlan, some additional changes are
- Allowed inner join in append mode in UnsupportedOperationChecker
- Prevented the optimizer from collapsing a stream-stream join on an empty batch dataframe
## How was this patch tested?
- New tests in StreamingJoinSuite
- Updated tests in UnsupportedOperationSuite
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19271 from tdas/SPARK-22053.
## What changes were proposed in this pull request?
I tested on a dataset of about 13M instances, and found that using `treeAggregate` gives a speedup in the following algorithms:
|Algs| SpeedUp |
|------|-----------|
|OneHotEncoder| 5% |
|StatFunctions.calculateCov| 7% |
|StatFunctions.multipleApproxQuantiles| 9% |
|RegressionEvaluator| 8% |
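The speedup comes from merging partition results in a multi-level tree instead of folding them sequentially at the driver. A minimal, Spark-free sketch of the idea (the real API is `RDD.treeAggregate(zeroValue)(seqOp, combOp, depth)`; `treeMerge` below is a hypothetical helper for illustration):

```scala
// Spark-free sketch of tree aggregation: per-partition results are merged
// pairwise in rounds, halving the number of values each round, rather than
// being folded one by one on a single node.
def treeMerge[T](partitionResults: Seq[T])(combine: (T, T) => T): T = {
  require(partitionResults.nonEmpty, "need at least one partition result")
  var level = partitionResults
  while (level.length > 1) {
    level = level.grouped(2).map {
      case Seq(a, b) => combine(a, b)  // merge a pair
      case Seq(a)    => a              // odd one out carries over
    }.toSeq
  }
  level.head
}
```

With many partitions and large intermediate results (covariance matrices, quantile summaries), this keeps any single merge point from becoming a bottleneck, which matches the single-digit-percent gains in the table above.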
## How was this patch tested?
existing tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19232 from zhengruifeng/use_treeAggregate.
## What changes were proposed in this pull request?
There is an incorrect `scalastyle:on` comment in `stringExpressions.scala` that makes the line size limit check ineffective in the file. Many lines of code and comments exceed 100 characters.
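For context, scalastyle suppressions are scoped by paired comments, so a stray or unmatched `scalastyle:on`/`scalastyle:off` silently changes which code the rule covers. A sketch of the intended pairing for the line-length rule (`line.size.limit` is the scalastyle rule id; the value is a placeholder):

```scala
// scalastyle:off line.size.limit
// Lines in this region are exempt from the 100-character limit.
val longLine = "x"
// scalastyle:on line.size.limit
// From here on, the limit is enforced again.
```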
## How was this patch tested?
Code style change only.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19305 from viirya/fix-wrong-style.
## What changes were proposed in this pull request?
Adjust EnsureStatefulOpPartitioningSuite to use the scalatest lifecycle normally instead of the constructor; this fixes:
```
*** RUN ABORTED ***
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.streaming.EnsureStatefulOpPartitioningSuite.<init>(EnsureStatefulOpPartitioningSuite.scala:35)
```
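The general pattern, sketched here without the scalatest dependency (the `Lifecycle` trait below is a plain-Scala stand-in for scalatest's `BeforeAndAfterAll`): shared resources such as a SparkContext should be created in a `beforeAll`-style hook and released in `afterAll`, not in the suite's constructor, so the framework controls when they exist.

```scala
// Plain-Scala stand-in for a test lifecycle: resources are created in
// beforeAll and released in afterAll, never in the suite constructor.
trait Lifecycle {
  def beforeAll(): Unit = ()
  def afterAll(): Unit = ()
  final def run(tests: () => Unit): Unit = {
    beforeAll()
    try tests() finally afterAll()
  }
}

class MySuite extends Lifecycle {
  var resource: Option[String] = None  // e.g. a SparkContext
  override def beforeAll(): Unit = { resource = Some("context") }
  override def afterAll(): Unit = { resource = None }
}
```

Constructor-time creation means the resource springs to life as soon as the suite class is instantiated (which the error trace above shows happening at `<init>`), even if the framework never runs the suite; hook-based creation avoids that.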
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19306 from srowen/SPARK-21977.2.
## What changes were proposed in this pull request?
In SQL conditional expressions, only CASE WHEN lacks an expression description. This patch fills the gap.
## How was this patch tested?
Only documentation change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19304 from viirya/casewhen-doc.