## What changes were proposed in this pull request?
Rewrote the error message for clarity. Added extra information in the case of an attribute name collision, hinting the user to double-check whether they are referencing two different tables.
## How was this patch tested?
No functional changes; only the final message has changed. It has been tested manually against the situation described in the JIRA ticket. Automated tests in the repository pass.
This PR is original work from me and I license this work to the Spark project
Author: Ruben Berenguel Montoro <ruben@mostlymaths.net>
Author: Ruben Berenguel Montoro <ruben@dreamattic.com>
Author: Ruben Berenguel <ruben@mostlymaths.net>
Closes#17100 from rberenguel/SPARK-13947-error-message.
## What changes were proposed in this pull request?
We now enable the table cache `InMemoryTableScanExec` to provide `ColumnarBatch`, but the cached batches are retrieved without pruning. In this case, we still need to do partition batch pruning.
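A rough, hedged illustration of the scenario (the column name and data sizes are made up, and the pruning internals are not shown): a filter over a cached table should still let batch-level min/max statistics skip most cached batches, even when the cache is read through the `ColumnarBatch` path.
```scala
import spark.implicits._  // assumes an active SparkSession named `spark`

val cached = spark.range(0, 1000000).withColumn("bucket", $"id" % 100).cache()
cached.count()                          // materialize the cache
cached.where($"bucket" === 1).count()   // partition batch pruning should skip most cached batches
```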
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19569 from viirya/SPARK-22348.
## What changes were proposed in this pull request?
For performance reasons, we should resolve an `In` operation on an empty list as false in the optimization phase, as discussed in #19522.
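A hedged sketch of the pattern this optimization targets (the data and column name are made up): an `IN` over an empty list can never evaluate to true, so the filter can be folded to `false` during optimization rather than evaluated per row.
```scala
import spark.implicits._  // assumes an active SparkSession named `spark`

val df = Seq(1, 2, 3).toDF("a")
df.filter($"a".isin()).explain(true)  // the optimized plan should reduce to an empty result
df.filter($"a".isin()).count()        // 0
```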
## How was this patch tested?
Added UT
cc gatorsmile
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19523 from mgaido91/SPARK-22301.
## What changes were proposed in this pull request?
Adjust Spark download in test to use Apache mirrors and respect its load balancer, and use Spark 2.1.2. This follows on a recent PMC list thread about removing the cloudfront download rather than update it further.
## How was this patch tested?
Existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#19564 from srowen/SPARK-21936.2.
## What changes were proposed in this pull request?
During [SPARK-21912](https://issues.apache.org/jira/browse/SPARK-21912), we skipped testing 'ADD COLUMNS' on ORC tables due to ORC limitation. Since [SPARK-21929](https://issues.apache.org/jira/browse/SPARK-21929) is resolved now, we can test both `ORC` and `PARQUET` completely.
## How was this patch tested?
Pass the updated test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19562 from dongjoon-hyun/SPARK-21912-2.
## What changes were proposed in this pull request?
The current implementation of `ApproxCountDistinctForIntervals` is `ImperativeAggregate`. The number of `aggBufferAttributes` is the total number of words in the hllppHelper array. Each hllppHelper has 52 words under the default relativeSD.
Since this aggregate function is used in equi-height histogram generation, and the number of buckets in histogram is usually hundreds, the number of `aggBufferAttributes` can easily reach tens of thousands or even more.
This leads to a huge method in codegen and causes error:
```
org.codehaus.janino.JaninoRuntimeException: Code of method "apply(Lorg/apache/spark/sql/catalyst/InternalRow;)Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB.
```
Besides, huge generated methods also result in performance regression.
In this PR, we change its implementation to `TypedImperativeAggregate`. After the fix, `ApproxCountDistinctForIntervals` can deal with more than thousands endpoints without throwing codegen error, and improve performance from `20 sec` to `2 sec` in a test case of 500 endpoints.
## How was this patch tested?
Test by an added test case and existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19506 from wzhfy/change_forIntervals_typedAgg.
TIMESTAMP (-101), BINARY_DOUBLE (101) and BINARY_FLOAT (100) are handled in OracleDialect
## What changes were proposed in this pull request?
When an Oracle table contains columns whose type is BINARY_FLOAT or BINARY_DOUBLE, Spark SQL fails to load the table with an SQLException:
```
java.sql.SQLException: Unsupported type 101
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:235)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:292)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:292)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:291)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:64)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:113)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:306)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
```
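A hedged sketch of the kind of mapping this change adds to `OracleDialect`, written here as a standalone custom dialect for illustration (this is not Spark's actual code; the numeric codes are the Oracle type constants quoted in the title above):
```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types._

object OracleLikeDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
    sqlType match {
      case 100  => Some(FloatType)      // BINARY_FLOAT
      case 101  => Some(DoubleType)     // BINARY_DOUBLE
      case -101 => Some(TimestampType)  // Oracle-specific TIMESTAMP variant
      case _    => None                 // fall back to the default JDBC mapping
    }
}

// Registering a custom dialect (not needed for the built-in OracleDialect itself):
JdbcDialects.registerDialect(OracleLikeDialect)
```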
## How was this patch tested?
I updated a UT which covers type conversion for the types (-101, 100, 101). On top of that, I tested this change against an actual table with those columns, and it was able to read from and write to the table.
Author: Kohki Nishio <taroplus@me.com>
Closes#19548 from taroplus/oracle_sql_types_101.
## What changes were proposed in this pull request?
When [SPARK-19261](https://issues.apache.org/jira/browse/SPARK-19261) implements `ALTER TABLE ADD COLUMNS`, ORC data source is omitted due to SPARK-14387, SPARK-16628, and SPARK-18355. Now, those issues are fixed and Spark 2.3 is [using Spark schema to read ORC table instead of ORC file schema](e6e36004af). This PR enables `ALTER TABLE ADD COLUMNS` for ORC data source.
## How was this patch tested?
Pass the updated and added test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19545 from dongjoon-hyun/SPARK-21929.
## What changes were proposed in this pull request?
Simplifies the test cases that were added in the PR https://github.com/apache/spark/pull/18270.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19546 from gatorsmile/backportSPARK-21055.
## What changes were proposed in this pull request?
This is a follow-up PR of https://github.com/apache/spark/pull/17633.
This PR is to add a conf `spark.sql.hive.advancedPartitionPredicatePushdown.enabled`, which can be used to turn the enhancement off.
## How was this patch tested?
Add a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19547 from gatorsmile/Spark20331FollowUp.
## What changes were proposed in this pull request?
Plan equality should be computed by `canonicalized`, so we can remove unnecessary `hashCode` and `equals` methods.
## How was this patch tested?
Existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19539 from wzhfy/remove_equals.
## What changes were proposed in this pull request?
This is a follow-up of #18732.
This PR modifies the `GroupedData.apply()` method to convert a pandas UDF to a grouped UDF implicitly.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19517 from ueshin/issues/SPARK-20396/fup2.
## What changes were proposed in this pull request?
Spark does not support `grouping__id`; it has `grouping_id()` instead.
This is inconvenient for Hive users moving to Spark SQL, so this PR rewrites `grouping__id` to `grouping_id()`, so that Hive users need not alter their scripts.
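A hedged illustration of the intent (the table and column names are made up): a Hive-style query using `grouping__id` should now run unchanged in Spark SQL, because it is rewritten to `grouping_id()`.
```scala
// assumes an active SparkSession named `spark` and a table `t` with columns a, b
spark.sql("""
  SELECT a, b, grouping__id
  FROM t
  GROUP BY a, b GROUPING SETS ((a), (a, b))
""").show()
```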
## How was this patch tested?
Tested with SQLQuerySuite.scala.
Author: CenYuhai <yuhai.cen@ele.me>
Closes#18270 from cenyuhai/SPARK-21055.
## What changes were proposed in this pull request?
This is a very trivial PR, simply marking `strategies` in `SparkPlanner` with the `override` keyword for clarity since it is overriding `strategies` in `QueryPlanner` two levels up in the class hierarchy. I was reading through the code to learn a bit and got stuck on this fact for a little while, so I figured this may be helpful so that another developer new to the project doesn't get stuck where I was.
I did not make a JIRA ticket for this because it is so trivial, but I'm happy to do so to adhere to the contribution guidelines if required.
## How was this patch tested?
N/A
Author: Eric Perry <eric@ericjperry.com>
Closes#19537 from ericjperry/override-strategies.
## What changes were proposed in this pull request?
A working prototype for data source v2 write path.
The writing framework is similar to the reading framework, i.e. `WriteSupport` -> `DataSourceV2Writer` -> `DataWriterFactory` -> `DataWriter`.
Similar to `FileCommitProtocol`, the writing API has job- and task-level commit/abort to support transactions.
## How was this patch tested?
new tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19269 from cloud-fan/data-source-v2-write.
## What changes were proposed in this pull request?
Fix Java style issues.
## How was this patch tested?
Run `./dev/lint-java` locally since it's not run on Jenkins
Author: Andrew Ash <andrew@andrewash.com>
Closes#19486 from ash211/aash/fix-lint-java.
Hive delegation tokens are only needed when the Spark driver has no access
to the kerberos TGT. That happens only in two situations:
- when using a proxy user
- when using cluster mode without a keytab
This change modifies the Hive provider so that it only generates delegation
tokens in those situations, and tweaks the YARN AM so that it makes the proper
user visible to the Hive code when running with keytabs, so that the TGT
can be used instead of a delegation token.
The effect of this change is that now it's possible to initialize multiple,
non-concurrent SparkContext instances in the same JVM. Before, the second
invocation would fail to fetch a new Hive delegation token, which then could
make the second (or third or...) application fail once the token expired.
With this change, the TGT will be used to authenticate to the HMS instead.
This change also avoids polluting the current logged in user's credentials
when launching applications. The credentials are copied only when running
applications as a proxy user. This makes it possible to implement SPARK-11035
later, where multiple threads might be launching applications, and each app
should have its own set of credentials.
Tested by verifying HDFS and Hive access in following scenarios:
- client and cluster mode
- client and cluster mode with proxy user
- client and cluster mode with principal / keytab
- long-running cluster app with principal / keytab
- pyspark app that creates (and stops) multiple SparkContext instances
through its lifetime
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19509 from vanzin/SPARK-22290.
## What changes were proposed in this pull request?
This PR addresses the comments by gatorsmile on [the previous PR](https://github.com/apache/spark/pull/19494).
## How was this patch tested?
Previous UT and added UT.
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#19522 from mgaido91/SPARK-22249_FOLLOWUP.
## What changes were proposed in this pull request?
To let the same aggregate function that appears multiple times in an Aggregate be evaluated only once, we need to deduplicate the aggregate expressions. The original code tried to use a "distinct" call to get a set of aggregate expressions, but that did not work, since "distinct" does not compare semantic equality. And even if it did, further work would be needed in result expression rewriting.
In this PR, I changed the "set" to a map from the semantic identity of an aggregate expression to itself. Thus, later on, when rewriting result expressions (i.e., output expressions), the aggregate expression reference can be fixed.
## How was this patch tested?
Added a new test in SQLQuerySuite
Author: maryannxue <maryann.xue@gmail.com>
Closes#19488 from maryannxue/spark-22266.
## What changes were proposed in this pull request?
Complex state-updating and/or timeout-handling logic in mapGroupsWithState functions may require taking decisions based on the current event-time watermark and/or processing time. Currently, you can use the SQL function `current_timestamp` to get the current processing time, but it needs to be inserted into every row with a select, and then passed through the encoder, which isn't efficient. Furthermore, there is no way to get the current watermark.
This PR exposes both of them through the GroupState API.
Additionally, it also cleans up some of the GroupState docs.
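A hedged sketch of how the new accessors might be used inside a `mapGroupsWithState` function (the event and state classes are made up; the accessor names follow this PR's description of the API and may differ slightly from the final implementation):
```scala
import org.apache.spark.sql.streaming.GroupState

case class Event(user: String, eventTimeMs: Long)
case class SessionInfo(numEvents: Long)

def updateSession(
    user: String,
    events: Iterator[Event],
    state: GroupState[SessionInfo]): (String, Long) = {
  // Both values are available directly from the state handle, instead of being
  // injected into every row via current_timestamp.
  val procTimeMs  = state.getCurrentProcessingTimeMs()  // current processing time
  val watermarkMs = state.getCurrentWatermarkMs()       // current event-time watermark, usable for timeout decisions
  val updated = SessionInfo(state.getOption.map(_.numEvents).getOrElse(0L) + events.size)
  state.update(updated)
  (user, updated.numEvents)
}
```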
## How was this patch tested?
New unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19495 from tdas/SPARK-22278.
## What changes were proposed in this pull request?
In Average.scala, it has
```
override lazy val evaluateExpression = child.dataType match {
  case DecimalType.Fixed(p, s) =>
    // increase the precision and scale to prevent precision loss
    val dt = DecimalType.bounded(p + 14, s + 4)
    Cast(Cast(sum, dt) / Cast(count, dt), resultType)
  case _ =>
    Cast(sum, resultType) / Cast(count, resultType)
}

def setChild(newchild: Expression) = {
  child = newchild
}
```
It is possible that `Cast(count, dt)` will make the precision of the decimal number bigger than 38, and this causes overflow. Since count is an integer and doesn't need a scale, I will cast it using `DecimalType.bounded(38, 0)`.
## How was this patch tested?
In DataFrameSuite, I will add a test case.
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#19496 from huaxingao/spark-22271.
## What changes were proposed in this pull request?
Evaluate one-sided conditions early in stream-stream joins.
This is in addition to normal filter pushdown, because integrating it with the join logic allows it to take place in outer join scenarios. This means that rows which can never satisfy the join condition won't clog up the state.
## How was this patch tested?
new unit tests
Author: Jose Torres <jose@databricks.com>
Closes#19452 from joseph-torres/SPARK-22136.
## What changes were proposed in this pull request?
#### before
```scala
scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = org.apache.spark.sql.KeyValueGroupedDataset@65214862
```
#### after
```scala
scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = [key: [value: string], value: [value: string]]
```
## How was this patch tested?
existing ut
cc gatorsmile cloud-fan
Author: Kent Yao <yaooqinn@hotmail.com>
Closes#19363 from yaooqinn/minor-dataset-tostring.
## What changes were proposed in this pull request?
As pointed out in the JIRA, there is a bug which causes an exception to be thrown if `isin` is called with an empty list on a cached DataFrame. The PR fixes it.
## How was this patch tested?
Added UT.
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#19494 from mgaido91/SPARK-22249.
## What changes were proposed in this pull request?
This PR aims to improve **StatisticsSuite** to test the `convertMetastore` configuration properly. Currently, some test logic in `test statistics of LogicalRelation converted from Hive serde tables` depends on the default configuration. The new test case is shorter and covers both (true/false) cases explicitly.
This test case was previously modified by SPARK-17410 and SPARK-17284 in Spark 2.3.0.
- a2460be9c3 (diff-1c464c86b68c2d0b07e73b7354e74ce7R443)
## How was this patch tested?
Pass the Jenkins with the improved test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19500 from dongjoon-hyun/SPARK-22280.
## What changes were proposed in this pull request?
This PR aims to
- Rename `OrcRelation` to `OrcFileFormat` object.
- Replace `OrcRelation.ORC_COMPRESSION` with `org.apache.orc.OrcConf.COMPRESS`. Since [SPARK-21422](https://issues.apache.org/jira/browse/SPARK-21422), we can use `OrcConf.COMPRESS` instead of Hive's.
```scala
// The references of Hive's classes will be minimized.
val ORC_COMPRESSION = "orc.compress"
```
## How was this patch tested?
Pass the Jenkins with the existing and updated test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19502 from dongjoon-hyun/SPARK-22282.
## What changes were proposed in this pull request?
`ObjectHashAggregateExec` should override `outputPartitioning` in order to avoid unnecessary shuffle.
## How was this patch tested?
Added Jenkins test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19501 from viirya/SPARK-22223.
## What changes were proposed in this pull request?
In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan has the expected partitioning for Streaming Stateful Operators. The problem is that we are not allowed to access this information during planning.
The reason we added that check was because CoalesceExec could actually create RDDs with 0 partitions. We should fix it such that when CoalesceExec says that there is a SinglePartition, there is in fact an inputRDD of 1 partition instead of 0 partitions.
## How was this patch tested?
Regression test in StreamingQuerySuite
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#19467 from brkyvz/stateful-op.
## What changes were proposed in this pull request?
When fixing schema field names using escape characters with `addReferenceMinorObj()` at [SPARK-18952](https://issues.apache.org/jira/browse/SPARK-18952) (#16361), double-quotes around the names remained and the names became something like `"((java.lang.String) references[1])"`.
```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[1])", org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add("((java.lang.String) references[2])", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```
We should remove the double-quotes to refer the values in `references` properly:
```java
/* 055 */ private int maxSteps = 2;
/* 056 */ private int numRows = 0;
/* 057 */ private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1]), org.apache.spark.sql.types.DataTypes.StringType);
/* 058 */ private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2]), org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */ private Object emptyVBase;
```
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19491 from ueshin/issues/SPARK-22273.
## What changes were proposed in this pull request?
Change `BasicWriteTaskStatsTracker.getFileSize()` to catch `FileNotFoundException`, log at info level, and then return 0 as the file size.
This ensures that if a newly created file isn't visible due to the store not always having create consistency, the metric collection doesn't cause the failure.
## How was this patch tested?
New test suite included, `BasicWriteTaskStatsTrackerSuite`. This not only checks the resilience to missing files, but verifies the existing logic as to how file statistics are gathered.
Note that in the current implementation:
1. If you call `Tracker.getFinalStats()` more than once, the file size count will increase by the size of the last file. This could be fixed by clearing the filename field inside `getFinalStats()` itself.
2. If you pass an empty or null string to `Tracker.newFile(path)`, then an IllegalArgumentException is raised, but only in `getFinalStats()`, rather than in `newFile`. There's a test for this behaviour in the new suite, as it verifies that only FNFEs get swallowed.
Author: Steve Loughran <stevel@hortonworks.com>
Closes#18979 from steveloughran/cloud/SPARK-21762-missing-files-in-metrics.
## What changes were proposed in this pull request?
This PR changes `keyWithIndexToNumValues` to `keyWithIndexToValue`.
There will be directories on HDFS named with this `keyWithIndexToNumValues`. So if we ever want to fix this, let's fix it now.
## How was this patch tested?
existing unit test cases.
Author: Liwei Lin <lwlin7@gmail.com>
Closes#19435 from lw-lin/keyWithIndex.
## What changes were proposed in this pull request?
This is a minor follow-up of #19474.
#19474 partially reverted #18064 but accidentally introduced a behavior change. `Command` extended `LogicalPlan` before #18064, but #19474 made it extend `LeafNode`. This is an internal behavior change, as now `Command` subclasses can't define children and have to implement the `computeStatistic` method.
This PR fixes this by making `Command` extend `LogicalPlan`.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19493 from cloud-fan/minor.
## What changes were proposed in this pull request?
This is an effort to reduce the difference between Hive and Spark. Spark supports case-sensitivity in columns. Especially, for Struct types, with `spark.sql.caseSensitive=true`, the following is supported.
```scala
scala> sql("select named_struct('a', 1, 'A', 2).a").show
+--------------------------+
|named_struct(a, 1, A, 2).a|
+--------------------------+
| 1|
+--------------------------+
scala> sql("select named_struct('a', 1, 'A', 2).A").show
+--------------------------+
|named_struct(a, 1, A, 2).A|
+--------------------------+
| 2|
+--------------------------+
```
And vice versa, with `spark.sql.caseSensitive=false`, the following is supported.
```scala
scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show
+--------------------+--------------------+
|named_struct(a, 1).A|named_struct(A, 1).a|
+--------------------+--------------------+
| 1| 1|
+--------------------+--------------------+
```
However, types are considered different. For example, SET operations fail.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:int> <> struct<a:int> at the first column of the second table;;
'Union
:- Project [named_struct(a, 1) AS named_struct(a, 1)#57]
: +- OneRowRelation$
+- Project [named_struct(A, 2) AS named_struct(A, 2)#58]
+- OneRowRelation$
```
This PR aims to support case-insensitive type equality. For example, in SET operations, the above query succeeds when `spark.sql.caseSensitive=false`.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
+------------------+
|named_struct(a, 1)|
+------------------+
| [1]|
| [2]|
+------------------+
```
## How was this patch tested?
Pass the Jenkins with a newly add test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#18460 from dongjoon-hyun/SPARK-21247.
## What changes were proposed in this pull request?
Before Hive 2.0, ORC File schema has invalid column names like `_col1` and `_col2`. This is a well-known limitation and there are several Apache Spark issues with `spark.sql.hive.convertMetastoreOrc=true`. This PR ignores ORC File schema and use Spark schema.
## How was this patch tested?
Pass the newly added test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19470 from dongjoon-hyun/SPARK-18355.
## What changes were proposed in this pull request?
Non-deterministic expressions should be considered as not contained in the `ExpressionSet`.
This is consistent with how we define `semanticEquals` between two expressions.
Otherwise, combining expressions will remove non-deterministic expressions which should be preserved.
E.g.
Combine filters of
```scala
testRelation.where(Rand(0) > 0.1).where(Rand(0) > 0.1)
```
should result in
```scala
testRelation.where(Rand(0) > 0.1 && Rand(0) > 0.1)
```
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19475 from gengliangwang/non-deterministic-expressionSet.
## What changes were proposed in this pull request?
Due to optimizer removing some unnecessary aliases, the logical and physical plan may have different output attribute ids. FileFormatWriter should handle this when creating the physical sort node.
## How was this patch tested?
new regression test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19483 from cloud-fan/bug2.
## What changes were proposed in this pull request?
The method `deterministic` is frequently called in the optimizer.
Refactor `deterministic` as a lazy value, in order to avoid redundant computations.
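A minimal sketch of the idea (the trait and field are illustrative, not Spark's actual `Expression` code): the flag is computed once per node rather than on every call.
```scala
trait ExprSketch {
  def children: Seq[ExprSketch]
  // Evaluated at most once per node; repeated optimizer calls reuse the cached value.
  lazy val deterministic: Boolean = children.forall(_.deterministic)
}
```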
## How was this patch tested?
Simple benchmark test over TPC-DS queries, measuring run time from query string to optimized plan (20 continuous runs, averaging the last 5 results):
Before changes: 12601 ms
After changes: 11993 ms
This is a 4.8% performance improvement.
Also ran the unit tests.
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19478 from gengliangwang/deterministicAsLazyVal.
## What changes were proposed in this pull request?
`ParquetFileFormat` to relax its requirement of output committer class from `org.apache.parquet.hadoop.ParquetOutputCommitter` or subclass thereof (and so implicitly Hadoop `FileOutputCommitter`) to any committer implementing `org.apache.hadoop.mapreduce.OutputCommitter`
This enables output committers which don't write to the filesystem the way `FileOutputCommitter` does to save parquet data from a dataframe: at present you cannot do this.
Before using a committer which isn't a subclass of `ParquetOutputCommitter`, it checks to see if the context has requested summary metadata by setting `parquet.enable.summary-metadata`. If true, and the committer class isn't a Parquet committer, it raises a RuntimeException with an error message.
(It could downgrade, of course, but raising an exception makes it clear there won't be a summary. It also makes the behaviour testable.)
Note that `SQLConf` already states that any `OutputCommitter` can be used, but that typically it's a subclass of ParquetOutputCommitter. That's not currently true. This patch makes the code consistent with the docs, adding tests to verify.
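A hedged usage sketch after this change (the committer class name is hypothetical, and `spark.sql.parquet.output.committer.class` is assumed to be the usual key for plugging in a committer): any `OutputCommitter` can be used, as long as summary metadata is disabled.
```scala
// assumes an active SparkSession named `spark`
spark.conf.set("spark.sql.parquet.output.committer.class",
  "com.example.CloudDirectOutputCommitter")  // hypothetical committer, not a ParquetOutputCommitter
spark.sparkContext.hadoopConfiguration
  .set("parquet.enable.summary-metadata", "false")  // summaries still require a Parquet committer

spark.range(10).write.parquet("/tmp/parquet-committer-example")
```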
## How was this patch tested?
The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, `MarkingFileOutputCommitter` which extends `FileOutputCommitter` and writes a marker file in the destination directory. The presence of the marker file can be used to verify the new committer was used. The tests then try the combinations of Parquet committer summary/no-summary and marking committer summary/no-summary.
| committer | summary | outcome |
|-----------|---------|---------|
| parquet | true | success |
| parquet | false | success |
| marking | false | success with marker |
| marking | true | exception |
All tests are happy.
Author: Steve Loughran <stevel@hortonworks.com>
Closes#19448 from steveloughran/cloud/SPARK-22217-committer.
## What changes were proposed in this pull request?
Add code for setting the 'aggregate time' metric on the non-codegen path in HashAggregateExec and in ObjectHashAggregateExec.
## How was this patch tested?
Tested manually.
Author: Ala Luszczak <ala@databricks.com>
Closes#19473 from ala/fix-agg-time.
## What changes were proposed in this pull request?
As we discussed in https://github.com/apache/spark/pull/19136#discussion_r137023744 , we should push down operators to the data source before planning, so that the data source can report statistics more accurately.
This PR also includes some cleanup for the read path.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19424 from cloud-fan/follow.
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/18064, we allowed `RunnableCommand` to have children in order to fix some UI issues. Then we made `InsertIntoXXX` commands take the input `query` as a child, when we do the actual writing, we just pass the physical plan to the writer(`FileFormatWriter.write`).
However this is problematic. In Spark SQL, the optimizer and planner are allowed to change the schema names a little bit. e.g. the `ColumnPruning` rule will remove no-op `Project`s, like `Project("A", Scan("a"))`, and thus change the output schema from `<A: int>` to `<a: int>`. When it comes to writing, especially for self-describing data formats like parquet, we may write the wrong schema to the file and cause null values at the read path.
Fortunately, in https://github.com/apache/spark/pull/18450 , we decided to allow nested execution and one query can map to multiple executions in the UI. This releases the major restriction in #18604, and now we don't have to take the input `query` as a child of `InsertIntoXXX` commands.
So the fix is simple, this PR partially revert #18064 and make `InsertIntoXXX` commands leaf nodes again.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19474 from cloud-fan/bug.
## What changes were proposed in this pull request?
Implement `StreamingRelation.computeStats` to fix `explain`.
## How was this patch tested?
- unit tests: `StreamingRelation.computeStats` and `StreamingExecutionRelation.computeStats`.
- regression tests: `explain join with a normal source` and `explain join with MemoryStream`.
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19465 from zsxwing/SPARK-21988.
## What changes were proposed in this pull request?
Currently percentile_approx never returns the first element when the percentile is in (relativeError, 1/N], where relativeError defaults to 1/10000, and N is the total number of elements. But ideally, percentiles in [0, 1/N] should all return the first element as the answer.
For example, given input data 1 to 10, if a user queries 10% (or even less) percentile, it should return 1, because the first value 1 already reaches 10%. Currently it returns 2.
Based on the paper, targetError is not rounded up, and the search index should start from 0 instead of 1. By following the paper, we should be able to fix the cases mentioned above.
## How was this patch tested?
Added a new test case and fix existing test cases.
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#19438 from wzhfy/improve_percentile_approx.
## What changes were proposed in this pull request?
The current `CodeGenerator.splitExpressions` splits statements into methods if the total length of the statements is more than 1024 characters. The length may include comments or empty lines.
This PR excludes comments and empty lines from the length, to reduce the number of generated methods in a class, by using the `CodeFormatter.stripExtraNewLinesAndComments()` method.
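A simplified stand-in for the idea (not the actual `CodeFormatter` implementation): blank lines and comment-only lines should not count toward the length threshold that triggers splitting statements into separate methods.
```scala
def effectiveLength(code: String): Int =
  code.split("\n")
    .map(_.trim)
    .filterNot(l => l.isEmpty || l.startsWith("//") || l.startsWith("/*") || l.startsWith("*"))
    .map(_.length + 1)  // +1 for the newline that is kept
    .sum
```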
## How was this patch tested?
Existing tests
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18966 from kiszk/SPARK-21751.
This change adds a new SQL config key that is equivalent to SparkContext's
"spark.extraListeners", allowing users to register QueryExecutionListener
instances through the Spark configuration system instead of having to
explicitly do it in code.
The code used by SparkContext to implement the feature was refactored into
a helper method in the Utils class, and SQL's ExecutionListenerManager was
modified to use it to initialize listeners declared in the configuration.
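A hedged usage sketch (assuming the new SQL config key is spark.sql.queryExecutionListeners; the listener class is hypothetical): the listener is registered purely through configuration.
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("listener-config-example")
  // comma-separated list of QueryExecutionListener implementations with zero-arg constructors
  .config("spark.sql.queryExecutionListeners", "com.example.AuditQueryListener")
  .getOrCreate()
```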
Unit tests were added to verify all the new functionality.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19309 from vanzin/SPARK-19558.
## What changes were proposed in this pull request?
This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.
Static schema
-------------------
```
schema = df.schema

@pandas_udf(schema)
def normalize(df):
    df = df.assign(v1=(df.v1 - df.v1.mean()) / df.v1.std())
    return df

df.groupBy('id').apply(normalize)
```
Dynamic schema
-----------------------
**This use case is removed from the PR and we will discuss this as a follow up. See discussion https://github.com/apache/spark/pull/18732#pullrequestreview-66583248**
Another example to use pd.DataFrame dtypes as output schema of the udf:
```
sample_df = df.filter(df.id == 1).toPandas()

def foo(df):
    ret = ...  # Some transformation on the input pd.DataFrame
    return ret

foo_udf = pandas_udf(foo, foo(sample_df).dtypes)

df.groupBy('id').apply(foo_udf)
```
In the interactive use case, users usually have a sample pd.DataFrame to test the function `foo` in their notebook. Being able to use `foo(sample_df).dtypes` frees users from specifying the output schema of `foo`.
Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md
## How was this patch tested?
* Added GroupbyApplyTest
Author: Li Jin <ice.xelloss@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#18732 from icexelloss/groupby-apply-SPARK-20396.
## What changes were proposed in this pull request?
This is a follow-up of #19384.
In the previous PR, only the definitions of the config names were modified, but we also need to modify the names specified as string literals at runtime or in tests.
## How was this patch tested?
Existing tests but modified the config names.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19462 from ueshin/issues/SPARK-22159/fup1.
## What changes were proposed in this pull request?
We should not break the assumption that the length of the allocated byte array is word rounded:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170
So we want to use `Integer.MAX_VALUE - 15` instead of `Integer.MAX_VALUE - 8` as the upper bound of an allocated byte array.
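A small check of the reasoning (an illustration, not Spark code): after rounding a requested length up to the next multiple of 8 (word rounding), it must still fit under `Integer.MAX_VALUE - 8`.
```scala
def roundToWord(n: Int): Long = ((n.toLong + 7) / 8) * 8

assert(roundToWord(Integer.MAX_VALUE - 15) <= Integer.MAX_VALUE - 8)  // new bound stays safe
assert(roundToWord(Integer.MAX_VALUE - 8)  >  Integer.MAX_VALUE - 8)  // old bound could overshoot
```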
cc: srowen gatorsmile
## How was this patch tested?
Since the Spark unit test JVM has less than 1GB of heap, here we run the test code as a submitted job, so it can run on a JVM with 4GB of memory.
Author: Feng Liu <fengliu@databricks.com>
Closes#19460 from liufengdb/fix_array_max.
## What changes were proposed in this pull request?
In state store restore, for each row, put the saved state before the row in the iterator instead of after.
This fixes an issue where agg(last('attr)) will forever return the last value of 'attr from the first microbatch.
## How was this patch tested?
new unit test
Author: Jose Torres <jose@databricks.com>
Closes#19461 from joseph-torres/SPARK-22230.
## What changes were proposed in this pull request?
This updates the broadcast join code path to lazily decompress pages and
iterate through UnsafeRows to prevent all rows from being held in memory
while the broadcast table is being built.
## How was this patch tested?
Existing tests.
Author: Ryan Blue <blue@apache.org>
Closes#19394 from rdblue/broadcast-driver-memory.
## What changes were proposed in this pull request?
`monotonically_increasing_id` doesn't work in Structured Streaming. We should throw an exception if a streaming query uses it.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19336 from viirya/SPARK-21947.
## What changes were proposed in this pull request?
In this PR we make a few changes to the list hive partitions code, to make the code more extensible.
The following changes are made:
1. In `HiveClientImpl.getPartitions()`, call `client.getPartitions` instead of `shim.getAllPartitions` when `spec` is empty;
2. In `HiveTableScanExec`, previously we always call `listPartitionsByFilter` if the config `metastorePartitionPruning` is enabled, but actually, we'd better call `listPartitions` if `partitionPruningPred` is empty;
3. We should use sessionCatalog instead of SharedState.externalCatalog in `HiveTableScanExec`.
## How was this patch tested?
Tested by existing test cases since this is code refactor, no regression or behavior change is expected.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#19444 from jiangxb1987/hivePartitions.
## What changes were proposed in this pull request?
When exceeding `spark.sql.codegen.hugeMethodLimit`, the runtime falls back to the Volcano iterator solution. This could cause an infinite loop when `FileSourceScanExec` can use the columnar batch to read the data. This PR is to fix the issue.
## How was this patch tested?
Added a test
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19440 from gatorsmile/testt.
## What changes were proposed in this pull request?
Looks like `FlatMapGroupsInRExec.requiredChildDistribution` didn't consider empty grouping attributes. This is a problem when running `EnsureRequirements`, and `gapply` in R can't work on empty grouping columns.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19436 from viirya/fix-flatmapinr-distribution.
## What changes were proposed in this pull request?
Currently, the group state of a user-defined type is encoded as top-level columns in the UnsafeRows stored in the state store. The timeout timestamp is also saved (when needed) as the last top-level column. Since the group state is serialized to top-level columns, you cannot save "null" as a value of state (setting null in all the top-level columns is not equivalent). So we don't let the user set the timeout without initializing the state for a key. Based on user experience, this leads to confusion.
This PR is to change the row format such that the state is saved as nested columns. This would allow the state to be set to null, and avoid these confusing corner cases.
## How was this patch tested?
Refactored tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19416 from tdas/SPARK-22187.
## What changes were proposed in this pull request?
By definition the table name in Spark can be something like `123x`, `25a`, etc., with exceptions for literals like `12L`, `23BD`, etc. However, Spark SQL has a special byte length literal, which stops users from using digits followed by `b`, `k`, `m`, `g` as identifiers.
The byte length literal is not a standard SQL literal and is only used in the `tableSample` parser rule. This PR moves the parsing of the byte length literal from the lexer to the parser, so that users can use it as an identifier.
## How was this patch tested?
regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19392 from cloud-fan/parser-bug.
## What changes were proposed in this pull request?
This PR adds code to check the actual bytecode size when compiling generated code. In #18810, we added code to give up code compilation and use interpreter execution in `SparkPlan` if the line count of generated functions goes over `maxLinesPerFunction`. But we already have code to collect metrics for compiled bytecode size in the `CodeGenerator` object. So, we can easily reuse that code for this purpose.
## How was this patch tested?
Added tests in `WholeStageCodegenSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19083 from maropu/SPARK-21871.
## What changes were proposed in this pull request?
This PR abstracts data compressed by `CompressibleColumnAccessor` using `ColumnVector` in a batch method. When `ColumnAccessor.decompress` is called, `ColumnVector` will have the uncompressed data. This batch decompression does not use `InternalRow`, to reduce the number of memory accesses.
As a first step of this implementation, this JIRA supports primitive data types. Another PR will support array and other data types.
This implementation decompresses data in batch into an uncompressed column batch, as rxin suggested at [here](https://github.com/apache/spark/pull/18468#issuecomment-316914076). Another implementation uses an adapter approach [as cloud-fan suggested](https://github.com/apache/spark/pull/18468).
## How was this patch tested?
Added test suites
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18704 from kiszk/SPARK-20783a.
## What changes were proposed in this pull request?
[SPARK-22193][SQL] Minor typo fix in SortMergeJoinExec. Nothing major, but it bothered me when reading the code, hence fixing it.
## How was this patch tested?
existing tests
Author: Rekha Joshi <rekhajoshm@gmail.com>
Author: rjoshi2 <rekhajoshm@gmail.com>
Closes#19422 from rekhajoshm/SPARK-22193.
## What changes were proposed in this pull request?
Allow one-sided outer joins between two streams when a watermark is defined.
## How was this patch tested?
new unit tests
Author: Jose Torres <jose@databricks.com>
Closes#19327 from joseph-torres/outerjoin.
## What changes were proposed in this pull request?
Users could hit a `java.lang.NullPointerException` when a table was created by Hive and the table's owner retrieved from the Hive metastore is `null`. `DESC EXTENDED` failed with the error:
```
SQLExecutionException: java.lang.NullPointerException
  at scala.collection.immutable.StringOps$.length$extension(StringOps.scala:47)
  at scala.collection.immutable.StringOps.length(StringOps.scala:47)
  at scala.collection.IndexedSeqOptimized$class.isEmpty(IndexedSeqOptimized.scala:27)
  at scala.collection.immutable.StringOps.isEmpty(StringOps.scala:29)
  at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
  at scala.collection.immutable.StringOps.nonEmpty(StringOps.scala:29)
  at org.apache.spark.sql.catalyst.catalog.CatalogTable.toLinkedHashMap(interface.scala:300)
  at org.apache.spark.sql.execution.command.DescribeTableCommand.describeFormattedTableInfo(tables.scala:565)
  at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:543)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:66)
  at ...
```
## How was this patch tested?
Added a unit test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19395 from gatorsmile/desc.
## What changes were proposed in this pull request?
The underlying tables of persistent views are not refreshed when users issue the REFRESH TABLE command against the persistent views.
## How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19405 from gatorsmile/refreshView.
## What changes were proposed in this pull request?
The definition of `maxRows` in the `LocalLimit` operator was simply wrong. This patch introduces a new `maxRowsPerPartition` method and uses that in pruning. The patch also adds more documentation on why we need local limit vs global limit.
Note that this has never previously been a bug, because of the way the code is structured, but future use of maxRows could lead to bugs.
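A simplified sketch of the distinction (the names are illustrative, not Spark's actual operators): with N partitions, `LocalLimit(k)` can emit up to N * k rows in total, so `k` is only a valid per-partition bound.
```scala
trait PlanBound { def maxRows: Option[Long] }

case class LocalLimitSketch(limit: Long, child: PlanBound) extends PlanBound {
  def maxRowsPerPartition: Option[Long] = Some(limit)  // safe bound per partition
  def maxRows: Option[Long] = child.maxRows            // the total is still only bounded by the child
}

case class GlobalLimitSketch(limit: Long, child: PlanBound) extends PlanBound {
  def maxRows: Option[Long] = Some(limit)              // a global limit does bound the total
}
```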
## How was this patch tested?
Should be covered by existing test cases.
Closes#18851
Author: gatorsmile <gatorsmile@gmail.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#19393 from gatorsmile/pr-18851.
## What changes were proposed in this pull request?
This PR fixes an overflow issue in `Dataset.show`, shown below:
```
scala> Seq((1, 2), (3, 4)).toDF("a", "b").show(Int.MaxValue)
org.apache.spark.sql.AnalysisException: The limit expression must be equal to or greater than 0, but got -2147483648;;
GlobalLimit -2147483648
+- LocalLimit -2147483648
+- Project [_1#27218 AS a#27221, _2#27219 AS b#27222]
+- LocalRelation [_1#27218, _2#27219]
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$checkLimitClause(CheckAnalysis.scala:70)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:234)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
```
## How was this patch tested?
Added tests in `DataFrameSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19401 from maropu/MaxValueInShowString.
## What changes were proposed in this pull request?
SPARK-21690 makes one-pass `Imputer` by parallelizing the computation of all input columns. When we transform dataset with `ImputerModel`, we do `withColumn` on all input columns sequentially. We can also do this on all input columns at once by adding a `withColumns` API to `Dataset`.
The new `withColumns` API is for internal use only now.
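A hedged sketch of the call shape of the new internal API (it is private to Spark, so this is illustrative only; the assumed signature takes parallel sequences of names and columns): several columns are added in a single pass instead of chaining `withColumn`.
```scala
import org.apache.spark.sql.functions.col
import spark.implicits._  // assumes an active SparkSession named `spark`

val df = Seq((170.0, 65.0), (Double.NaN, 72.0)).toDF("height", "weight")

// Hypothetical names; adds both output columns in one pass.
val imputed = df.withColumns(
  Seq("height_out", "weight_out"),
  Seq(col("height") * 2, col("weight") + 1))
```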
## How was this patch tested?
Existing tests for `ImputerModel`'s change. Added tests for `withColumns` API.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19229 from viirya/SPARK-22001.
## What changes were proposed in this pull request?
Since the current code ignores WITH clauses when checking input relations in TPCDS queries, this leads to inaccurate per-row processing time in benchmark results. For example, in `q2`, this fix could catch all the input relations: `web_sales`, `date_dim`, and `catalog_sales` (the current code catches `date_dim` only). About one-third of the TPCDS queries use WITH clauses, so I think it is worth fixing this.
## How was this patch tested?
Manually checked.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19344 from maropu/RespectWithInTPCDSBench.
### What changes were proposed in this pull request?
`tempTables` is not right. To be consistent, we need to rename the internal variable names/comments to tempViews in SessionCatalog too.
### How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19117 from gatorsmile/renameTempTablesToTempViews.
## What changes were proposed in this pull request?
Added IMPALA-modified TPCDS queries to TPC-DS query suites.
- Ref: https://github.com/cloudera/impala-tpcds-kit/tree/master/queries
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19386 from gatorsmile/addImpalaQueries.
## What changes were proposed in this pull request?
Add comments for specifying the position of batch "Check Cartesian Products", as rxin suggested in https://github.com/apache/spark/pull/19362 .
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19379 from gengliangwang/SPARK-22141-followup.
## What changes were proposed in this pull request?
Reading ORC files containing special characters like '%' fails with a FileNotFoundException.
This PR aims to fix the problem.
## How was this patch tested?
Added UT.
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19368 from mgaido91/SPARK-22146.
## What changes were proposed in this pull request?
Spark's RangePartitioner hard codes the number of sampling points per partition to be 20. This is sometimes too low. This ticket makes it configurable, via `spark.sql.execution.rangeExchange.sampleSizePerPartition`, and raises the default in Spark SQL to 100.
## How was this patch tested?
Added a pretty sophisticated test based on chi square test ...
Author: Reynold Xin <rxin@databricks.com>
Closes#19387 from rxin/SPARK-22160.
## What changes were proposed in this pull request?
Rename `spark.sql.execution.arrow.enable` and `spark.sql.codegen.aggregate.map.twolevel.enable` to end with `enabled`.
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#19384 from rxin/SPARK-22159.
## What changes were proposed in this pull request?
For some reason when we added the Exec suffix to all physical operators, we missed this one. I was looking for this physical operator today and couldn't find it, because I was looking for ExchangeExec.
## How was this patch tested?
This is a simple rename and should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes#19376 from rxin/SPARK-22153.
## What changes were proposed in this pull request?
Now, we are not running TPC-DS queries as regular test cases. Thus, we need to add a test suite using empty tables for ensuring the new code changes will not break them. For example, optimizer/analyzer batches should not exceed the max iteration.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19361 from gatorsmile/tpcdsQuerySuite.
## What changes were proposed in this pull request?
`WritableColumnVector` does not close its child column vectors. This can create memory leaks for `OffHeapColumnVector`, where we do not clean up the memory allocated by a vector's children. This can be especially bad for string columns (which use a child byte column vector).
## How was this patch tested?
I have updated the existing tests to always use both on-heap and off-heap vectors. Testing and diagnoses was done locally.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#19367 from hvanhovell/SPARK-22143.
## What changes were proposed in this pull request?
Currently we use the Arrow File format to communicate with the Python worker when invoking a vectorized UDF, but we can use the Arrow Stream format instead.
This PR replaces the Arrow File format with the Arrow Stream format.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19349 from ueshin/issues/SPARK-22125.
## What changes were proposed in this pull request?
When inferring constraints from children, Join's condition can be simplified as None.
For example,
```
val testRelation = LocalRelation('a.int)
val x = testRelation.as("x")
val y = testRelation.where($"a" === 2 && !($"a" === 2)).as("y")
x.join.where($"x.a" === $"y.a")
```
The plan will become
```
Join Inner
:- LocalRelation <empty>, [a#23]
+- LocalRelation <empty>, [a#224]
```
And the Cartesian products check will throw exception for above plan.
Propagate empty relation before checking Cartesian products, and the issue is resolved.
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19362 from gengliangwang/MoveCheckCartesianProducts.
## What changes were proposed in this pull request?
Address PR comments that appeared post-merge: rename `addExtraCode` to `addInnerClass`,
and do not count the size of the inner class toward the size of the outer class.
## How was this patch tested?
YOLO.
Author: Juliusz Sompolski <julek@databricks.com>
Closes#19353 from juliuszsompolski/SPARK-22103followup.
## What changes were proposed in this pull request?
We can override `usedInputs` to claim that an operator defers input evaluation. `Sample` and `Limit` are two operators which should claim it but don't. We should do it.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19345 from viirya/SPARK-22124.
## What changes were proposed in this pull request?
This change disables the use of 0-parameter pandas_udfs, because the API is overly complex and awkward, and can easily be worked around by using an index column as an input argument. Also added doctests for pandas_udfs, which revealed bugs in handling empty partitions and in using the pandas_udf decorator.
## How was this patch tested?
Reworked existing 0-parameter test to verify error is raised, added doctest for pandas_udf, added new tests for empty partition and decorator usage.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19325 from BryanCutler/arrow-pandas_udf-0-param-remove-SPARK-22106.
## What changes were proposed in this pull request?
During TestHiveSparkSession.reset(), which is called after each TestHiveSingleton suite, we now delete and recreate the Hive warehouse directory.
## How was this patch tested?
Ran full suite of tests locally, verified that they pass.
Author: Greg Owen <greg@databricks.com>
Closes#19341 from GregOwen/SPARK-22120.
## What changes were proposed in this pull request?
HashAggregateExec codegen uses two paths for fast hash table and a generic one.
It generates code paths for iterating over both, and both code paths generate the consume code of the parent operator, resulting in that code being expanded twice.
This leads to a long generated function that might be an issue for the compiler (see e.g. SPARK-21603).
I propose to remove the double expansion by generating the consume code in a helper function that can just be called from both iterating loops.
An issue with separating the `consume` code into a helper function was that a number of places relied on and assumed being in the scope of an outer `produce` loop, and e.g. used `continue` to jump out.
I replaced such code flows with nested scopes. This code should be handled the same way by the compiler, while getting rid of dependence on assumptions that lie outside the `consume`'s own scope.
## How was this patch tested?
Existing test coverage.
Author: Juliusz Sompolski <julek@databricks.com>
Closes#19324 from juliuszsompolski/aggrconsumecodegen.
## What changes were proposed in this pull request?
The `percentile_approx` function previously accepted numeric type input and output double type results.
But since all numeric types, date and timestamp types are represented as numerics internally, `percentile_approx` can support them easily.
After this PR, it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
This change is also required when we generate equi-height histograms for these types.
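A hedged illustration (the table and columns are made up): after this change, the result type matches the input type, so a percentile of a timestamp column is itself a timestamp rather than a double.
```scala
// assumes an active SparkSession named `spark` and a table `events`
spark.sql("SELECT percentile_approx(event_ts, 0.5) FROM events")                  // TIMESTAMP result
spark.sql("SELECT percentile_approx(event_date, array(0.25, 0.75)) FROM events")  // array of DATEs
```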
## How was this patch tested?
Added a new test and modified some existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19321 from wzhfy/approx_percentile_support_types.
## What changes were proposed in this pull request?
Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:
- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19307 from srowen/Scala212.
## What changes were proposed in this pull request?
This PR proposes to remove `assume` in `Utils.resolveURIs` and replace `assume` to `assert` in `Utils.resolveURI` in the test cases in `UtilsSuite`.
It looks like `Utils.resolveURIs` supports both multiple and single paths as input, so it does not look meaningful to check if the input has `,`.
For the test for `Utils.resolveURI`, I replaced it with `assert` because it takes a single path, and in order to prevent future mistakes when adding more tests here.
For `assume` in `HiveDDLSuite`, it looks like it should be `assert`, to test at the end.
## How was this patch tested?
Fixed unit tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19332 from HyukjinKwon/SPARK-22093.
## What changes were proposed in this pull request?
The implemented `isCascadingTruncateTable` in `AggregatedDialect` is wrong. When no dialect claims cascading, if any of the dialects reports an unknown cascading truncate, we should return unknown cascading instead of false.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19286 from viirya/SPARK-21338-followup.
## What changes were proposed in this pull request?
This PR proposes to enhance the documentation for `trim` functions in the function description session.
- Add more `usage`, `arguments` and `examples` for the trim function
- Adjust space in the `usage` session
After the changes, the trim function documentation will look like this:
- `trim`
```
trim(str) - Removes the leading and trailing space characters from str.
trim(BOTH trimStr FROM str) - Remove the leading and trailing trimStr characters from str
trim(LEADING trimStr FROM str) - Remove the leading trimStr characters from str
trim(TRAILING trimStr FROM str) - Remove the trailing trimStr characters from str
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
BOTH, FROM - these are keywords to specify trimming string characters from both ends of the string
LEADING, FROM - these are keywords to specify trimming string characters from the left end of the string
TRAILING, FROM - these are keywords to specify trimming string characters from the right end of the string
Examples:
> SELECT trim(' SparkSQL ');
SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
SSparkSQ
```
- `ltrim`
```
ltrim(str) - Removes the leading space characters from str.
ltrim(trimStr, str) - Removes the leading string contains the characters from the trim string
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT ltrim(' SparkSQL ');
SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
arkSQLS
```
- `rtrim`
```rtrim
rtrim(str) - Removes the trailing space characters from str.
rtrim(trimStr, str) - Removes the trailing string which contains the characters from the trim string from the str
Arguments:
str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT rtrim(' SparkSQL ');
SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
SSpark
```
The JIRA ticket for the trim characters function: [trim function](https://issues.apache.org/jira/browse/SPARK-14878)
## How was this patch tested?
Manually tested
```
spark-sql> describe function extended trim;
17/09/22 17:03:04 INFO CodeGenerator: Code generated in 153.026533 ms
Function: trim
Class: org.apache.spark.sql.catalyst.expressions.StringTrim
Usage:
trim(str) - Removes the leading and trailing space characters from `str`.
trim(BOTH trimStr FROM str) - Remove the leading and trailing `trimStr` characters from `str`
trim(LEADING trimStr FROM str) - Remove the leading `trimStr` characters from `str`
trim(TRAILING trimStr FROM str) - Remove the trailing `trimStr` characters from `str`
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
* BOTH, FROM - these are keywords to specify trimming string characters from both ends of
the string
* LEADING, FROM - these are keywords to specify trimming string characters from the left
end of the string
* TRAILING, FROM - these are keywords to specify trimming string characters from the right
end of the string
Examples:
> SELECT trim(' SparkSQL ');
SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
SSparkSQ
```
```
spark-sql> describe function extended ltrim;
Function: ltrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimLeft
Usage:
ltrim(str) - Removes the leading space characters from `str`.
ltrim(trimStr, str) - Removes the leading string contains the characters from the trim string
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT ltrim(' SparkSQL ');
SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
arkSQLS
```
```
spark-sql> describe function extended rtrim;
Function: rtrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimRight
Usage:
rtrim(str) - Removes the trailing space characters from `str`.
rtrim(trimStr, str) - Removes the trailing string which contains the characters from the trim string from the `str`
Extended Usage:
Arguments:
* str - a string expression
* trimStr - the trim string characters to trim, the default value is a single space
Examples:
> SELECT rtrim(' SparkSQL ');
SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
SSpark
```
Author: Kevin Yu <qyu@us.ibm.com>
Closes#19329 from kevinyu98/spark-14878-5.
## What changes were proposed in this pull request?
This PR proposes to resolve the type conflicts in strings and timestamps in partition column values.
It looks like we need to set the timezone, since a cast between strings and timestamps is required.
```scala
val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, "blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)
spark.read.parquet(path).show()
```
**Before**
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46)
at org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172)
at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207)
at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
```
**After**
```
+---+-------------------+
| i| str|
+---+-------------------+
| 2|2014-01-01 00:00:00|
| 1|2015-01-01 00:00:00|
| 3| blah|
+---+-------------------+
```
## How was this patch tested?
Unit tests added in `ParquetPartitionDiscoverySuite` and manual tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19331 from HyukjinKwon/SPARK-22109.
## What changes were proposed in this pull request?
Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual max size on some JVMs, in several places
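A hedged sketch of the kind of guard this implies (the constant and function names below are illustrative, not necessarily the ones used in Spark):
```scala
// Cap allocations below the JVM's practical array size limit instead of Int.MaxValue.
val MaxSafeArrayLength: Int = Int.MaxValue - 8

def checkedArraySize(requested: Long): Int = {
  require(requested >= 0 && requested <= MaxSafeArrayLength,
    s"Cannot allocate an array of $requested elements; limit is $MaxSafeArrayLength")
  requested.toInt
}
```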
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19266 from srowen/SPARK-22033.
## What changes were proposed in this pull request?
`OffHeapColumnVector.reserveInternal()` will only copy already inserted values during reallocation if `data != null`. In vectors containing arrays or structs this is incorrect, since the field `data` is not used at all there. We need to check `nulls` instead.
## How was this patch tested?
Adds new tests to `ColumnVectorSuite` that reproduce the errors.
Author: Ala Luszczak <ala@databricks.com>
Closes#19308 from ala/vector-realloc.
This PR adds vectorized UDFs to the Python API
**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:
```
@pandas_udf(DoubleType())
def plus(a, b):
    return a + b
```
or
```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs
0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output. For example:
```
@pandas_udf(LongType())
def f0(**kwargs):
    return pd.Series(1).repeat(kwargs["size"])

df.select(f0())
```
Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.
- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
## What changes were proposed in this pull request?
Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we ended up not getting the correct outputOrdering during the physical planning stage, before Sort nodes are added to the children.
For example, J = {A join B on key1 = key2}
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"
So to fix this I changed the behavior of `getKeyOrdering(keys, childOutputOrdering)` to:
1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering
2. Otherwise => required child ordering
In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.
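A simplified, self-contained sketch of the decision rule above, using a stand-in type instead of Spark's SortOrder (names and shapes are illustrative, not the actual PR code):
```scala
case class KeyOrder(expr: String, ascending: Boolean = true)

// Keep the child's ordering when it already covers the required join-key ordering,
// so any extra same-order expressions are preserved; otherwise fall back to the
// required ordering built from the join keys.
def getKeyOrdering(required: Seq[KeyOrder], childOutput: Seq[KeyOrder]): Seq[KeyOrder] = {
  val childSatisfiesRequired =
    required.zipWithIndex.forall { case (k, i) => childOutput.lift(i).contains(k) }
  if (childSatisfiesRequired) childOutput else required
}
```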
## How was this patch tested?
Added new test cases.
Passed all integration tests.
Author: maryannxue <maryann.xue@gmail.com>
Closes#19281 from maryannxue/spark-21998.
## What changes were proposed in this pull request?
`processAllAvailable` should also check the query state and if the query is stopped, it should return.
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19314 from zsxwing/SPARK-22094.
## What changes were proposed in this pull request?
#### Architecture
This PR implements stream-stream inner join using a two-way symmetric hash join. At a high level, we want to do the following.
1. For each stream, we maintain the past rows as state in State Store.
- For each joining key, there can be multiple rows that have been received.
- So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream
- Look up the other streams state to see if there are matching rows, and output them if they satisfy the joining condition
- Add the input row to corresponding stream’s state.
- If the data has a timestamp/window column with watermark, then we will use that to calculate the threshold for keys that are required to buffered for future matches and drop the rest from the state.
Cleaning up old, unnecessary state rows depends entirely on whether a watermark has been defined and what the join conditions are. We definitely want to support state cleanup for two types of queries that are likely to be common (a hedged DataFrame-level sketch follows the list).
- Queries with time range conditions - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")` (pseudo-SQL)
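As referenced above, a hedged DataFrame-level sketch of the first query type (the rate source and all stream/column names are invented; only the shape of the query matters):
```scala
import org.apache.spark.sql.functions.expr

// Two rate streams standing in for real sources; the watermarks bound how much state is kept.
val impressions = spark.readStream.format("rate").load()
  .selectExpr("value AS adId", "timestamp AS impressionTime")
  .withWatermark("impressionTime", "10 seconds")

val clicks = spark.readStream.format("rate").load()
  .selectExpr("value AS clickAdId", "timestamp AS clickTime")
  .withWatermark("clickTime", "20 seconds")

// Inner join with an equality key plus a time-range condition.
val joined = impressions.join(clicks,
  expr("adId = clickAdId AND clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"))
```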
#### Implementation
The stream-stream join is primarily implemented in three classes
- `StreamingSymmetricHashJoinExec` implements the above symmetric join algorithm.
- `SymmetricsHashJoinStateManagers` manages the streaming state for the join. This essentially is a fault-tolerant key-to-list-of-values multimap built on the StateStore APIs. `StreamingSymmetricHashJoinExec` instantiates two such managers, one for each join side.
- `StreamingSymmetricHashJoinExecHelper` is a helper class to extract threshold for the state based on the join conditions and the event watermark.
Refer to the scaladocs class for more implementation details.
Besides the implementation of the stream-stream inner join SparkPlan, some additional changes are:
- Allowed inner join in append mode in UnsupportedOperationChecker
- Prevented stream-stream join on an empty batch dataframe to be collapsed by the optimizer
## How was this patch tested?
- New tests in StreamingJoinSuite
- Updated tests UnsupportedOperationSuite
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19271 from tdas/SPARK-22053.
## What changes were proposed in this pull request?
I tested on a dataset of about 13M instances and found that using `treeAggregate` gives a speedup in the following algorithms:
|Algs| SpeedUp |
|------|-----------|
|OneHotEncoder| 5% |
|StatFunctions.calculateCov| 7% |
|StatFunctions.multipleApproxQuantiles| 9% |
|RegressionEvaluator| 8% |
## How was this patch tested?
existing tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19232 from zhengruifeng/use_treeAggregate.
## What changes were proposed in this pull request?
There is an incorrect `scalastyle:on` comment in `stringExpressions.scala` that renders the line length check ineffective in that file. As a result, many lines of code and comments exceed 100 characters.
## How was this patch tested?
Code style change only.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19305 from viirya/fix-wrong-style.
## What changes were proposed in this pull request?
Adjust EnsureStatefulOpPartitioningSuite to use scalatest lifecycle normally instead of constructor; fixes:
```
*** RUN ABORTED ***
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.streaming.EnsureStatefulOpPartitioningSuite.<init>(EnsureStatefulOpPartitioningSuite.scala:35)
```
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19306 from srowen/SPARK-21977.2.
## What changes were proposed in this pull request?
Among the SQL conditional expressions, only CASE WHEN lacks an expression description. This patch fills the gap.
## How was this patch tested?
Only documentation change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19304 from viirya/casewhen-doc.
## What changes were proposed in this pull request?
This work is part of [SPARK-17074](https://issues.apache.org/jira/browse/SPARK-17074) to compute equi-height histograms. An equi-height histogram is an array of bins, where a bin consists of two endpoints forming an interval of values, plus the ndv (number of distinct values) in that interval.
This PR creates a new aggregate function, given an array of endpoints, counting distinct values (ndv) in intervals among those endpoints.
This PR also refactors `HyperLogLogPlusPlus` by extracting a helper class `HyperLogLogPlusPlusHelper`, where the underlying HLLPP algorithm locates.
## How was this patch tested?
Add new test cases.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#15544 from wzhfy/countIntervals.
## What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/19289; we missed another place: `rollup`. `Seq.init.toSeq` also returns a `Stream`, so we should fix it too.
## How was this patch tested?
manually
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19298 from cloud-fan/bug.
## What changes were proposed in this pull request?
Spark with Scala 2.10 fails with a group by cube:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```
It can be traced back to https://github.com/apache/spark/pull/15484 , which made `Expand.projections` a lazy `Stream` for group by cube.
In scala 2.10 `Stream` captures a lot of stuff, and in this case it captures the entire query plan which has some un-serializable parts.
This change is also good for master branch, to reduce the serialized size of `Expand.projections`.
## How was this patch tested?
manually verified with Spark with Scala 2.10.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19289 from cloud-fan/bug.
## What changes were proposed in this pull request?
Clarify behavior of to_utc_timestamp/from_utc_timestamp with an example
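A short, hedged illustration of the clarified semantics (not copied from the doc change itself): the input is treated as a wall-clock time in the given timezone and rendered as the corresponding UTC time.
```scala
// 2016-08-31 00:00:00 in Asia/Seoul (UTC+9) corresponds to 2016-08-30 15:00:00 UTC.
spark.sql("SELECT to_utc_timestamp('2016-08-31', 'Asia/Seoul')").show()
// from_utc_timestamp goes the other way: the input is read as UTC and rendered in Seoul time.
spark.sql("SELECT from_utc_timestamp('2016-08-31', 'Asia/Seoul')").show()
```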
## How was this patch tested?
Doc only change / existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19276 from srowen/SPARK-22049.
## What changes were proposed in this pull request?
Update plugins, including scala-maven-plugin, to latest versions. Update checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing and new Java checkstyle errors.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19282 from srowen/SPARK-22066.
## What changes were proposed in this pull request?
This is a bit hard to explain as there are several issues here, I'll try my best. Here are the requirements:
1. A StructuredStreaming Source that can generate empty RDDs with 0 partitions
2. A StructuredStreaming query that uses the above source, performs a stateful aggregation
(mapGroupsWithState, groupBy.count, ...), and coalesces to 1 partition
The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems:
Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 0 partitions, nothing will be executed, the state store will not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger
Symptom 2. Let's say that there was data. In this case, if you stop your stream, change `coalesce(1)` to `coalesce(2)`, and then restart your stream, the stream will fail, because `spark.sql.shuffle.partitions - 1` StateStores will fail to find their delta files.
To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies:
If the grouping expressions are empty:
a) AllTuple distribution
b) Single physical partition
If the grouping expressions are non-empty:
a) Clustered distribution
b) `spark.sql.shuffle.partitions` number of partitions
whether or not `coalesce(1)` exists in the plan, and whether or not the input RDD for the trigger has any data.
Once you fix the above problem by adding an Exchange to the plan, you come across the following bug:
If you call `coalesce(1).groupBy().count()` on a Streaming DataFrame, and if you have a trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count, and there is no data. Then this data gets stored in `StateStoreSaveExec` causing the previous counts to be overwritten and lost.
## How was this patch tested?
Regression tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#19196 from brkyvz/sa-0.
This change modifies the live listener bus so that all listeners are
added to queues; each queue has its own thread to dispatch events,
making it possible to separate slow listeners from other more
performance-sensitive ones.
The public API has not changed - all listeners added with the existing
"addListener" method, which after this change mostly means all
user-defined listeners, end up in a default queue. Internally, there's
an API allowing listeners to be added to specific queues, and that API
is used to separate the internal Spark listeners into 3 categories:
application status listeners (e.g. UI), executor management (e.g. dynamic
allocation), and the event log.
The queueing logic, while abstracted away in a separate class, is kept
as much as possible hidden away from consumers. Aside from choosing their
queue, there's no code change needed to take advantage of queues.
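As an illustration of the unchanged public API, a hedged sketch of a user-defined listener (assuming `sc` is an existing SparkContext; the listener class is made up):
```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// User-defined listeners still go through the existing API and end up in the default
// queue; only internal Spark listeners are routed to the dedicated queues described above.
class JobEndLogger extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

sc.addSparkListener(new JobEndLogger)
```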
Test coverage relies on existing tests; a few tests had to be tweaked
because they relied on `LiveListenerBus.postToAll` being synchronous,
and the change makes that method asynchronous. Other tests were simplified
not to use the asynchronous LiveListenerBus.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19211 from vanzin/SPARK-18838.
## What changes were proposed in this pull request?
The ArrowWriter StringWriter was setting Arrow data using a position of 0 instead of the actual position in the ByteBuffer. This happened to work because of a bug, ARROW-1443, which has been fixed as of Arrow 0.7.0. Testing with that version revealed the error in the ArrowConvertersSuite string conversion test.
## How was this patch tested?
Existing tests, manually verified working with Arrow 0.7.0
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19284 from BryanCutler/arrow-ArrowWriter-StringWriter-position-SPARK-22067.
## What changes were proposed in this pull request?
Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will use the cached information even though it is no longer valid. Consider the example below.
```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```
After step 3, the table will be present in the catalog relation cache. Step 4 will correctly update the metadata inside the catalog but will NOT invalidate the cache.
By the way, ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also solve the problem.
## How was this patch tested?
Current and additional unit tests.
Author: aokolnychyi <anton.okolnychyi@sap.com>
Closes#19252 from aokolnychyi/spark-21969.
## What changes were proposed in this pull request?
The org.apache.spark.sql.jdbc.JdbcDialect method
`def isCascadingTruncateTable(): Option[Boolean] = None`
is not overridden in the org.apache.spark.sql.jdbc.AggregatedDialect class.
Because of this, when more than one dialect is registered, Spark doesn't truncate the table, since isCascadingTruncateTable always returns the default None for the aggregated dialect.
This PR implements isCascadingTruncateTable in the AggregatedDialect class.
## How was this patch tested?
In JDBCSuite, inside test("Aggregated dialects"), will add one line to test AggregatedDialect.isCascadingTruncateTable
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#19256 from huaxingao/spark-21338.
## What changes were proposed in this pull request?
While running bin/spark-sql, we reuse cliSessionState, but the Hive configurations generated there point to a dummy metastore rather than the real one. Since the warehouse location is determined later in SharedState, HiveClient should respect this configuration change in this case as well.
## How was this patch tested?
existing ut
cc cloud-fan jiangxb1987
Author: Kent Yao <yaooqinn@hotmail.com>
Closes#19068 from yaooqinn/SPARK-21428-FOLLOWUP.
The current implementation of processingRate-total uses the wrong metric:
it mistakenly reports inputRowsPerSecond instead of processedRowsPerSecond.
## What changes were proposed in this pull request?
Adjust processingRate-total from using inputRowsPerSecond to processedRowsPerSecond
## How was this patch tested?
Built spark from source with proposed change and tested output with correct parameter. Before change the csv metrics file for inputRate-total and processingRate-total displayed the same values due to the error. After changing MetricsReporter.scala the processingRate-total csv file displayed the correct metric.
<img width="963" alt="processed rows per second" src="https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png">
Author: Taaffy <32072374+Taaffy@users.noreply.github.com>
Closes#19268 from Taaffy/patch-1.
## What changes were proposed in this pull request?
* Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`.
It became unused as a result of 85b0a15754
(SPARK-15962) introducing word alignment for unsafe arrays.
* Cleaned up duplicate code in memory management and unsafe sorters
* The change extracting the exception paths is more than just cosmetic, since it definitely reduces the compiled size of the affected methods.
## How was this patch tested?
* The build still passes after removing the method, and grepping the codebase for `alignToWords` shows no remaining references to it.
* Dried up code is covered by existing tests.
Author: Armin <me@obrown.io>
Closes#19254 from original-brownbear/cleanup-mem-consumer.
## What changes were proposed in this pull request?
This PR tries to download Spark for each test run, to make sure each test run is absolutely isolated.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19265 from cloud-fan/test.
#### What changes were proposed in this pull request?
This PR enhances the TRIM function support in Spark SQL by allowing the specification
of a trim character set. Below is the SQL syntax:
``` SQL
<trim function> ::= TRIM <left paren> <trim operands> <right paren>
<trim operands> ::= [ [ <trim specification> ] [ <trim character set> ] FROM ] <trim source>
<trim source> ::= <character value expression>
<trim specification> ::=
LEADING
| TRAILING
| BOTH
<trim character set> ::= <characters value expression>
```
or
``` SQL
LTRIM (source-exp [, trim-exp])
RTRIM (source-exp [, trim-exp])
```
Here are the documentation link of support of this feature by other mainstream databases.
- **Oracle:** [TRIM function](http://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2126.htm#OLADM704)
- **DB2:** [TRIM scalar function](https://www.ibm.com/support/knowledgecenter/en/SSMKHH_10.0.0/com.ibm.etools.mft.doc/ak05270_.htm)
- **MySQL:** [Trim function](http://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_trim)
- **Oracle:** [ltrim](https://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2018.htm#OLADM594)
- **DB2:** [ltrim](https://www.ibm.com/support/knowledgecenter/en/SSEPEK_11.0.0/sqlref/src/tpc/db2z_bif_ltrim.html)
This PR implements the above enhancement. In the implementation, the design principle is to keep the changes to a minimum. Also, the existing trim functions (which handle the special case of trimming space characters) are kept unchanged for performance reasons.
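For illustration, the extended syntax can also be exercised through spark.sql; the results in the comments mirror the examples listed earlier in this document.
```scala
spark.sql("SELECT TRIM(BOTH 'SL' FROM 'SSparkSQLS')").show()      // parkSQ
spark.sql("SELECT TRIM(LEADING 'SL' FROM 'SSparkSQLS')").show()   // parkSQLS
spark.sql("SELECT TRIM(TRAILING 'SL' FROM 'SSparkSQLS')").show()  // SSparkSQ
spark.sql("SELECT LTRIM('Sp', 'SSparkSQLS')").show()              // arkSQLS
spark.sql("SELECT RTRIM('LQSa', 'SSparkSQLS')").show()            // SSpark
```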
#### How was this patch tested?
The unit test cases are added in the following files:
- UTF8StringSuite.java
- StringExpressionsSuite.scala
- sql/SQLQuerySuite.scala
- StringFunctionsSuite.scala
Author: Kevin Yu <qyu@us.ibm.com>
Closes#12646 from kevinyu98/spark-14878.
## What changes were proposed in this pull request?
The UDF needs to deserialize the `UnsafeRow`. When the column type is Array, the `get` method from the `ColumnVector`, which is used by the vectorized reader, is called, but this method is not implemented.
## How was this patch tested?
Author: Feng Liu <fengliu@databricks.com>
Closes#19230 from liufengdb/fix_array_open.
## What changes were proposed in this pull request?
As reported in https://issues.apache.org/jira/browse/SPARK-22047, HiveExternalCatalogVersionsSuite is failing frequently. Let's disable this test suite to unblock other PRs while I look into the root cause.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19264 from cloud-fan/test.
## What changes were proposed in this pull request?
Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.
## How was this patch tested?
new unit test
Author: Jose Torres <jose@databricks.com>
Closes#19239 from joseph-torres/SPARK-22017.
## What changes were proposed in this pull request?
This PR adds the infrastructure for data source v2, and implement features which Spark already have in data source v1, i.e. column pruning, filter push down, catalyst expression filter push down, InternalRow scan, schema inference, data size report. The write path is excluded to avoid making this PR growing too big, and will be added in follow-up PR.
## How was this patch tested?
new tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19136 from cloud-fan/data-source-v2.
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/18600 we removed the `metadata` field from `SparkPlanInfo`. This causes a problem when we replay event logs that are generated by older Spark versions.
## How was this patch tested?
a regression test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19237 from cloud-fan/event.
## What changes were proposed in this pull request?
https://github.com/apache/spark/pull/18266 added a new feature to support reading JDBC tables with a custom schema, but all the fields had to be specified. For convenience, this PR supports specifying only a subset of the fields.
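A hedged sketch of what the relaxed option allows (the connection URL `jdbcUrl` and the table name are placeholders): only the columns whose types need overriding are listed, and the rest keep their inferred types.
```scala
import java.util.Properties

// Only the ID column's type is overridden; other columns keep their JDBC-inferred types.
val props = new Properties()
props.put("customSchema", "ID decimal(38, 0)")
val df = spark.read.jdbc(jdbcUrl, "tableWithCustomSchema", props)
```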
## How was this patch tested?
unit tests
Author: Yuming Wang <wgyumg@gmail.com>
Closes#19231 from wangyum/SPARK-22002.
## What changes were proposed in this pull request?
Suppose there are two projects as follows.
```
Project [a_with_metadata#27 AS b#26]
+- Project [a#0 AS a_with_metadata#27]
+- LocalRelation <empty>, [a#0, b#1]
```
Child Project has an output column with a metadata in it, and the parent Project has an alias that implicitly forwards the metadata. So this metadata is visible for higher operators. Upon applying CollapseProject optimizer rule, the metadata is not preserved.
```
Project [a#0 AS b#26]
+- LocalRelation <empty>, [a#0, b#1]
```
This is incorrect, as downstream operators that expect certain metadata (e.g. watermark in structured streaming) to identify certain fields will fail to do so. This PR fixes it by preserving the metadata of top-level aliases.
## How was this patch tested?
New unit test
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19240 from tdas/SPARK-22018.
## What changes were proposed in this pull request?
In the previous work SPARK-21513, we allowed `MapType` and `ArrayType` of `MapType`s to be converted to a JSON string, but only for the Scala API. In this follow-up PR, we make Spark SQL support it for PySpark and SparkR too. We also fix some small bugs and comments from the previous work.
### For PySpark
```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice")']
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```
### For SparkR
```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```
## How was this patch tested?
Add unit test cases.
cc viirya HyukjinKwon
Author: goldmedal <liugs963@gmail.com>
Closes#19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR.
## What changes were proposed in this pull request?
Add default stats to StreamingExecutionRelation.
## How was this patch tested?
existing unit tests and an explain() test to be sure
Author: Jose Torres <jose@databricks.com>
Closes#19212 from joseph-torres/SPARK-21988.
## What changes were proposed in this pull request?
Drop test tables and improve comments.
## How was this patch tested?
Modified existing test.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19213 from wzhfy/useless_comment.
## What changes were proposed in this pull request?
This PR is clean the codes in https://github.com/apache/spark/pull/18975
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19225 from gatorsmile/refactorSPARK-4131.
## What changes were proposed in this pull request?
When reading column descriptions from the Hive catalog, we currently populate the metadata for all types to record the raw Hive type string. For processing purposes, this additional metadata is only needed for CHAR/VARCHAR types or complex types containing CHAR/VARCHAR.
It is a minor cleanup, so I haven't created a JIRA for it.
## How was this patch tested?
Test added in HiveMetastoreCatalogSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#19215 from dilipbiswal/column_metadata.
## What changes were proposed in this pull request?
This PR adds a new option to filter which TPC-DS queries to run in `TPCDSQueryBenchmark`.
By default, `TPCDSQueryBenchmark` runs all the TPC-DS queries.
This change enables developers to run only some of the TPC-DS queries via this option,
e.g., to run q2, q4, and q6 only:
```
spark-submit --class <this class> --conf spark.sql.tpcds.queryFilter="q2,q4,q6" --jars <spark sql test jar>
```
## How was this patch tested?
Manually checked.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19188 from maropu/RunPartialQueriesInTPCDS.
## What changes were proposed in this pull request?
The auto-generated Oracle schema is sometimes not what we expect:
- `number(1)` is automatically mapped to BooleanType, which is sometimes not what we expect, per [SPARK-20921](https://issues.apache.org/jira/browse/SPARK-20921).
- `number` is automatically mapped to Decimal(38,10), which cannot hold large values, per [SPARK-20427](https://issues.apache.org/jira/browse/SPARK-20427).
This PR fixes the issue by allowing a custom schema, as follows:
```scala
val props = new Properties()
props.put("customSchema", "ID decimal(38, 0), N1 int, N2 boolean")
val dfRead = spark.read.jdbc(jdbcUrl, "tableWithCustomSchema", props)
dfRead.show()
```
or
```sql
CREATE TEMPORARY VIEW tableWithCustomSchema
USING org.apache.spark.sql.jdbc
OPTIONS (url '$jdbcUrl', dbTable 'tableWithCustomSchema', customSchema 'ID decimal(38, 0), N1 int, N2 boolean')
```
## How was this patch tested?
unit tests
Author: Yuming Wang <wgyumg@gmail.com>
Closes#18266 from wangyum/SPARK-20427.
## What changes were proposed in this pull request?
The code has already been merged to master:
https://github.com/apache/spark/pull/18975
This is a follow-up PR to merge HiveTmpFile.scala into SaveAsHiveFile.
## How was this patch tested?
Build successfully
Author: Jane Wang <janewang@fb.com>
Closes#19221 from janewangfb/merge_savehivefile_hivetmpfile.
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-21980
This PR fixes the issue in ResolveGroupingAnalytics rule, which indexes the column references in grouping functions without considering case sensitive configurations.
The problem can be reproduced by:
`val df = spark.createDataFrame(Seq((1, 1), (2, 1), (2, 2))).toDF("a", "b")`
`df.cube("a").agg(grouping("A")).show()`
## How was this patch tested?
unit tests
Author: donnyzone <wellfengzhu@gmail.com>
Closes#19202 from DonnyZone/ResolveGroupingAnalytics.
## What changes were proposed in this pull request?
1. Removing all redundant throws declarations from the Java codebase.
2. Removing dead code made visible by this from `ShuffleExternalSorter#closeAndGetSpills`
## How was this patch tested?
Build still passes.
Author: Armin <me@obrown.io>
Closes#19182 from original-brownbear/SPARK-21970.
# What changes were proposed in this pull request?
The UDF `to_json` currently only supports converting `StructType` or `ArrayType` of `StructType`s to a JSON output string.
Following the discussion in JIRA SPARK-21513, this PR allows `to_json` to also support converting `MapType` and `ArrayType` of `MapType`s to a JSON output string.
This PR is for the SQL and Scala APIs only.
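A hedged Scala-side sketch of the new capability (the data and column names are invented):
```scala
import org.apache.spark.sql.functions.to_json
import spark.implicits._

// A MapType column can now be serialized to a JSON object string.
val df = Seq((1, Map("name" -> "Alice"))).toDF("key", "value")
df.select(to_json($"value").alias("json")).show(truncate = false)
// expected: {"name":"Alice"}
```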
# How was this patch tested?
Adding unit test case.
cc viirya HyukjinKwon
Author: goldmedal <liugs963@gmail.com>
Author: Jia-Xuan Liu <liugs963@gmail.com>
Closes#18875 from goldmedal/SPARK-21513.
## What changes were proposed in this pull request?
Improve QueryPlanConstraints framework, make it robust and simple.
In https://github.com/apache/spark/pull/15319, constraints for expressions like `a = f(b, c)` were resolved.
However, for expressions like
```scala
a = f(b, c) && c = g(a, b)
```
The current QueryPlanConstraints framework will produce non-converging constraints.
Essentially, the problem is caused by having both the name and the child of an alias in the same constraint set: we infer constraints, push them down as predicates in filters, and later those predicates are propagated as constraints again, and so on.
Simply using only the alias names resolves these problems. The size of the constraint set is reduced without losing any information, and we can always recover the inferred constraints on the children of aliases when pushing down filters.
Also, the EqualNullSafe constraint between an alias's name and its child, added when propagating aliases, is meaningless:
```scala
allConstraints += EqualNullSafe(e, a.toAttribute)
```
It just produces redundant constraints.
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19201 from gengliangwang/QueryPlanConstraints.
## What changes were proposed in this pull request?
TPCDSQueryBenchmark packaged into a jar doesn't work with spark-submit,
because it fails to reference the query files inside the jar.
## How was this patch tested?
Ran the benchmark.
Author: sarutak <sarutak@oss.nttdata.co.jp>
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#18592 from sarutak/fix-tpcds-benchmark.
## What changes were proposed in this pull request?
Support the DESC (EXTENDED | FORMATTED)? TABLE COLUMN command, which shows column-level statistics when EXTENDED or FORMATTED is specified.
Describing nested columns is NOT supported.
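A hedged illustration of the new command (the table and column names are made up):
```scala
// Compute column statistics first, then describe the column to see them.
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS col1")
spark.sql("DESC EXTENDED t1 col1").show(truncate = false)
```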
## How was this patch tested?
Added test cases.
Author: Zhenhua Wang <wzh_zju@163.com>
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Author: wangzhenhua <wangzhenhua@huawei.com>
Closes#16422 from wzhfy/descColumn.
## What changes were proposed in this pull request?
When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for all rows. This PR detects this situation and raises an exception with a reasonable workaround message, so that users know what happened and how to fix the query.
## How was this patch tested?
Added unit test in `CSVSuite`.
Author: Jen-Ming Chung <jenmingisme@gmail.com>
Closes#19199 from jmchung/SPARK-21610-FOLLOWUP.
## What changes were proposed in this pull request?
This PR removes imported classes that are unused.
## How was this patch tested?
N/A
Author: caoxuewen <cao.xuewen@zte.com.cn>
Closes#19131 from heary-cao/unuse_import.
## What changes were proposed in this pull request?
```
echo '{"field": 1}
{"field": 2}
{"field": "3"}' >/tmp/sample.json
```
```scala
import org.apache.spark.sql.types._
val schema = new StructType()
.add("field", ByteType)
.add("_corrupt_record", StringType)
val file = "/tmp/sample.json"
val dfFromFile = spark.read.schema(schema).json(file)
scala> dfFromFile.show(false)
+-----+---------------+
|field|_corrupt_record|
+-----+---------------+
|1 |null |
|2 |null |
|null |{"field": "3"} |
+-----+---------------+
scala> dfFromFile.filter($"_corrupt_record".isNotNull).count()
res1: Long = 0
scala> dfFromFile.filter($"_corrupt_record".isNull).count()
res2: Long = 3
```
When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for all rows. This PR detects this situation and raises an exception with a reasonable workaround message, so that users know what happened and how to fix the query.
## How was this patch tested?
Added test case.
Author: Jen-Ming Chung <jenmingisme@gmail.com>
Closes#18865 from jmchung/SPARK-21610.
## What changes were proposed in this pull request?
This PR implements the sql feature:
INSERT OVERWRITE [LOCAL] DIRECTORY directory1
[ROW FORMAT row_format] [STORED AS file_format]
SELECT ... FROM ...
## How was this patch tested?
Added new unittests and also pulled the code to fb-spark so that we could test writing to hdfs directory.
Author: Jane Wang <janewang@fb.com>
Closes#18975 from janewangfb/port_local_directory.
## What changes were proposed in this pull request?
Correct DataFrame doc.
## How was this patch tested?
Only doc change, no tests.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#19173 from yanboliang/df-doc.
## What changes were proposed in this pull request?
`JacksonUtils.verifySchema` verifies if a data type can be converted to JSON. For `MapType`, it now verifies the key type. However, in `JacksonGenerator`, when converting a map to JSON, we only care about its values and create a writer for the values. The keys in a map are treated as strings by calling `toString` on the keys.
Thus, we should change `JacksonUtils.verifySchema` to verify the value type of `MapType`.
## How was this patch tested?
Added tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19167 from viirya/test-jacksonutils.
## What changes were proposed in this pull request?
In a driver heap dump containing 390,105 instances of SQLTaskMetrics this
would have saved me approximately 3.2MB of memory.
Since we're not getting any benefit from storing this unused value, let's
eliminate it until a future PR makes use of it.
## How was this patch tested?
Existing unit tests
Author: Andrew Ash <andrew@andrewash.com>
Closes#19153 from ash211/aash/trim-sql-listener.
## What changes were proposed in this pull request?
This PR fixes flaky test `InMemoryCatalogedDDLSuite "alter table: rename cached table"`.
Since this test validates a distributed DataFrame, the result should be checked with `checkAnswer`. The original version used the result of `df.collect()` as a `Seq`, which does not guarantee the order of the elements.
## How was this patch tested?
Use existing test case
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#19159 from kiszk/SPARK-21946.
## What changes were proposed in this pull request?
The condition in `Optimizer.isPlanIntegral` is wrong. We should always return `true` if not in test mode.
## How was this patch tested?
Manually tested.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19161 from viirya/SPARK-21726-followup.