## What changes were proposed in this pull request?
Two issues I've found for "show databases" command:
1. The returned database name list was not sorted, it only works when "like" was used together; (HIVE will always return a sorted list)
2. When it is used as sql("show databases").show, it will output a table with column named as "result", but for sql("show tables").show, it will output the column name as "tableName", so I think we should be consistent and use "databaseName" at least.
## How was this patch tested?
Updated existing test case to test its ordering as well.
Author: bomeng <bmeng@us.ibm.com>
Closes#13671 from bomeng/SPARK-15952.
## What changes were proposed in this pull request?
`DataFrame` with plan overriding `sameResult` but not using canonicalized plan to compare can't cacheTable.
The example is like:
```
val localRelation = Seq(1, 2, 3).toDF()
localRelation.createOrReplaceTempView("localRelation")
spark.catalog.cacheTable("localRelation")
assert(
localRelation.queryExecution.withCachedData.collect {
case i: InMemoryRelation => i
}.size == 1)
```
and this will fail as:
```
ArrayBuffer() had size 0 instead of expected size 1
```
The reason is that when do `spark.catalog.cacheTable("localRelation")`, `CacheManager` tries to cache for the plan wrapped by `SubqueryAlias` but when planning for the DataFrame `localRelation`, `CacheManager` tries to find cached table for the not-wrapped plan because the plan for DataFrame `localRelation` is not wrapped.
Some plans like `LocalRelation`, `LogicalRDD`, etc. override `sameResult` method, but not use canonicalized plan to compare so the `CacheManager` can't detect the plans are the same.
This pr modifies them to use canonicalized plan when override `sameResult` method.
## How was this patch tested?
Added a test to check if DataFrame with plan overriding sameResult but not using canonicalized plan to compare can cacheTable.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#13638 from ueshin/issues/SPARK-15915.
## What changes were proposed in this pull request?
In our encoder framework, we imply that serializer expressions should use `BoundReference` to refer to the input object, and a lot of codes depend on this contract(e.g. ExpressionEncoder.tuple). This PR adds some document and assert in `ExpressionEncoder` to make it clearer.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13648 from cloud-fan/comment.
## What changes were proposed in this pull request?
SparkSession.catalog.listFunctions currently returns all functions, including the list of built-in functions. This makes the method not as useful because anytime it is run the result set contains over 100 built-in functions.
## How was this patch tested?
CatalogSuite
Author: Sandeep Singh <sandeep@techaddict.me>
Closes#13413 from techaddict/SPARK-15663.
## What changes were proposed in this pull request?
This PR enforces schema check when converting DataFrame to Dataset using Kryo encoder. For example.
**Before the change:**
Schema is NOT checked when converting DataFrame to Dataset using kryo encoder.
```
scala> case class B(b: Int)
scala> implicit val encoder = Encoders.kryo[B]
scala> val df = Seq((1)).toDF("b")
scala> val ds = df.as[B] // Schema compatibility is NOT checked
```
**After the change:**
Report AnalysisException since the schema is NOT compatible.
```
scala> val ds = Seq((1)).toDF("b").as[B]
org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`b` AS BINARY)' due to data type mismatch: cannot cast IntegerType to BinaryType;
...
```
## How was this patch tested?
Unit test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13632 from clockfly/spark-15910.
# What changes were proposed in this pull request?
This pull request fixes the COUNT bug in the `RewriteCorrelatedScalarSubquery` rule.
After this change, the rule tests the expression at the root of the correlated subquery to determine whether the expression returns `NULL` on empty input. If the expression does not return `NULL`, the rule generates additional logic in the `Project` operator above the rewritten subquery. This additional logic intercepts `NULL` values coming from the outer join and replaces them with the value that the subquery's expression would return on empty input.
This PR takes over https://github.com/apache/spark/pull/13155. It only fixes an issue with `Literal` construction and style issues. All credits should go frreiss.
# How was this patch tested?
Added regression tests to cover all branches of the updated rule (see changes to `SubquerySuite`).
Ran all existing automated regression tests after merging with latest trunk.
Author: frreiss <frreiss@us.ibm.com>
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#13629 from hvanhovell/SPARK-15370-cleanup.
## What changes were proposed in this pull request?
Queries with embedded existential sub-query predicates throws exception when building the physical plan.
Example failing query:
```SQL
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
scala> sql("select c1 from t1 where (case when c2 in (select c2 from t2) then 2 else 3 end) IN (select c2 from t1)").show()
Binding attribute, tree: c2#239
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
...
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.joins.HashJoin$class.org$apache$spark$sql$execution$joins$HashJoin$$x$8(HashJoin.scala:66)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8$lzycompute(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.HashJoin$class.buildKeys(HashJoin.scala:63)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys$lzycompute(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.requiredChildDistribution(BroadcastHashJoinExec.scala:52)
```
**Problem description:**
When the left hand side expression of an existential sub-query predicate contains another embedded sub-query predicate, the RewritePredicateSubquery optimizer rule does not resolve the embedded sub-query expressions into existential joins.For example, the above query has the following optimized plan, which fails during physical plan build.
```SQL
== Optimized Logical Plan ==
Project [_1#224 AS c1#227]
+- Join LeftSemi, (CASE WHEN predicate-subquery#255 [(_2#225 = c2#239)] THEN 2 ELSE 3 END = c2#228#262)
: +- SubqueryAlias predicate-subquery#255 [(_2#225 = c2#239)]
: +- LocalRelation [c2#239]
:- LocalRelation [_1#224, _2#225]
+- LocalRelation [c2#228#262]
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
```
**Solution:**
In RewritePredicateSubquery, before rewriting the outermost predicate sub-query, resolve any embedded existential sub-queries. The Optimized plan for the above query after the changes looks like below.
```SQL
== Optimized Logical Plan ==
Project [_1#224 AS c1#227]
+- Join LeftSemi, (CASE WHEN exists#285 THEN 2 ELSE 3 END = c2#228#284)
:- Join ExistenceJoin(exists#285), (_2#225 = c2#239)
: :- LocalRelation [_1#224, _2#225]
: +- LocalRelation [c2#239]
+- LocalRelation [c2#228#284]
== Physical Plan ==
*Project [_1#224 AS c1#227]
+- *BroadcastHashJoin [CASE WHEN exists#285 THEN 2 ELSE 3 END], [c2#228#284], LeftSemi, BuildRight
:- *BroadcastHashJoin [_2#225], [c2#239], ExistenceJoin(exists#285), BuildRight
: :- LocalTableScan [_1#224, _2#225]
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
: +- LocalTableScan [c2#239]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [c2#228#284]
+- LocalTableScan [c222#36], [[111],[222]]
```
## How was this patch tested?
Added new test cases in SubquerySuite.scala
Author: Ioana Delaney <ioanamdelaney@gmail.com>
Closes#13570 from ioana-delaney/fixEmbedSubPredV1.
## What changes were proposed in this pull request?
This pull request fixes the COUNT bug in the `RewriteCorrelatedScalarSubquery` rule.
After this change, the rule tests the expression at the root of the correlated subquery to determine whether the expression returns NULL on empty input. If the expression does not return NULL, the rule generates additional logic in the Project operator above the rewritten subquery. This additional logic intercepts NULL values coming from the outer join and replaces them with the value that the subquery's expression would return on empty input.
## How was this patch tested?
Added regression tests to cover all branches of the updated rule (see changes to `SubquerySuite.scala`).
Ran all existing automated regression tests after merging with latest trunk.
Author: frreiss <frreiss@us.ibm.com>
Closes#13155 from frreiss/master.
## What changes were proposed in this pull request?
Adds codahale metrics for the codegen source text size and how long it takes to compile. The size is particularly interesting, since the JVM does have hard limits on how large methods can get.
To simplify, I added the metrics under a statically-initialized source that is always registered with SparkEnv.
## How was this patch tested?
Unit tests
Author: Eric Liang <ekl@databricks.com>
Closes#13586 from ericl/spark-15860.
## What changes were proposed in this pull request?
This adds support for radix sort of nullable long fields. When a sort field is null and radix sort is enabled, we keep nulls in a separate region of the sort buffer so that radix sort does not need to deal with them. This also has performance benefits when sorting smaller integer types, since the current representation of nulls in two's complement (Long.MIN_VALUE) otherwise forces a full-width radix sort.
This strategy for nulls does mean the sort is no longer stable. cc davies
## How was this patch tested?
Existing randomized sort tests for correctness. I also tested some TPCDS queries and there does not seem to be any significant regression for non-null sorts.
Some test queries (best of 5 runs each).
Before change:
scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6
start: Long = 3190437233227987
res3: Double = 4716.471091
After change:
scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6
start: Long = 3190367870952791
res4: Double = 2981.143045
Author: Eric Liang <ekl@databricks.com>
Closes#13161 from ericl/sc-2998.
## What changes were proposed in this pull request?
Spark currently incorrectly continues to use cached data even if the underlying data is overwritten.
Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset
```
This patch fixes this bug by adding support for `REFRESH path` that invalidates and refreshes all the cached data (and the associated metadata) for any dataframe that contains the given data source path.
Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshResource(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the cached dataset
```
## How was this patch tested?
Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.
Author: Sameer Agarwal <sameer@databricks.com>
Closes#13566 from sameeragarwal/refresh-path-2.
## What changes were proposed in this pull request?
The base class `SpecificParquetRecordReaderBase` used for vectorized parquet reader will try to get pushed-down filters from the given configuration. This pushed-down filters are used for RowGroups-level filtering. However, we don't set up the filters to push down into the configuration. In other words, the filters are not actually pushed down to do RowGroups-level filtering. This patch is to fix this and tries to set up the filters for pushing down to configuration for the reader.
## How was this patch tested?
Existing tests should be passed.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#13371 from viirya/vectorized-reader-push-down-filter.
## What changes were proposed in this pull request?
As discussed in https://github.com/apache/spark/pull/12836
we need to override stringArgs method in MapPartitionsInR in order to avoid too large strings generated by "stringArgs" method based on the input arguments.
In this case exclude some of the input arguments: serialized R objects.
## How was this patch tested?
Existing test cases
Author: Narine Kokhlikyan <narine.kokhlikyan@gmail.com>
Closes#13610 from NarineK/dapply_MapPartitionsInR_stringArgs.
## What changes were proposed in this pull request?
Serializer instantiation will consider existing SparkConf
## How was this patch tested?
manual test with `ImmutableList` (Guava) and `kryo-serializers`'s `Immutable*Serializer` implementations.
Added Test Suite.
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Sela <ansela@paypal.com>
Closes#13424 from amitsela/SPARK-15489.
## What changes were proposed in this pull request?
Code generated `SortMergeJoin` failed with wrong results when using structs as keys. This could (eventually) be traced back to the use of a wrong row reference when comparing structs.
## How was this patch tested?
TBD
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#13589 from hvanhovell/SPARK-15822.
## What changes were proposed in this pull request?
In scala, immutable.List.length is an expensive operation so we should
avoid using Seq.length == 0 or Seq.lenth > 0, and use Seq.isEmpty and Seq.nonEmpty instead.
## How was this patch tested?
existing tests
Author: wangyang <wangyang@haizhi.com>
Closes#13601 from yangw1234/isEmpty.
## What changes were proposed in this pull request?
Replace all occurrences of `None: Option[X]` with `Option.empty[X]`
## How was this patch tested?
Exisiting Tests
Author: Sandeep Singh <sandeep@techaddict.me>
Closes#13591 from techaddict/minor-7.
## What changes were proposed in this pull request?
This PR moves `QueryPlanner.planLater()` method into `GenericStrategy` for extra strategies to be able to use `planLater` in its strategy.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#13147 from ueshin/issues/SPARK-6320.
## What changes were proposed in this pull request?
This patch moves some codes in `DataFrameWriter.insertInto` that belongs to `Analyzer`.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#13496 from viirya/move-analyzer-stuff.
## What changes were proposed in this pull request?
When the output mode is complete, then the output of a streaming aggregation essentially will contain the complete aggregates every time. So this is not different from a batch dataset within an incremental execution. Other non-streaming operations should be supported on this dataset. In this PR, I am just adding support for sorting, as it is a common useful functionality. Support for other operations will come later.
## How was this patch tested?
Additional unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#13549 from tdas/SPARK-15812.
## What changes were proposed in this pull request?
With very wide tables, e.g. thousands of fields, the plan output is unreadable and often causes OOMs due to inefficient string processing. This truncates all struct and operator field lists to a user configurable threshold to limit performance impact.
It would also be nice to optimize string generation to avoid these sort of O(n^2) slowdowns entirely (i.e. use StringBuilder everywhere including expressions), but this is probably too large of a change for 2.0 at this point, and truncation has other benefits for usability.
## How was this patch tested?
Added a microbenchmark that covers this case particularly well. I also ran the microbenchmark while varying the truncation threshold.
```
numFields = 5
wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem) 2336 / 2558 0.0 23364.4 0.1X
numFields = 25
wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem) 4237 / 4465 0.0 42367.9 0.1X
numFields = 100
wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem) 10458 / 11223 0.0 104582.0 0.0X
numFields = Infinity
wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
[info] java.lang.OutOfMemoryError: Java heap space
```
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>
Closes#13537 from ericl/truncated-string.
## What changes were proposed in this pull request?
The current implementations of `UnixTime` and `FromUnixTime` do not cache their parser/formatter as much as they could. This PR resolved this issue.
This PR is a take over from https://github.com/apache/spark/pull/13522 and further optimizes the re-use of the parser/formatter. It also fixes the improves handling (catching the actual exception instead of `Throwable`). All credits for this work should go to rajeshbalamohan.
This PR closes https://github.com/apache/spark/pull/13522
## How was this patch tested?
Current tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Author: Rajesh Balamohan <rbalamohan@apache.org>
Closes#13581 from hvanhovell/SPARK-14321.
## What changes were proposed in this pull request?
The help function 'toStructType' in the AttributeSeq class doesn't include the metadata when it builds the StructField, so it causes this reported problem https://issues.apache.org/jira/browse/SPARK-15804?jql=project%20%3D%20SPARK when spark writes the the dataframe with the metadata to the parquet datasource.
The code path is when spark writes the dataframe to the parquet datasource through the InsertIntoHadoopFsRelationCommand, spark will build the WriteRelation container, and it will call the help function 'toStructType' to create StructType which contains StructField, it should include the metadata there, otherwise, we will lost the user provide metadata.
## How was this patch tested?
added test case in ParquetQuerySuite.scala
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Kevin Yu <qyu@us.ibm.com>
Closes#13555 from kevinyu98/spark-15804.
## What changes were proposed in this pull request?
The parser currently does not allow the use of some SQL keywords as table or field names. This PR adds supports for all keywords as identifier. The exception to this are table aliases, in this case most keywords are allowed except for join keywords (```anti, full, inner, left, semi, right, natural, on, join, cross```) and set-operator keywords (```union, intersect, except```).
## How was this patch tested?
I have added/move/renamed test in the catalyst `*ParserSuite`s.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#13534 from hvanhovell/SPARK-15789.
## What changes were proposed in this pull request?
The current implementation of "CREATE TEMPORARY TABLE USING datasource..." is NOT creating any intermediate temporary data directory like temporary HDFS folder, instead, it only stores a SQL string in memory. Probably we should use "TEMPORARY VIEW" instead.
This PR assumes a temporary table has to link with some temporary intermediate data. It follows the definition of temporary table like this (from [hortonworks doc](https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/bk_dataintegration/content/temp-tables.html)):
> A temporary table is a convenient way for an application to automatically manage intermediate data generated during a complex query
**Example**:
```
scala> spark.sql("CREATE temporary view my_tab7 (c1: String, c2: String) USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')")
scala> spark.sql("select c1, c2 from my_tab7").show()
+----+-----+
| c1| c2|
+----+-----+
|year| make|
|2012|Tesla|
...
```
It NOW prints a **deprecation warning** if "CREATE TEMPORARY TABLE USING..." is used.
```
scala> spark.sql("CREATE temporary table my_tab7 (c1: String, c2: String) USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')")
16/05/31 10:39:27 WARN SparkStrategies$DDLStrategy: CREATE TEMPORARY TABLE tableName USING... is deprecated, please use CREATE TEMPORARY VIEW viewName USING... instead
```
## How was this patch tested?
Unit test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13414 from clockfly/create_temp_view_using.
## What changes were proposed in this pull request?
This PR allows customization of verbosity in explain output. After change, `dataframe.explain()` and `dataframe.explain(true)` has different verbosity output for physical plan.
Currently, this PR only enables verbosity string for operator `HashAggregateExec` and `SortAggregateExec`. We will gradually enable verbosity string for more operators in future.
**Less verbose mode:** dataframe.explain(extended = false)
`output=[count(a)#85L]` is **NOT** displayed for HashAggregate.
```
scala> Seq((1,2,3)).toDF("a", "b", "c").createTempView("df2")
scala> spark.sql("select count(a) from df2").explain()
== Physical Plan ==
*HashAggregate(key=[], functions=[count(1)])
+- Exchange SinglePartition
+- *HashAggregate(key=[], functions=[partial_count(1)])
+- LocalTableScan
```
**Verbose mode:** dataframe.explain(extended = true)
`output=[count(a)#85L]` is displayed for HashAggregate.
```
scala> spark.sql("select count(a) from df2").explain(true) // "output=[count(a)#85L]" is added
...
== Physical Plan ==
*HashAggregate(key=[], functions=[count(1)], output=[count(a)#85L])
+- Exchange SinglePartition
+- *HashAggregate(key=[], functions=[partial_count(1)], output=[count#87L])
+- LocalTableScan
```
## How was this patch tested?
Manual test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13535 from clockfly/verbose_breakdown_2.
## What changes were proposed in this pull request?
This PR makes sure the typed Filter doesn't change the Dataset schema.
**Before the change:**
```
scala> val df = spark.range(0,9)
scala> df.schema
res12: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false))
scala> val afterFilter = df.filter(_=>true)
scala> afterFilter.schema // !!! schema is CHANGED!!! Column name is changed from id to value, nullable is changed from false to true.
res13: org.apache.spark.sql.types.StructType = StructType(StructField(value,LongType,true))
```
SerializeFromObject and DeserializeToObject are inserted to wrap the Filter, and these two can possibly change the schema of Dataset.
**After the change:**
```
scala> afterFilter.schema // schema is NOT changed.
res47: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false))
```
## How was this patch tested?
Unit test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13529 from clockfly/spark-15632.
BindReferences contains a n^2 loop which causes performance issues when operating over large schemas: to determine the ordinal of an attribute reference, we perform a linear scan over the `input` array. Because input can sometimes be a `List`, the call to `input(ordinal).nullable` can also be O(n).
Instead of performing a linear scan, we can convert the input into an array and build a hash map to map from expression ids to ordinals. The greater up-front cost of the map construction is offset by the fact that an expression can contain multiple attribute references, so the cost of the map construction is amortized across a number of lookups.
Perf. benchmarks to follow. /cc ericl
Author: Josh Rosen <joshrosen@databricks.com>
Closes#13505 from JoshRosen/bind-references-improvement.
## What changes were proposed in this pull request?
`an -> a`
Use cmds like `find . -name '*.R' | xargs -i sh -c "grep -in ' an [^aeiou]' {} && echo {}"` to generate candidates, and review them one by one.
## How was this patch tested?
manual tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#13515 from zhengruifeng/an_a.
## What changes were proposed in this pull request?
This PR improves the error handling of `RowEncoder`. When we create a `RowEncoder` with a given schema, we should validate the data type of input object. e.g. we should throw an exception when a field is boolean but is declared as a string column.
This PR also removes the support to use `Product` as a valid external type of struct type. This support is added at https://github.com/apache/spark/pull/9712, but is incomplete, e.g. nested product, product in array are both not working. However, we never officially support this feature and I think it's ok to ban it.
## How was this patch tested?
new tests in `RowEncoderSuite`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13401 from cloud-fan/bug.
## What changes were proposed in this pull request?
In forType function of object RandomDataGenerator, the code following:
if (maybeSqlTypeGenerator.isDefined){
....
Some(generator)
} else{
None
}
will be changed. Instead, maybeSqlTypeGenerator.map will be used.
## How was this patch tested?
All of the current unit tests passed.
Author: Weiqing Yang <yangweiqing001@gmail.com>
Closes#13448 from Sherry302/master.
We should cache `Metadata.hashCode` and use a singleton for `Metadata.empty` because calculating metadata hashCodes appears to be a bottleneck for certain workloads.
We should also cache `StructType.hashCode`.
In an optimizer stress-test benchmark run by ericl, these `hashCode` calls accounted for roughly 40% of the total CPU time and this bottleneck was completely eliminated by the caching added by this patch.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#13504 from JoshRosen/metadata-fix.
## What changes were proposed in this pull request?
For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL doesn't allow row to be null, only its columns can be null.
This PR explicitly add this constraint and throw exception if users break it.
## How was this patch tested?
several new tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13469 from cloud-fan/null-object.
## What changes were proposed in this pull request?
There are 2 kinds of `GetStructField`:
1. resolved from `UnresolvedExtractValue`, and it will have a `name` property.
2. created when we build deserializer expression for nested tuple, no `name` property.
When we want to validate the ordinals of nested tuple, we should only catch `GetStructField` without the name property.
## How was this patch tested?
new test in `EncoderResolutionSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13474 from cloud-fan/ordinal-check.
#### What changes were proposed in this pull request?
Before this PR, the output of EXPLAIN of following SQL is like
```SQL
CREATE EXTERNAL TABLE extTable_with_partitions (key INT, value STRING)
PARTITIONED BY (ds STRING, hr STRING)
LOCATION '/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-b39a6185-8981-403b-a4aa-36fb2f4ca8a9'
```
``ExecutedCommand CreateTableCommand CatalogTable(`extTable_with_partitions`,CatalogTableType(EXTERNAL),CatalogStorageFormat(Some(/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-dd234718-e85d-4c5a-8353-8f1834ac0323),Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(key,int,true,None), CatalogColumn(value,string,true,None), CatalogColumn(ds,string,true,None), CatalogColumn(hr,string,true,None)),List(ds, hr),List(),List(),-1,,1463026413544,-1,Map(),None,None,None), false``
After this PR, the output is like
```
ExecutedCommand
: +- CreateTableCommand CatalogTable(
Table:`extTable_with_partitions`
Created:Thu Jun 02 21:30:54 PDT 2016
Last Access:Wed Dec 31 15:59:59 PST 1969
Type:EXTERNAL
Schema:[`key` int, `value` string, `ds` string, `hr` string]
Partition Columns:[`ds`, `hr`]
Storage(Location:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-a06083b8-8e88-4d07-9ff0-d6bd8d943ad3, InputFormat:org.apache.hadoop.mapred.TextInputFormat, OutputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
```
This is also applicable to `DESC EXTENDED`. However, this does not have special handling for Data Source Tables. If needed, we need to move the logics of `DDLUtil`. Let me know if we should do it in this PR. Thanks! rxin liancheng
#### How was this patch tested?
Manual testing
Author: gatorsmile <gatorsmile@gmail.com>
Closes#13070 from gatorsmile/betterExplainCatalogTable.
In Catalyst's TreeNode transform methods we end up calling `productIterator.map(...).toArray` in a number of places, which is slightly inefficient because it needs to allocate an `ArrayBuilder` and grow a temporary array. Since we already know the size of the final output (`productArity`), we can simply allocate an array up-front and use a while loop to consume the iterator and populate the array.
For most workloads, this performance difference is negligible but it does make a measurable difference in optimizer performance for queries that operate over very wide schemas (such as the benchmark queries in #13456).
### Perf results (from #13456 benchmarks)
**Before**
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz
parsing large select: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
1 select expressions 19 / 22 0.0 19119858.0 1.0X
10 select expressions 23 / 25 0.0 23208774.0 0.8X
100 select expressions 55 / 73 0.0 54768402.0 0.3X
1000 select expressions 229 / 259 0.0 228606373.0 0.1X
2500 select expressions 530 / 554 0.0 529938178.0 0.0X
```
**After**
```
parsing large select: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
1 select expressions 15 / 21 0.0 14978203.0 1.0X
10 select expressions 22 / 27 0.0 22492262.0 0.7X
100 select expressions 48 / 64 0.0 48449834.0 0.3X
1000 select expressions 189 / 208 0.0 189346428.0 0.1X
2500 select expressions 429 / 449 0.0 428943897.0 0.0X
```
###
Author: Josh Rosen <joshrosen@databricks.com>
Closes#13484 from JoshRosen/treenode-productiterator-map.
## What changes were proposed in this pull request?
Queries with scalar sub-query in the SELECT list run against a local, in-memory relation throw
UnsupportedOperationException exception.
Problem repro:
```SQL
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
scala> sql("select (select min(c1) from t2) from t1").show()
java.lang.UnsupportedOperationException: Cannot evaluate expression: scalar-subquery#62 []
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:215)
at org.apache.spark.sql.catalyst.expressions.ScalarSubquery.eval(subquery.scala:62)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:45)
at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:29)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$37.applyOrElse(Optimizer.scala:1473)
```
The problem is specific to local, in memory relations. It is caused by rule ConvertToLocalRelation, which attempts to push down
a scalar-subquery expression to the local tables.
The solution prevents the rule to apply if Project references scalar subqueries.
## How was this patch tested?
Added regression tests to SubquerySuite.scala
Author: Ioana Delaney <ioanamdelaney@gmail.com>
Closes#13418 from ioana-delaney/scalarSubV2.
## What changes were proposed in this pull request?
Our encoder framework has been evolved a lot, this PR tries to clean up the code to make it more readable and emphasise the concept that encoder should be used as a container of serde expressions.
1. move validation logic to analyzer instead of encoder
2. only have a `resolveAndBind` method in encoder instead of `resolve` and `bind`, as we don't have the encoder life cycle concept anymore.
3. `Dataset` don't need to keep a resolved encoder, as there is no such concept anymore. bound encoder is still needed to do serialization outside of query framework.
4. Using `BoundReference` to represent an unresolved field in deserializer expression is kind of weird, this PR adds a `GetColumnByOrdinal` for this purpose. (serializer expression still use `BoundReference`, we can replace it with `GetColumnByOrdinal` in follow-ups)
## How was this patch tested?
existing test
Author: Wenchen Fan <wenchen@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes#13269 from cloud-fan/clean-encoder.
## What changes were proposed in this pull request?
This PR makes the explain output less verbose by hiding some verbose output like `None`, `null`, empty List `[]`, empty set `{}`, and etc.
**Before change**:
```
== Physical Plan ==
ExecutedCommand
: +- ShowTablesCommand None, None
```
**After change**:
```
== Physical Plan ==
ExecutedCommand
: +- ShowTablesCommand
```
## How was this patch tested?
Manual test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13470 from clockfly/verbose_breakdown_4.
## What changes were proposed in this pull request?
When users create a case class and use java reserved keyword as field name, spark sql will generate illegal java code and throw exception at runtime.
This PR checks the field names when building the encoder, and if illegal field names are used, throw exception immediately with a good error message.
## How was this patch tested?
new test in DatasetSuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13485 from cloud-fan/java.
## What changes were proposed in this pull request?
This command didn't work for Hive tables. Now it does:
```
ALTER TABLE boxes PARTITION (width=3)
SET SERDE 'com.sparkbricks.serde.ColumnarSerDe'
WITH SERDEPROPERTIES ('compress'='true')
```
## How was this patch tested?
`HiveExternalCatalogSuite`
Author: Andrew Or <andrew@databricks.com>
Closes#13453 from andrewor14/alter-partition-storage.
## What changes were proposed in this pull request?
This patch fixes a number of `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException` exceptions reported in [SPARK-15604], [SPARK-14752] etc. (while executing sparkSQL queries with the kryo serializer) by explicitly implementing `KryoSerialization` for `LazilyGenerateOrdering`.
## How was this patch tested?
1. Modified `OrderingSuite` so that all tests in the suite also test kryo serialization (for both interpreted and generated ordering).
2. Manually verified TPC-DS q1.
Author: Sameer Agarwal <sameer@databricks.com>
Closes#13466 from sameeragarwal/kryo.
## What changes were proposed in this pull request?
This PR add a rule at the end of analyzer to correct nullable fields of attributes in a logical plan by using nullable fields of the corresponding attributes in its children logical plans (these plans generate the input rows).
This is another approach for addressing SPARK-13484 (the first approach is https://github.com/apache/spark/pull/11371).
Close#113711
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#13290 from yhuai/SPARK-13484.
## What changes were proposed in this pull request?
Join on transformed dataset has attributes conflicts, which make query execution failure, for example:
```
val dataset = Seq(1, 2, 3).toDs
val mappedDs = dataset.map(_ + 1)
mappedDs.as("t1").joinWith(mappedDs.as("t2"), $"t1.value" === $"t2.value").show()
```
will throw exception:
```
org.apache.spark.sql.AnalysisException: cannot resolve '`t1.value`' given input columns: [value];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:62)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:59)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
```
## How was this patch tested?
Unit test.
Author: jerryshao <sshao@hortonworks.com>
Closes#13399 from jerryshao/SPARK-15620.
## What changes were proposed in this pull request?
Improves the explain output of several physical plans by displaying embedded logical plan in tree style
Some physical plan contains a embedded logical plan, for example, `cache tableName query` maps to:
```
case class CacheTableCommand(
tableName: String,
plan: Option[LogicalPlan],
isLazy: Boolean)
extends RunnableCommand
```
It is easier to read the explain output if we can display the `plan` in tree style.
**Before change:**
Everything is messed in one line.
```
scala> Seq((1,2)).toDF().createOrReplaceTempView("testView")
scala> spark.sql("cache table testView2 select * from testView").explain()
== Physical Plan ==
ExecutedCommand CacheTableCommand testView2, Some('Project [*]
+- 'UnresolvedRelation `testView`, None
), false
```
**After change:**
```
scala> spark.sql("cache table testView2 select * from testView").explain()
== Physical Plan ==
ExecutedCommand
: +- CacheTableCommand testView2, false
: : +- 'Project [*]
: : +- 'UnresolvedRelation `testView`, None
```
## How was this patch tested?
Manual test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#13433 from clockfly/verbose_breakdown_3_2.