## What changes were proposed in this pull request?
The current table insertion has some weird behaviours:
1. inserting into a partitioned table with mismatch columns has confusing error message for hive table, and wrong result for datasource table
2. inserting into a partitioned table without partition list has wrong result for hive table.
This PR fixes these 2 problems.
## How was this patch tested?
new test in hive `SQLQuerySuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13754 from cloud-fan/insert2.
## What changes were proposed in this pull request?
This small patch renames a few optimizer rules to make the naming more consistent, e.g. class name start with a verb. The main important "fix" is probably SamplePushDown -> PushProjectThroughSample. SamplePushDown is actually the wrong name, since the rule is not about pushing Sample down.
## How was this patch tested?
Updated test cases.
Author: Reynold Xin <rxin@databricks.com>
Closes#13732 from rxin/SPARK-16014.
#### What changes were proposed in this pull request?
`IF NOT EXISTS` in `INSERT OVERWRITE` should not support dynamic partitions. If we specify `IF NOT EXISTS`, the inserted statement is not shown in the table.
This PR is to issue an exception in this case, just like what Hive does. Also issue an exception if users specify `IF NOT EXISTS` if users do not specify any `PARTITION` specification.
#### How was this patch tested?
Added test cases into `PlanParserSuite` and `InsertIntoHiveTableSuite`
Author: gatorsmile <gatorsmile@gmail.com>
Closes#13447 from gatorsmile/insertIfNotExist.
## What changes were proposed in this pull request?
`UTF8String` and all `Unsafe*` classes are backed by either on-heap or off-heap byte arrays. The code generated version `SortMergeJoin` buffers the left hand side join keys during iteration. This was actually problematic in off-heap mode when one of the keys is a `UTF8String` (or any other 'Unsafe*` object) and the left hand side iterator was exhausted (and released its memory); the buffered keys would reference freed memory. This causes Seg-faults and all kinds of other undefined behavior when we would use one these buffered keys.
This PR fixes this problem by creating copies of the buffered variables. I have added a general method to the `CodeGenerator` for this. I have checked all places in which this could happen, and only `SortMergeJoin` had this problem.
This PR is largely based on the work of robbinspg and he should be credited for this.
closes https://github.com/apache/spark/pull/13707
## How was this patch tested?
Manually tested on problematic workloads.
Author: Pete Robbins <robbinspg@gmail.com>
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#13723 from hvanhovell/SPARK-15822-2.
## What changes were proposed in this pull request?
This PR contains a few changes on code comments.
- `HiveTypeCoercion` is renamed into `TypeCoercion`.
- `NoSuchDatabaseException` is only used for the absence of database.
- For partition type inference, only `DoubleType` is considered.
## How was this patch tested?
N/A
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#13674 from dongjoon-hyun/minor_doc_types.
## What changes were proposed in this pull request?
I've found some minor issues in "show tables" command:
1. In the `SessionCatalog.scala`, `listTables(db: String)` method will call `listTables(formatDatabaseName(db), "*")` to list all the tables for certain db, but in the method `listTables(db: String, pattern: String)`, this db name is formatted once more. So I think we should remove
`formatDatabaseName()` in the caller.
2. I suggest to add sort to listTables(db: String) in InMemoryCatalog.scala, just like listDatabases().
## How was this patch tested?
The existing test cases should cover it.
Author: bomeng <bmeng@us.ibm.com>
Closes#13695 from bomeng/SPARK-15978.
#### What changes were proposed in this pull request?
~~If the temp table already exists, we should not silently replace it when doing `CACHE TABLE AS SELECT`. This is inconsistent with the behavior of `CREAT VIEW` or `CREATE TABLE`. This PR is to fix this silent drop.~~
~~Maybe, we also can introduce new syntax for replacing the existing one. For example, in Hive, to replace a view, the syntax should be like `ALTER VIEW AS SELECT` or `CREATE OR REPLACE VIEW AS SELECT`~~
The table name in `CACHE TABLE AS SELECT` should NOT contain database prefix like "database.table". Thus, this PR captures this in Parser and outputs a better error message, instead of reporting the view already exists.
In addition, refactoring the `Parser` to generate table identifiers instead of returning the table name string.
#### How was this patch tested?
- Added a test case for caching and uncaching qualified table names
- Fixed a few test cases that do not drop temp table at the end
- Added the related test case for the issue resolved in this PR
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#13572 from gatorsmile/cacheTableAsSelect.
## What changes were proposed in this pull request?
gapply() applies an R function on groups grouped by one or more columns of a DataFrame, and returns a DataFrame. It is like GroupedDataSet.flatMapGroups() in the Dataset API.
Please, let me know what do you think and if you have any ideas to improve it.
Thank you!
## How was this patch tested?
Unit tests.
1. Primitive test with different column types
2. Add a boolean column
3. Compute average by a group
Author: Narine Kokhlikyan <narine.kokhlikyan@gmail.com>
Author: NarineK <narine.kokhlikyan@us.ibm.com>
Closes#12836 from NarineK/gapply2.
## What changes were proposed in this pull request?
This PR fixes the problem that Divide Expression inside Aggregation function is casted to wrong type, which cause `select 1/2` and `select sum(1/2)`returning different result.
**Before the change:**
```
scala> sql("select 1/2 as a").show()
+---+
| a|
+---+
|0.5|
+---+
scala> sql("select sum(1/2) as a").show()
+---+
| a|
+---+
|0 |
+---+
scala> sql("select sum(1 / 2) as a").schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(a,LongType,true))
```
**After the change:**
```
scala> sql("select 1/2 as a").show()
+---+
| a|
+---+
|0.5|
+---+
scala> sql("select sum(1/2) as a").show()
+---+
| a|
+---+
|0.5|
+---+
scala> sql("select sum(1/2) as a").schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(a,DoubleType,true))
```
## How was this patch tested?
Unit test.
This PR is based on https://github.com/apache/spark/pull/13524 by Sephiroth-Lin
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13651 from clockfly/SPARK-15776.
## What changes were proposed in this pull request?
Two issues I've found for "show databases" command:
1. The returned database name list was not sorted, it only works when "like" was used together; (HIVE will always return a sorted list)
2. When it is used as sql("show databases").show, it will output a table with column named as "result", but for sql("show tables").show, it will output the column name as "tableName", so I think we should be consistent and use "databaseName" at least.
## How was this patch tested?
Updated existing test case to test its ordering as well.
Author: bomeng <bmeng@us.ibm.com>
Closes#13671 from bomeng/SPARK-15952.
## What changes were proposed in this pull request?
`DataFrame` with plan overriding `sameResult` but not using canonicalized plan to compare can't cacheTable.
The example is like:
```
val localRelation = Seq(1, 2, 3).toDF()
localRelation.createOrReplaceTempView("localRelation")
spark.catalog.cacheTable("localRelation")
assert(
localRelation.queryExecution.withCachedData.collect {
case i: InMemoryRelation => i
}.size == 1)
```
and this will fail as:
```
ArrayBuffer() had size 0 instead of expected size 1
```
The reason is that when do `spark.catalog.cacheTable("localRelation")`, `CacheManager` tries to cache for the plan wrapped by `SubqueryAlias` but when planning for the DataFrame `localRelation`, `CacheManager` tries to find cached table for the not-wrapped plan because the plan for DataFrame `localRelation` is not wrapped.
Some plans like `LocalRelation`, `LogicalRDD`, etc. override `sameResult` method, but not use canonicalized plan to compare so the `CacheManager` can't detect the plans are the same.
This pr modifies them to use canonicalized plan when override `sameResult` method.
## How was this patch tested?
Added a test to check if DataFrame with plan overriding sameResult but not using canonicalized plan to compare can cacheTable.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#13638 from ueshin/issues/SPARK-15915.
## What changes were proposed in this pull request?
In our encoder framework, we imply that serializer expressions should use `BoundReference` to refer to the input object, and a lot of codes depend on this contract(e.g. ExpressionEncoder.tuple). This PR adds some document and assert in `ExpressionEncoder` to make it clearer.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13648 from cloud-fan/comment.
## What changes were proposed in this pull request?
SparkSession.catalog.listFunctions currently returns all functions, including the list of built-in functions. This makes the method not as useful because anytime it is run the result set contains over 100 built-in functions.
## How was this patch tested?
CatalogSuite
Author: Sandeep Singh <sandeep@techaddict.me>
Closes#13413 from techaddict/SPARK-15663.
## What changes were proposed in this pull request?
This PR enforces schema check when converting DataFrame to Dataset using Kryo encoder. For example.
**Before the change:**
Schema is NOT checked when converting DataFrame to Dataset using kryo encoder.
```
scala> case class B(b: Int)
scala> implicit val encoder = Encoders.kryo[B]
scala> val df = Seq((1)).toDF("b")
scala> val ds = df.as[B] // Schema compatibility is NOT checked
```
**After the change:**
Report AnalysisException since the schema is NOT compatible.
```
scala> val ds = Seq((1)).toDF("b").as[B]
org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`b` AS BINARY)' due to data type mismatch: cannot cast IntegerType to BinaryType;
...
```
## How was this patch tested?
Unit test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13632 from clockfly/spark-15910.
# What changes were proposed in this pull request?
This pull request fixes the COUNT bug in the `RewriteCorrelatedScalarSubquery` rule.
After this change, the rule tests the expression at the root of the correlated subquery to determine whether the expression returns `NULL` on empty input. If the expression does not return `NULL`, the rule generates additional logic in the `Project` operator above the rewritten subquery. This additional logic intercepts `NULL` values coming from the outer join and replaces them with the value that the subquery's expression would return on empty input.
This PR takes over https://github.com/apache/spark/pull/13155. It only fixes an issue with `Literal` construction and style issues. All credits should go frreiss.
# How was this patch tested?
Added regression tests to cover all branches of the updated rule (see changes to `SubquerySuite`).
Ran all existing automated regression tests after merging with latest trunk.
Author: frreiss <frreiss@us.ibm.com>
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#13629 from hvanhovell/SPARK-15370-cleanup.
## What changes were proposed in this pull request?
Queries with embedded existential sub-query predicates throws exception when building the physical plan.
Example failing query:
```SQL
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
scala> sql("select c1 from t1 where (case when c2 in (select c2 from t2) then 2 else 3 end) IN (select c2 from t1)").show()
Binding attribute, tree: c2#239
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
...
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.joins.HashJoin$class.org$apache$spark$sql$execution$joins$HashJoin$$x$8(HashJoin.scala:66)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8$lzycompute(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.HashJoin$class.buildKeys(HashJoin.scala:63)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys$lzycompute(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.requiredChildDistribution(BroadcastHashJoinExec.scala:52)
```
**Problem description:**
When the left hand side expression of an existential sub-query predicate contains another embedded sub-query predicate, the RewritePredicateSubquery optimizer rule does not resolve the embedded sub-query expressions into existential joins.For example, the above query has the following optimized plan, which fails during physical plan build.
```SQL
== Optimized Logical Plan ==
Project [_1#224 AS c1#227]
+- Join LeftSemi, (CASE WHEN predicate-subquery#255 [(_2#225 = c2#239)] THEN 2 ELSE 3 END = c2#228#262)
: +- SubqueryAlias predicate-subquery#255 [(_2#225 = c2#239)]
: +- LocalRelation [c2#239]
:- LocalRelation [_1#224, _2#225]
+- LocalRelation [c2#228#262]
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
```
**Solution:**
In RewritePredicateSubquery, before rewriting the outermost predicate sub-query, resolve any embedded existential sub-queries. The Optimized plan for the above query after the changes looks like below.
```SQL
== Optimized Logical Plan ==
Project [_1#224 AS c1#227]
+- Join LeftSemi, (CASE WHEN exists#285 THEN 2 ELSE 3 END = c2#228#284)
:- Join ExistenceJoin(exists#285), (_2#225 = c2#239)
: :- LocalRelation [_1#224, _2#225]
: +- LocalRelation [c2#239]
+- LocalRelation [c2#228#284]
== Physical Plan ==
*Project [_1#224 AS c1#227]
+- *BroadcastHashJoin [CASE WHEN exists#285 THEN 2 ELSE 3 END], [c2#228#284], LeftSemi, BuildRight
:- *BroadcastHashJoin [_2#225], [c2#239], ExistenceJoin(exists#285), BuildRight
: :- LocalTableScan [_1#224, _2#225]
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
: +- LocalTableScan [c2#239]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [c2#228#284]
+- LocalTableScan [c222#36], [[111],[222]]
```
## How was this patch tested?
Added new test cases in SubquerySuite.scala
Author: Ioana Delaney <ioanamdelaney@gmail.com>
Closes#13570 from ioana-delaney/fixEmbedSubPredV1.
## What changes were proposed in this pull request?
This pull request fixes the COUNT bug in the `RewriteCorrelatedScalarSubquery` rule.
After this change, the rule tests the expression at the root of the correlated subquery to determine whether the expression returns NULL on empty input. If the expression does not return NULL, the rule generates additional logic in the Project operator above the rewritten subquery. This additional logic intercepts NULL values coming from the outer join and replaces them with the value that the subquery's expression would return on empty input.
## How was this patch tested?
Added regression tests to cover all branches of the updated rule (see changes to `SubquerySuite.scala`).
Ran all existing automated regression tests after merging with latest trunk.
Author: frreiss <frreiss@us.ibm.com>
Closes#13155 from frreiss/master.
## What changes were proposed in this pull request?
Adds codahale metrics for the codegen source text size and how long it takes to compile. The size is particularly interesting, since the JVM does have hard limits on how large methods can get.
To simplify, I added the metrics under a statically-initialized source that is always registered with SparkEnv.
## How was this patch tested?
Unit tests
Author: Eric Liang <ekl@databricks.com>
Closes#13586 from ericl/spark-15860.
## What changes were proposed in this pull request?
This adds support for radix sort of nullable long fields. When a sort field is null and radix sort is enabled, we keep nulls in a separate region of the sort buffer so that radix sort does not need to deal with them. This also has performance benefits when sorting smaller integer types, since the current representation of nulls in two's complement (Long.MIN_VALUE) otherwise forces a full-width radix sort.
This strategy for nulls does mean the sort is no longer stable. cc davies
## How was this patch tested?
Existing randomized sort tests for correctness. I also tested some TPCDS queries and there does not seem to be any significant regression for non-null sorts.
Some test queries (best of 5 runs each).
Before change:
scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6
start: Long = 3190437233227987
res3: Double = 4716.471091
After change:
scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6
start: Long = 3190367870952791
res4: Double = 2981.143045
Author: Eric Liang <ekl@databricks.com>
Closes#13161 from ericl/sc-2998.
## What changes were proposed in this pull request?
Spark currently incorrectly continues to use cached data even if the underlying data is overwritten.
Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset
```
This patch fixes this bug by adding support for `REFRESH path` that invalidates and refreshes all the cached data (and the associated metadata) for any dataframe that contains the given data source path.
Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshResource(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the cached dataset
```
## How was this patch tested?
Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.
Author: Sameer Agarwal <sameer@databricks.com>
Closes#13566 from sameeragarwal/refresh-path-2.
## What changes were proposed in this pull request?
The base class `SpecificParquetRecordReaderBase` used for vectorized parquet reader will try to get pushed-down filters from the given configuration. This pushed-down filters are used for RowGroups-level filtering. However, we don't set up the filters to push down into the configuration. In other words, the filters are not actually pushed down to do RowGroups-level filtering. This patch is to fix this and tries to set up the filters for pushing down to configuration for the reader.
## How was this patch tested?
Existing tests should be passed.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#13371 from viirya/vectorized-reader-push-down-filter.
## What changes were proposed in this pull request?
As discussed in https://github.com/apache/spark/pull/12836
we need to override stringArgs method in MapPartitionsInR in order to avoid too large strings generated by "stringArgs" method based on the input arguments.
In this case exclude some of the input arguments: serialized R objects.
## How was this patch tested?
Existing test cases
Author: Narine Kokhlikyan <narine.kokhlikyan@gmail.com>
Closes#13610 from NarineK/dapply_MapPartitionsInR_stringArgs.
## What changes were proposed in this pull request?
Serializer instantiation will consider existing SparkConf
## How was this patch tested?
manual test with `ImmutableList` (Guava) and `kryo-serializers`'s `Immutable*Serializer` implementations.
Added Test Suite.
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Sela <ansela@paypal.com>
Closes#13424 from amitsela/SPARK-15489.
## What changes were proposed in this pull request?
Code generated `SortMergeJoin` failed with wrong results when using structs as keys. This could (eventually) be traced back to the use of a wrong row reference when comparing structs.
## How was this patch tested?
TBD
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#13589 from hvanhovell/SPARK-15822.
## What changes were proposed in this pull request?
In scala, immutable.List.length is an expensive operation so we should
avoid using Seq.length == 0 or Seq.lenth > 0, and use Seq.isEmpty and Seq.nonEmpty instead.
## How was this patch tested?
existing tests
Author: wangyang <wangyang@haizhi.com>
Closes#13601 from yangw1234/isEmpty.
## What changes were proposed in this pull request?
Replace all occurrences of `None: Option[X]` with `Option.empty[X]`
## How was this patch tested?
Exisiting Tests
Author: Sandeep Singh <sandeep@techaddict.me>
Closes#13591 from techaddict/minor-7.
## What changes were proposed in this pull request?
This PR moves `QueryPlanner.planLater()` method into `GenericStrategy` for extra strategies to be able to use `planLater` in its strategy.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#13147 from ueshin/issues/SPARK-6320.
## What changes were proposed in this pull request?
This patch moves some codes in `DataFrameWriter.insertInto` that belongs to `Analyzer`.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#13496 from viirya/move-analyzer-stuff.
## What changes were proposed in this pull request?
When the output mode is complete, then the output of a streaming aggregation essentially will contain the complete aggregates every time. So this is not different from a batch dataset within an incremental execution. Other non-streaming operations should be supported on this dataset. In this PR, I am just adding support for sorting, as it is a common useful functionality. Support for other operations will come later.
## How was this patch tested?
Additional unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#13549 from tdas/SPARK-15812.
## What changes were proposed in this pull request?
With very wide tables, e.g. thousands of fields, the plan output is unreadable and often causes OOMs due to inefficient string processing. This truncates all struct and operator field lists to a user configurable threshold to limit performance impact.
It would also be nice to optimize string generation to avoid these sort of O(n^2) slowdowns entirely (i.e. use StringBuilder everywhere including expressions), but this is probably too large of a change for 2.0 at this point, and truncation has other benefits for usability.
## How was this patch tested?
Added a microbenchmark that covers this case particularly well. I also ran the microbenchmark while varying the truncation threshold.
```
numFields = 5
wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem) 2336 / 2558 0.0 23364.4 0.1X
numFields = 25
wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem) 4237 / 4465 0.0 42367.9 0.1X
numFields = 100
wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem) 10458 / 11223 0.0 104582.0 0.0X
numFields = Infinity
wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
[info] java.lang.OutOfMemoryError: Java heap space
```
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>
Closes#13537 from ericl/truncated-string.
## What changes were proposed in this pull request?
The current implementations of `UnixTime` and `FromUnixTime` do not cache their parser/formatter as much as they could. This PR resolved this issue.
This PR is a take over from https://github.com/apache/spark/pull/13522 and further optimizes the re-use of the parser/formatter. It also fixes the improves handling (catching the actual exception instead of `Throwable`). All credits for this work should go to rajeshbalamohan.
This PR closes https://github.com/apache/spark/pull/13522
## How was this patch tested?
Current tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Author: Rajesh Balamohan <rbalamohan@apache.org>
Closes#13581 from hvanhovell/SPARK-14321.
## What changes were proposed in this pull request?
The help function 'toStructType' in the AttributeSeq class doesn't include the metadata when it builds the StructField, so it causes this reported problem https://issues.apache.org/jira/browse/SPARK-15804?jql=project%20%3D%20SPARK when spark writes the the dataframe with the metadata to the parquet datasource.
The code path is when spark writes the dataframe to the parquet datasource through the InsertIntoHadoopFsRelationCommand, spark will build the WriteRelation container, and it will call the help function 'toStructType' to create StructType which contains StructField, it should include the metadata there, otherwise, we will lost the user provide metadata.
## How was this patch tested?
added test case in ParquetQuerySuite.scala
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Kevin Yu <qyu@us.ibm.com>
Closes#13555 from kevinyu98/spark-15804.
## What changes were proposed in this pull request?
The parser currently does not allow the use of some SQL keywords as table or field names. This PR adds supports for all keywords as identifier. The exception to this are table aliases, in this case most keywords are allowed except for join keywords (```anti, full, inner, left, semi, right, natural, on, join, cross```) and set-operator keywords (```union, intersect, except```).
## How was this patch tested?
I have added/move/renamed test in the catalyst `*ParserSuite`s.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#13534 from hvanhovell/SPARK-15789.
## What changes were proposed in this pull request?
The current implementation of "CREATE TEMPORARY TABLE USING datasource..." is NOT creating any intermediate temporary data directory like temporary HDFS folder, instead, it only stores a SQL string in memory. Probably we should use "TEMPORARY VIEW" instead.
This PR assumes a temporary table has to link with some temporary intermediate data. It follows the definition of temporary table like this (from [hortonworks doc](https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/bk_dataintegration/content/temp-tables.html)):
> A temporary table is a convenient way for an application to automatically manage intermediate data generated during a complex query
**Example**:
```
scala> spark.sql("CREATE temporary view my_tab7 (c1: String, c2: String) USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')")
scala> spark.sql("select c1, c2 from my_tab7").show()
+----+-----+
| c1| c2|
+----+-----+
|year| make|
|2012|Tesla|
...
```
It NOW prints a **deprecation warning** if "CREATE TEMPORARY TABLE USING..." is used.
```
scala> spark.sql("CREATE temporary table my_tab7 (c1: String, c2: String) USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')")
16/05/31 10:39:27 WARN SparkStrategies$DDLStrategy: CREATE TEMPORARY TABLE tableName USING... is deprecated, please use CREATE TEMPORARY VIEW viewName USING... instead
```
## How was this patch tested?
Unit test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13414 from clockfly/create_temp_view_using.
## What changes were proposed in this pull request?
This PR allows customization of verbosity in explain output. After change, `dataframe.explain()` and `dataframe.explain(true)` has different verbosity output for physical plan.
Currently, this PR only enables verbosity string for operator `HashAggregateExec` and `SortAggregateExec`. We will gradually enable verbosity string for more operators in future.
**Less verbose mode:** dataframe.explain(extended = false)
`output=[count(a)#85L]` is **NOT** displayed for HashAggregate.
```
scala> Seq((1,2,3)).toDF("a", "b", "c").createTempView("df2")
scala> spark.sql("select count(a) from df2").explain()
== Physical Plan ==
*HashAggregate(key=[], functions=[count(1)])
+- Exchange SinglePartition
+- *HashAggregate(key=[], functions=[partial_count(1)])
+- LocalTableScan
```
**Verbose mode:** dataframe.explain(extended = true)
`output=[count(a)#85L]` is displayed for HashAggregate.
```
scala> spark.sql("select count(a) from df2").explain(true) // "output=[count(a)#85L]" is added
...
== Physical Plan ==
*HashAggregate(key=[], functions=[count(1)], output=[count(a)#85L])
+- Exchange SinglePartition
+- *HashAggregate(key=[], functions=[partial_count(1)], output=[count#87L])
+- LocalTableScan
```
## How was this patch tested?
Manual test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13535 from clockfly/verbose_breakdown_2.
BindReferences contains a n^2 loop which causes performance issues when operating over large schemas: to determine the ordinal of an attribute reference, we perform a linear scan over the `input` array. Because input can sometimes be a `List`, the call to `input(ordinal).nullable` can also be O(n).
Instead of performing a linear scan, we can convert the input into an array and build a hash map to map from expression ids to ordinals. The greater up-front cost of the map construction is offset by the fact that an expression can contain multiple attribute references, so the cost of the map construction is amortized across a number of lookups.
Perf. benchmarks to follow. /cc ericl
Author: Josh Rosen <joshrosen@databricks.com>
Closes#13505 from JoshRosen/bind-references-improvement.
## What changes were proposed in this pull request?
`an -> a`
Use cmds like `find . -name '*.R' | xargs -i sh -c "grep -in ' an [^aeiou]' {} && echo {}"` to generate candidates, and review them one by one.
## How was this patch tested?
manual tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#13515 from zhengruifeng/an_a.
## What changes were proposed in this pull request?
This PR improves the error handling of `RowEncoder`. When we create a `RowEncoder` with a given schema, we should validate the data type of input object. e.g. we should throw an exception when a field is boolean but is declared as a string column.
This PR also removes the support to use `Product` as a valid external type of struct type. This support is added at https://github.com/apache/spark/pull/9712, but is incomplete, e.g. nested product, product in array are both not working. However, we never officially support this feature and I think it's ok to ban it.
## How was this patch tested?
new tests in `RowEncoderSuite`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13401 from cloud-fan/bug.
We should cache `Metadata.hashCode` and use a singleton for `Metadata.empty` because calculating metadata hashCodes appears to be a bottleneck for certain workloads.
We should also cache `StructType.hashCode`.
In an optimizer stress-test benchmark run by ericl, these `hashCode` calls accounted for roughly 40% of the total CPU time and this bottleneck was completely eliminated by the caching added by this patch.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#13504 from JoshRosen/metadata-fix.
## What changes were proposed in this pull request?
For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL doesn't allow row to be null, only its columns can be null.
This PR explicitly add this constraint and throw exception if users break it.
## How was this patch tested?
several new tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13469 from cloud-fan/null-object.
## What changes were proposed in this pull request?
There are 2 kinds of `GetStructField`:
1. resolved from `UnresolvedExtractValue`, and it will have a `name` property.
2. created when we build deserializer expression for nested tuple, no `name` property.
When we want to validate the ordinals of nested tuple, we should only catch `GetStructField` without the name property.
## How was this patch tested?
new test in `EncoderResolutionSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13474 from cloud-fan/ordinal-check.
#### What changes were proposed in this pull request?
Before this PR, the output of EXPLAIN of following SQL is like
```SQL
CREATE EXTERNAL TABLE extTable_with_partitions (key INT, value STRING)
PARTITIONED BY (ds STRING, hr STRING)
LOCATION '/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-b39a6185-8981-403b-a4aa-36fb2f4ca8a9'
```
``ExecutedCommand CreateTableCommand CatalogTable(`extTable_with_partitions`,CatalogTableType(EXTERNAL),CatalogStorageFormat(Some(/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-dd234718-e85d-4c5a-8353-8f1834ac0323),Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(key,int,true,None), CatalogColumn(value,string,true,None), CatalogColumn(ds,string,true,None), CatalogColumn(hr,string,true,None)),List(ds, hr),List(),List(),-1,,1463026413544,-1,Map(),None,None,None), false``
After this PR, the output is like
```
ExecutedCommand
: +- CreateTableCommand CatalogTable(
Table:`extTable_with_partitions`
Created:Thu Jun 02 21:30:54 PDT 2016
Last Access:Wed Dec 31 15:59:59 PST 1969
Type:EXTERNAL
Schema:[`key` int, `value` string, `ds` string, `hr` string]
Partition Columns:[`ds`, `hr`]
Storage(Location:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-a06083b8-8e88-4d07-9ff0-d6bd8d943ad3, InputFormat:org.apache.hadoop.mapred.TextInputFormat, OutputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
```
This is also applicable to `DESC EXTENDED`. However, this does not have special handling for Data Source Tables. If needed, we need to move the logics of `DDLUtil`. Let me know if we should do it in this PR. Thanks! rxin liancheng
#### How was this patch tested?
Manual testing
Author: gatorsmile <gatorsmile@gmail.com>
Closes#13070 from gatorsmile/betterExplainCatalogTable.
In Catalyst's TreeNode transform methods we end up calling `productIterator.map(...).toArray` in a number of places, which is slightly inefficient because it needs to allocate an `ArrayBuilder` and grow a temporary array. Since we already know the size of the final output (`productArity`), we can simply allocate an array up-front and use a while loop to consume the iterator and populate the array.
For most workloads, this performance difference is negligible but it does make a measurable difference in optimizer performance for queries that operate over very wide schemas (such as the benchmark queries in #13456).
### Perf results (from #13456 benchmarks)
**Before**
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz
parsing large select: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
1 select expressions 19 / 22 0.0 19119858.0 1.0X
10 select expressions 23 / 25 0.0 23208774.0 0.8X
100 select expressions 55 / 73 0.0 54768402.0 0.3X
1000 select expressions 229 / 259 0.0 228606373.0 0.1X
2500 select expressions 530 / 554 0.0 529938178.0 0.0X
```
**After**
```
parsing large select: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
1 select expressions 15 / 21 0.0 14978203.0 1.0X
10 select expressions 22 / 27 0.0 22492262.0 0.7X
100 select expressions 48 / 64 0.0 48449834.0 0.3X
1000 select expressions 189 / 208 0.0 189346428.0 0.1X
2500 select expressions 429 / 449 0.0 428943897.0 0.0X
```
###
Author: Josh Rosen <joshrosen@databricks.com>
Closes#13484 from JoshRosen/treenode-productiterator-map.
## What changes were proposed in this pull request?
Queries with scalar sub-query in the SELECT list run against a local, in-memory relation throw
UnsupportedOperationException exception.
Problem repro:
```SQL
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
scala> sql("select (select min(c1) from t2) from t1").show()
java.lang.UnsupportedOperationException: Cannot evaluate expression: scalar-subquery#62 []
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:215)
at org.apache.spark.sql.catalyst.expressions.ScalarSubquery.eval(subquery.scala:62)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:45)
at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:29)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$37.applyOrElse(Optimizer.scala:1473)
```
The problem is specific to local, in memory relations. It is caused by rule ConvertToLocalRelation, which attempts to push down
a scalar-subquery expression to the local tables.
The solution prevents the rule to apply if Project references scalar subqueries.
## How was this patch tested?
Added regression tests to SubquerySuite.scala
Author: Ioana Delaney <ioanamdelaney@gmail.com>
Closes#13418 from ioana-delaney/scalarSubV2.
## What changes were proposed in this pull request?
Our encoder framework has been evolved a lot, this PR tries to clean up the code to make it more readable and emphasise the concept that encoder should be used as a container of serde expressions.
1. move validation logic to analyzer instead of encoder
2. only have a `resolveAndBind` method in encoder instead of `resolve` and `bind`, as we don't have the encoder life cycle concept anymore.
3. `Dataset` don't need to keep a resolved encoder, as there is no such concept anymore. bound encoder is still needed to do serialization outside of query framework.
4. Using `BoundReference` to represent an unresolved field in deserializer expression is kind of weird, this PR adds a `GetColumnByOrdinal` for this purpose. (serializer expression still use `BoundReference`, we can replace it with `GetColumnByOrdinal` in follow-ups)
## How was this patch tested?
existing test
Author: Wenchen Fan <wenchen@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes#13269 from cloud-fan/clean-encoder.
## What changes were proposed in this pull request?
This PR makes the explain output less verbose by hiding some verbose output like `None`, `null`, empty List `[]`, empty set `{}`, and etc.
**Before change**:
```
== Physical Plan ==
ExecutedCommand
: +- ShowTablesCommand None, None
```
**After change**:
```
== Physical Plan ==
ExecutedCommand
: +- ShowTablesCommand
```
## How was this patch tested?
Manual test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13470 from clockfly/verbose_breakdown_4.
## What changes were proposed in this pull request?
When users create a case class and use java reserved keyword as field name, spark sql will generate illegal java code and throw exception at runtime.
This PR checks the field names when building the encoder, and if illegal field names are used, throw exception immediately with a good error message.
## How was this patch tested?
new test in DatasetSuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13485 from cloud-fan/java.