Commit graph

2105 commits

Author SHA1 Message Date
Reynold Xin 66fe819ada [SPARK-19149][SQL] Follow-up: simplify cache implementation.
## What changes were proposed in this pull request?
This patch simplifies slightly the logical plan statistics cache implementation, as discussed in https://github.com/apache/spark/pull/16529

## How was this patch tested?
N/A - this has no behavior change.

Author: Reynold Xin <rxin@databricks.com>

Closes #16544 from rxin/SPARK-19149.
2017-01-11 14:25:36 -08:00
jiangxingbo 30a07071f0 [SPARK-18801][SQL] Support resolve a nested view
## What changes were proposed in this pull request?

We should be able to resolve a nested view. The main advantage is that if you update an underlying view, the current view also gets updated.
The new approach should be compatible with older versions of SPARK/HIVE, that means:
1. The new approach should be able to resolve the views that created by older versions of SPARK/HIVE;
2. The new approach should be able to resolve the views that are currently supported by SPARK SQL.

The new approach mainly brings in the following changes:
1. Add a new operator called `View` to keep track of the CatalogTable that describes the view, and the output attributes as well as the child of the view;
2. Update the `ResolveRelations` rule to resolve the relations and views, note that a nested view should be resolved correctly;
3. Add `viewDefaultDatabase` variable to `CatalogTable` to keep track of the default database name used to resolve a view, if the `CatalogTable` is not a view, then the variable should be `None`;
4. Add `AnalysisContext` to enable us to still support a view created with CTE/Windows query;
5. Enables the view support without enabling Hive support (i.e., enableHiveSupport);
6. Fix a weird behavior: the result of a view query may have different schema if the referenced table has been changed. After this PR, we try to cast the child output attributes to that from the view schema, throw an AnalysisException if cast is not allowed.

Note this is compatible with the views defined by older versions of Spark(before 2.2), which have empty `defaultDatabase` and all the relations in `viewText` have database part defined.

## How was this patch tested?
1. Add new tests in `SessionCatalogSuite` to test the function `lookupRelation`;
2. Add new test case in `SQLViewSuite` to test resolve a nested view.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #16233 from jiangxb1987/resolve-view.
2017-01-11 13:44:07 -08:00
wangzhenhua a615513569 [SPARK-19149][SQL] Unify two sets of statistics in LogicalPlan
## What changes were proposed in this pull request?

Currently we have two sets of statistics in LogicalPlan: a simple stats and a stats estimated by cbo, but the computing logic and naming are quite confusing, we need to unify these two sets of stats.

## How was this patch tested?

Just modify existing tests.

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16529 from wzhfy/unifyStats.
2017-01-10 22:34:44 -08:00
Shixiong Zhu bc6c56e940 [SPARK-19140][SS] Allow update mode for non-aggregation streaming queries
## What changes were proposed in this pull request?

This PR allow update mode for non-aggregation streaming queries. It will be same as the append mode if a query has no aggregations.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16520 from zsxwing/update-without-agg.
2017-01-10 17:58:11 -08:00
Liwei Lin acfc5f3543 [SPARK-16845][SQL] GeneratedClass$SpecificOrdering grows beyond 64 KB
## What changes were proposed in this pull request?

Prior to this patch, we'll generate `compare(...)` for `GeneratedClass$SpecificOrdering` like below, leading to Janino exceptions saying the code grows beyond 64 KB.

``` scala
/* 005 */ class SpecificOrdering extends o.a.s.sql.catalyst.expressions.codegen.BaseOrdering {
/* ..... */   ...
/* 10969 */   private int compare(InternalRow a, InternalRow b) {
/* 10970 */     InternalRow i = null;  // Holds current row being evaluated.
/* 10971 */
/* 1.... */     code for comparing field0
/* 1.... */     code for comparing field1
/* 1.... */     ...
/* 1.... */     code for comparing field449
/* 15012 */
/* 15013 */     return 0;
/* 15014 */   }
/* 15015 */ }
```

This patch would break `compare(...)` into smaller `compare_xxx(...)` methods when necessary; then we'll get generated `compare(...)` like:

``` scala
/* 001 */ public SpecificOrdering generate(Object[] references) {
/* 002 */   return new SpecificOrdering(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificOrdering extends o.a.s.sql.catalyst.expressions.codegen.BaseOrdering {
/* 006 */
/* 007 */     ...
/* 1.... */
/* 11290 */   private int compare_0(InternalRow a, InternalRow b) {
/* 11291 */     InternalRow i = null;  // Holds current row being evaluated.
/* 11292 */
/* 11293 */     i = a;
/* 11294 */     boolean isNullA;
/* 11295 */     UTF8String primitiveA;
/* 11296 */     {
/* 11297 */
/* 11298 */       Object obj = ((Expression) references[0]).eval(null);
/* 11299 */       UTF8String value = (UTF8String) obj;
/* 11300 */       isNullA = false;
/* 11301 */       primitiveA = value;
/* 11302 */     }
/* 11303 */     i = b;
/* 11304 */     boolean isNullB;
/* 11305 */     UTF8String primitiveB;
/* 11306 */     {
/* 11307 */
/* 11308 */       Object obj = ((Expression) references[0]).eval(null);
/* 11309 */       UTF8String value = (UTF8String) obj;
/* 11310 */       isNullB = false;
/* 11311 */       primitiveB = value;
/* 11312 */     }
/* 11313 */     if (isNullA && isNullB) {
/* 11314 */       // Nothing
/* 11315 */     } else if (isNullA) {
/* 11316 */       return -1;
/* 11317 */     } else if (isNullB) {
/* 11318 */       return 1;
/* 11319 */     } else {
/* 11320 */       int comp = primitiveA.compare(primitiveB);
/* 11321 */       if (comp != 0) {
/* 11322 */         return comp;
/* 11323 */       }
/* 11324 */     }
/* 11325 */
/* 11326 */
/* 11327 */     i = a;
/* 11328 */     boolean isNullA1;
/* 11329 */     UTF8String primitiveA1;
/* 11330 */     {
/* 11331 */
/* 11332 */       Object obj1 = ((Expression) references[1]).eval(null);
/* 11333 */       UTF8String value1 = (UTF8String) obj1;
/* 11334 */       isNullA1 = false;
/* 11335 */       primitiveA1 = value1;
/* 11336 */     }
/* 11337 */     i = b;
/* 11338 */     boolean isNullB1;
/* 11339 */     UTF8String primitiveB1;
/* 11340 */     {
/* 11341 */
/* 11342 */       Object obj1 = ((Expression) references[1]).eval(null);
/* 11343 */       UTF8String value1 = (UTF8String) obj1;
/* 11344 */       isNullB1 = false;
/* 11345 */       primitiveB1 = value1;
/* 11346 */     }
/* 11347 */     if (isNullA1 && isNullB1) {
/* 11348 */       // Nothing
/* 11349 */     } else if (isNullA1) {
/* 11350 */       return -1;
/* 11351 */     } else if (isNullB1) {
/* 11352 */       return 1;
/* 11353 */     } else {
/* 11354 */       int comp = primitiveA1.compare(primitiveB1);
/* 11355 */       if (comp != 0) {
/* 11356 */         return comp;
/* 11357 */       }
/* 11358 */     }
/* 1.... */
/* 1.... */   ...
/* 1.... */
/* 12652 */     return 0;
/* 12653 */   }
/* 1.... */
/* 1.... */   ...
/* 15387 */
/* 15388 */   public int compare(InternalRow a, InternalRow b) {
/* 15389 */
/* 15390 */     int comp_0 = compare_0(a, b);
/* 15391 */     if (comp_0 != 0) {
/* 15392 */       return comp_0;
/* 15393 */     }
/* 15394 */
/* 15395 */     int comp_1 = compare_1(a, b);
/* 15396 */     if (comp_1 != 0) {
/* 15397 */       return comp_1;
/* 15398 */     }
/* 1.... */
/* 1.... */     ...
/* 1.... */
/* 15450 */     return 0;
/* 15451 */   }
/* 15452 */ }
```
## How was this patch tested?
- a new added test case which
  - would fail prior to this patch
  - would pass with this patch
- ordering correctness should already be covered by existing tests like those in `OrderingSuite`

## Acknowledgement

A major part of this PR - the refactoring work of `splitExpression()` - has been done by ueshin.

Author: Liwei Lin <lwlin7@gmail.com>
Author: Takuya UESHIN <ueshin@happy-camper.st>
Author: Takuya Ueshin <ueshin@happy-camper.st>

Closes #15480 from lw-lin/spec-ordering-64k-.
2017-01-10 19:35:46 +08:00
Zhenhua Wang 15c2bd01b0 [SPARK-19020][SQL] Cardinality estimation of aggregate operator
## What changes were proposed in this pull request?

Support cardinality estimation of aggregate operator

## How was this patch tested?

Add test cases

Author: Zhenhua Wang <wzh_zju@163.com>
Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #16431 from wzhfy/aggEstimation.
2017-01-09 11:29:42 -08:00
Zhenhua Wang 3ccabdfb4d [SPARK-17077][SQL] Cardinality estimation for project operator
## What changes were proposed in this pull request?

Support cardinality estimation for project operator.

## How was this patch tested?

Add a test suite and a base class in the catalyst package.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16430 from wzhfy/projectEstimation.
2017-01-08 21:15:52 -08:00
Michal Senkyr 903bb8e8a2 [SPARK-16792][SQL] Dataset containing a Case Class with a List type causes a CompileException (converting sequence to list)
## What changes were proposed in this pull request?

Added a `to` call at the end of the code generated by `ScalaReflection.deserializerFor` if the requested type is not a supertype of `WrappedArray[_]` that uses `CanBuildFrom[_, _, _]` to convert result into an arbitrary subtype of `Seq[_]`.

Care was taken to preserve the original deserialization where it is possible to avoid the overhead of conversion in cases where it is not needed

`ScalaReflection.serializerFor` could already be used to serialize any `Seq[_]` so it was not altered

`SQLImplicits` had to be altered and new implicit encoders added to permit serialization of other sequence types

Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException

## How was this patch tested?
```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```

Also manual execution of the following sets of commands in the Spark shell:
```scala
case class TestCC(key: Int, letters: List[String])

val ds1 = sc.makeRDD(Seq(
(List("D")),
(List("S","H")),
(List("F","H")),
(List("D","L","L"))
)).map(x=>(x.length,x)).toDF("key","letters").as[TestCC]

val test1=ds1.map{_.key}
test1.show
```

```scala
case class X(l: List[String])
spark.createDataset(Seq(List("A"))).map(X).show
```

```scala
spark.sqlContext.createDataset(sc.parallelize(List(1) :: Nil)).collect
```

After adding arbitrary sequence support also tested with the following commands:

```scala
case class QueueClass(q: scala.collection.immutable.Queue[Int])

spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
```

Author: Michal Senkyr <mike.senkyr@gmail.com>

Closes #16240 from michalsenkyr/sql-caseclass-list-fix.
2017-01-06 15:05:20 +08:00
Wenchen Fan cca945b6aa [SPARK-18885][SQL] unify CREATE TABLE syntax for data source and hive serde tables
## What changes were proposed in this pull request?

Today we have different syntax to create data source or hive serde tables, we should unify them to not confuse users and step forward to make hive a data source.

Please read https://issues.apache.org/jira/secure/attachment/12843835/CREATE-TABLE.pdf for  details.

TODO(for follow-up PRs):
1. TBLPROPERTIES is not added to the new syntax, we should decide if we wanna add it later.
2. `SHOW CREATE TABLE` should be updated to use the new syntax.
3. we should decide if we wanna change the behavior of `SET LOCATION`.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16296 from cloud-fan/create-table.
2017-01-05 17:40:27 -08:00
Niranjan Padmanabhan a1e40b1f5d
[MINOR][DOCS] Remove consecutive duplicated words/typo in Spark Repo
## What changes were proposed in this pull request?
There are many locations in the Spark repo where the same word occurs consecutively. Sometimes they are appropriately placed, but many times they are not. This PR removes the inappropriately duplicated words.

## How was this patch tested?
N/A since only docs or comments were updated.

Author: Niranjan Padmanabhan <niranjan.padmanabhan@gmail.com>

Closes #16455 from neurons/np.structure_streaming_doc.
2017-01-04 15:07:29 +00:00
Wenchen Fan 101556d0fa [SPARK-19060][SQL] remove the supportsPartial flag in AggregateFunction
## What changes were proposed in this pull request?

Now all aggregation functions support partial aggregate, we can remove the `supportsPartual` flag in `AggregateFunction`

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16461 from cloud-fan/partial.
2017-01-04 12:46:30 +01:00
Wenchen Fan cbd11d2357 [SPARK-19072][SQL] codegen of Literal should not output boxed value
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/16402 we made a mistake that, when double/float is infinity, the `Literal` codegen will output boxed value and cause wrong result.

This PR fixes this by special handling infinity to not output boxed value.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16469 from cloud-fan/literal.
2017-01-03 22:40:14 -08:00
gatorsmile b67b35f76b [SPARK-19048][SQL] Delete Partition Location when Dropping Managed Partitioned Tables in InMemoryCatalog
### What changes were proposed in this pull request?
The data in the managed table should be deleted after table is dropped. However, if the partition location is not under the location of the partitioned table, it is not deleted as expected. Users can specify any location for the partition when they adding a partition.

This PR is to delete partition location when dropping managed partitioned tables stored in `InMemoryCatalog`.

### How was this patch tested?
Added test cases for both HiveExternalCatalog and InMemoryCatalog

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16448 from gatorsmile/unsetSerdeProp.
2017-01-03 11:43:47 -08:00
Liang-Chi Hsieh 52636226dc [SPARK-18932][SQL] Support partial aggregation for collect_set/collect_list
## What changes were proposed in this pull request?

Currently collect_set/collect_list aggregation expression don't support partial aggregation. This patch is to enable partial aggregation for them.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16371 from viirya/collect-partial-support.
2017-01-03 22:11:54 +08:00
Zhenhua Wang ae83c21125 [SPARK-18998][SQL] Add a cbo conf to switch between default statistics and estimated statistics
## What changes were proposed in this pull request?

We add a cbo configuration to switch between default stats and estimated stats.
We also define a new statistics method `planStats` in LogicalPlan with conf as its parameter, in order to pass the cbo switch and other estimation related configurations in the future. `planStats` is used on the caller sides (i.e. in Optimizer and Strategies) to make transformation decisions based on stats.

## How was this patch tested?

Add a test case using a dummy LogicalPlan.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16401 from wzhfy/cboSwitch.
2017-01-03 12:19:52 +08:00
gatorsmile a6cd9dbc60 [SPARK-19029][SQL] Remove databaseName from SimpleCatalogRelation
### What changes were proposed in this pull request?
Remove useless `databaseName ` from `SimpleCatalogRelation`.

### How was this patch tested?
Existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16438 from gatorsmile/removeDBFromSimpleCatalogRelation.
2017-01-03 11:55:31 +08:00
gatorsmile 35e974076d [SPARK-19028][SQL] Fixed non-thread-safe functions used in SessionCatalog
### What changes were proposed in this pull request?
Fixed non-thread-safe functions used in SessionCatalog:
- refreshTable
- lookupRelation

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16437 from gatorsmile/addSyncToLookUpTable.
2016-12-31 19:40:28 +08:00
hyukjinkwon 852782b83c
[SPARK-18922][TESTS] Fix more path-related test failures on Windows
## What changes were proposed in this pull request?

This PR proposes to fix the test failures due to different format of paths on Windows.

Failed tests are as below:

```
ColumnExpressionSuite:
- input_file_name, input_file_block_start, input_file_block_length - FileScanRDD *** FAILED *** (187 milliseconds)
  "file:///C:/projects/spark/target/tmp/spark-0b21b963-6cfa-411c-8d6f-e6a5e1e73bce/part-00001-c083a03a-e55e-4b05-9073-451de352d006.snappy.parquet" did not contain "C:\projects\spark\target\tmp\spark-0b21b963-6cfa-411c-8d6f-e6a5e1e73bce" (ColumnExpressionSuite.scala:545)

- input_file_name, input_file_block_start, input_file_block_length - HadoopRDD *** FAILED *** (172 milliseconds)
  "file:/C:/projects/spark/target/tmp/spark-5d0afa94-7c2f-463b-9db9-2e8403e2bc5f/part-00000-f6530138-9ad3-466d-ab46-0eeb6f85ed0b.txt" did not contain "C:\projects\spark\target\tmp\spark-5d0afa94-7c2f-463b-9db9-2e8403e2bc5f" (ColumnExpressionSuite.scala:569)

- input_file_name, input_file_block_start, input_file_block_length - NewHadoopRDD *** FAILED *** (156 milliseconds)
  "file:/C:/projects/spark/target/tmp/spark-a894c7df-c74d-4d19-82a2-a04744cb3766/part-00000-29674e3f-3fcf-4327-9b04-4dab1d46338d.txt" did not contain "C:\projects\spark\target\tmp\spark-a894c7df-c74d-4d19-82a2-a04744cb3766" (ColumnExpressionSuite.scala:598)
```

```
DataStreamReaderWriterSuite:
- source metadataPath *** FAILED *** (62 milliseconds)
  org.mockito.exceptions.verification.junit.ArgumentsAreDifferent: Argument(s) are different! Wanted:
streamSourceProvider.createSource(
    org.apache.spark.sql.SQLContext3b04133b,
    "C:\projects\spark\target\tmp\streaming.metadata-b05db6ae-c8dc-4ce4-b0d9-1eb8c84876c0/sources/0",
    None,
    "org.apache.spark.sql.streaming.test",
    Map()
);
-> at org.apache.spark.sql.streaming.test.DataStreamReaderWriterSuite$$anonfun$12.apply$mcV$sp(DataStreamReaderWriterSuite.scala:374)
Actual invocation has different arguments:
streamSourceProvider.createSource(
    org.apache.spark.sql.SQLContext3b04133b,
    "/C:/projects/spark/target/tmp/streaming.metadata-b05db6ae-c8dc-4ce4-b0d9-1eb8c84876c0/sources/0",
    None,
    "org.apache.spark.sql.streaming.test",
    Map()
);
```

```
GlobalTempViewSuite:
- CREATE GLOBAL TEMP VIEW USING *** FAILED *** (110 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark  arget mpspark-960398ba-a0a1-45f6-a59a-d98533f9f519;
```

```
CreateTableAsSelectSuite:
- CREATE TABLE USING AS SELECT *** FAILED *** (0 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

- create a table, drop it and create another one with the same name *** FAILED *** (16 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

- create table using as select - with partitioned by *** FAILED *** (0 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

- create table using as select - with non-zero buckets *** FAILED *** (0 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string
```

```
HiveMetadataCacheSuite:
- partitioned table is cached when partition pruning is true *** FAILED *** (532 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- partitioned table is cached when partition pruning is false *** FAILED *** (297 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```

```
MultiDatabaseSuite:
- createExternalTable() to non-default database - with USE *** FAILED *** (954 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark  arget mpspark-0839d9a7-5e29-467a-9e3e-3e4cd618ee09;

- createExternalTable() to non-default database - without USE *** FAILED *** (500 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark  arget mpspark-c7e24d73-1d8f-45e8-ab7d-53a83087aec3;

 - invalid database name and table names *** FAILED *** (31 milliseconds)
   "Path does not exist: file:/C:projectsspark  arget mpspark-15a2a494-3483-4876-80e5-ec396e704b77;" did not contain "`t:a` is not a valid name for tables/databases. Valid names only contain alphabet characters, numbers and _." (MultiDatabaseSuite.scala:296)
```

```
OrcQuerySuite:
 - SPARK-8501: Avoids discovery schema from empty ORC files *** FAILED *** (15 milliseconds)
   org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

 - Verify the ORC conversion parameter: CONVERT_METASTORE_ORC *** FAILED *** (78 milliseconds)
   org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

 - converted ORC table supports resolving mixed case field *** FAILED *** (297 milliseconds)
   org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```

```
HadoopFsRelationTest - JsonHadoopFsRelationSuite, OrcHadoopFsRelationSuite, ParquetHadoopFsRelationSuite, SimpleTextHadoopFsRelationSuite:
 - Locality support for FileScanRDD *** FAILED *** (15 milliseconds)
   java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-383d1f13-8783-47fd-964d-9c75e5eec50f, expected: file:///
```

```
HiveQuerySuite:
- CREATE TEMPORARY FUNCTION *** FAILED *** (0 milliseconds)
   java.net.MalformedURLException: For input string: "%5Cprojects%5Cspark%5Csql%5Chive%5Ctarget%5Cscala-2.11%5Ctest-classes%5CTestUDTF.jar"

 - ADD FILE command *** FAILED *** (500 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\sql\hive\target\scala-2.11\test-classes\data\files\v1.txt

 - ADD JAR command 2 *** FAILED *** (110 milliseconds)
   org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: C:projectssparksqlhive  argetscala-2.11 est-classesdatafilessample.json;
```

```
PruneFileSourcePartitionsSuite:
 - PruneFileSourcePartitions should not change the output of LogicalRelation *** FAILED *** (15 milliseconds)
   org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```

```
HiveCommandSuite:
 - LOAD DATA LOCAL *** FAILED *** (109 milliseconds)
   org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: C:projectssparksqlhive  argetscala-2.11 est-classesdatafilesemployee.dat;

 - LOAD DATA *** FAILED *** (93 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 15: C:projectsspark arget mpemployee.dat7496657117354281006.tmp

 - Truncate Table *** FAILED *** (78 milliseconds)
   org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: C:projectssparksqlhive  argetscala-2.11 est-classesdatafilesemployee.dat;
```

```
HiveExternalCatalogBackwardCompatibilitySuite:
- make sure we can read table created by old version of Spark *** FAILED *** (0 milliseconds)
  "[/C:/projects/spark/target/tmp/]spark-0554d859-74e1-..." did not equal "[C:\projects\spark\target\tmp\]spark-0554d859-74e1-..." (HiveExternalCatalogBackwardCompatibilitySuite.scala:213)
  org.scalatest.exceptions.TestFailedException

- make sure we can alter table location created by old version of Spark *** FAILED *** (110 milliseconds)
  java.net.URISyntaxException: Illegal character in opaque part at index 15: C:projectsspark	arget	mpspark-0e9b2c5f-49a1-4e38-a32a-c0ab1813a79f
```

```
ExternalCatalogSuite:
- create/drop/rename partitions should create/delete/rename the directory *** FAILED *** (610 milliseconds)
  java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-4c24f010-18df-437b-9fed-990c6f9adece
```

```
SQLQuerySuite:
- describe functions - temporary user defined functions *** FAILED *** (16 milliseconds)
  java.net.URISyntaxException: Illegal character in opaque part at index 22: C:projectssparksqlhive	argetscala-2.11	est-classesTestUDTF.jar

- specifying database name for a temporary table is not allowed *** FAILED *** (125 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-a34c9814-a483-43f2-be29-37f616b6df91;
```

```
PartitionProviderCompatibilitySuite:
- convert partition provider to hive with repair table *** FAILED *** (281 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-ee5fc96d-8c7d-4ebf-8571-a1d62736473e;

- when partition management is enabled, new tables have partition provider hive *** FAILED *** (187 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-803ad4d6-3e8c-498d-9ca5-5cda5d9b2a48;

- when partition management is disabled, new tables have no partition provider *** FAILED *** (172 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-c9fda9e2-4020-465f-8678-52cd72d0a58f;

- when partition management is disabled, we preserve the old behavior even for new tables *** FAILED *** (203 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget
mpspark-f4a518a6-c49d-43d3-b407-0ddd76948e13;

- insert overwrite partition of legacy datasource table *** FAILED *** (188 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-f4a518a6-c49d-43d3-b407-0ddd76948e79;

- insert overwrite partition of new datasource table overwrites just partition *** FAILED *** (219 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-6ba3a88d-6f6c-42c5-a9f4-6d924a0616ff;

- SPARK-18544 append with saveAsTable - partition management true *** FAILED *** (173 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-cd234a6d-9cb4-4d1d-9e51-854ae9543bbd;

- SPARK-18635 special chars in partition values - partition management true *** FAILED *** (2 seconds, 967 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- SPARK-18635 special chars in partition values - partition management false *** FAILED *** (62 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- SPARK-18659 insert overwrite table with lowercase - partition management true *** FAILED *** (63 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- SPARK-18544 append with saveAsTable - partition management false *** FAILED *** (266 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- SPARK-18659 insert overwrite table files - partition management false *** FAILED *** (63 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- SPARK-18659 insert overwrite table with lowercase - partition management false *** FAILED *** (78 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- sanity check table setup *** FAILED *** (31 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- insert into partial dynamic partitions *** FAILED *** (47 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- insert into fully dynamic partitions *** FAILED *** (62 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- insert into static partition *** FAILED *** (78 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- overwrite partial dynamic partitions *** FAILED *** (63 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- overwrite fully dynamic partitions *** FAILED *** (47 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- overwrite static partition *** FAILED *** (63 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```

```
MetastoreDataSourcesSuite:
- check change without refresh *** FAILED *** (203 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-00713fe4-ca04-448c-bfc7-6c5e9a2ad2a1;

- drop, change, recreate *** FAILED *** (78 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-2030a21b-7d67-4385-a65b-bb5e2bed4861;

- SPARK-15269 external data source table creation *** FAILED *** (78 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-4d50fd4a-14bc-41d6-9232-9554dd233f86;

- CTAS *** FAILED *** (109 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

- CTAS with IF NOT EXISTS *** FAILED *** (109 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

- CTAS: persisted partitioned bucketed data source table *** FAILED *** (0 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

- SPARK-15025: create datasource table with path with select *** FAILED *** (16 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

- CTAS: persisted partitioned data source table *** FAILED *** (47 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string
```

```
HiveMetastoreCatalogSuite:
- Persist non-partitioned parquet relation into metastore as managed table using CTAS *** FAILED *** (16 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

- Persist non-partitioned orc relation into metastore as managed table using CTAS *** FAILED *** (16 milliseconds)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string
```

```
HiveUDFSuite:
- SPARK-11522 select input_file_name from non-parquet table *** FAILED *** (16 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```

```
QueryPartitionSuite:
- SPARK-13709: reading partitioned Avro table with nested schema *** FAILED *** (250 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```

```
ParquetHiveCompatibilitySuite:
- simple primitives *** FAILED *** (16 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- SPARK-10177 timestamp *** FAILED *** (0 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- array *** FAILED *** (16 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- map *** FAILED *** (16 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- struct *** FAILED *** (0 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

- SPARK-16344: array of struct with a single field named 'array_element' *** FAILED *** (15 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```

## How was this patch tested?

Manually tested via AppVeyor.

```
ColumnExpressionSuite:
- input_file_name, input_file_block_start, input_file_block_length - FileScanRDD (234 milliseconds)
- input_file_name, input_file_block_start, input_file_block_length - HadoopRDD (235 milliseconds)
- input_file_name, input_file_block_start, input_file_block_length - NewHadoopRDD (203 milliseconds)
```

```
DataStreamReaderWriterSuite:
- source metadataPath (63 milliseconds)
```

```
GlobalTempViewSuite:
 - CREATE GLOBAL TEMP VIEW USING (436 milliseconds)
```

```
CreateTableAsSelectSuite:
- CREATE TABLE USING AS SELECT (171 milliseconds)
- create a table, drop it and create another one with the same name (422 milliseconds)
- create table using as select - with partitioned by (141 milliseconds)
- create table using as select - with non-zero buckets (125 milliseconds)
```

```
HiveMetadataCacheSuite:
- partitioned table is cached when partition pruning is true (3 seconds, 211 milliseconds)
- partitioned table is cached when partition pruning is false (1 second, 781 milliseconds)
```

```
MultiDatabaseSuite:
 - createExternalTable() to non-default database - with USE (797 milliseconds)
 - createExternalTable() to non-default database - without USE (640 milliseconds)
 - invalid database name and table names (62 milliseconds)
```

```
OrcQuerySuite:
 - SPARK-8501: Avoids discovery schema from empty ORC files (703 milliseconds)
 - Verify the ORC conversion parameter: CONVERT_METASTORE_ORC (750 milliseconds)
 - converted ORC table supports resolving mixed case field (625 milliseconds)
```

```
HadoopFsRelationTest - JsonHadoopFsRelationSuite, OrcHadoopFsRelationSuite, ParquetHadoopFsRelationSuite, SimpleTextHadoopFsRelationSuite:
 - Locality support for FileScanRDD (296 milliseconds)
```

```
HiveQuerySuite:
 - CREATE TEMPORARY FUNCTION (125 milliseconds)
 - ADD FILE command (250 milliseconds)
 - ADD JAR command 2 (609 milliseconds)
```

```
PruneFileSourcePartitionsSuite:
- PruneFileSourcePartitions should not change the output of LogicalRelation (359 milliseconds)
```

```
HiveCommandSuite:
 - LOAD DATA LOCAL (1 second, 829 milliseconds)
 - LOAD DATA (1 second, 735 milliseconds)
 - Truncate Table (1 second, 641 milliseconds)
```

```
HiveExternalCatalogBackwardCompatibilitySuite:
 - make sure we can read table created by old version of Spark (32 milliseconds)
 - make sure we can alter table location created by old version of Spark (125 milliseconds)
 - make sure we can rename table created by old version of Spark (281 milliseconds)
```

```
ExternalCatalogSuite:
- create/drop/rename partitions should create/delete/rename the directory (625 milliseconds)
```

```
SQLQuerySuite:
- describe functions - temporary user defined functions (31 milliseconds)
- specifying database name for a temporary table is not allowed (390 milliseconds)
```

```
PartitionProviderCompatibilitySuite:
 - convert partition provider to hive with repair table (813 milliseconds)
 - when partition management is enabled, new tables have partition provider hive (562 milliseconds)
 - when partition management is disabled, new tables have no partition provider (344 milliseconds)
 - when partition management is disabled, we preserve the old behavior even for new tables (422 milliseconds)
 - insert overwrite partition of legacy datasource table (750 milliseconds)
 - SPARK-18544 append with saveAsTable - partition management true (985 milliseconds)
 - SPARK-18635 special chars in partition values - partition management true (3 seconds, 328 milliseconds)
 - SPARK-18635 special chars in partition values - partition management false (2 seconds, 891 milliseconds)
 - SPARK-18659 insert overwrite table with lowercase - partition management true (750 milliseconds)
 - SPARK-18544 append with saveAsTable - partition management false (656 milliseconds)
 - SPARK-18659 insert overwrite table files - partition management false (922 milliseconds)
 - SPARK-18659 insert overwrite table with lowercase - partition management false (469 milliseconds)
 - sanity check table setup (937 milliseconds)
 - insert into partial dynamic partitions (2 seconds, 985 milliseconds)
 - insert into fully dynamic partitions (1 second, 937 milliseconds)
 - insert into static partition (1 second, 578 milliseconds)
 - overwrite partial dynamic partitions (7 seconds, 561 milliseconds)
 - overwrite fully dynamic partitions (1 second, 766 milliseconds)
 - overwrite static partition (1 second, 797 milliseconds)
```

```
MetastoreDataSourcesSuite:
 - check change without refresh (610 milliseconds)
 - drop, change, recreate (437 milliseconds)
 - SPARK-15269 external data source table creation (297 milliseconds)
 - CTAS with IF NOT EXISTS (437 milliseconds)
 - CTAS: persisted partitioned bucketed data source table (422 milliseconds)
 - SPARK-15025: create datasource table with path with select (265 milliseconds)
 - CTAS (438 milliseconds)
 - CTAS with IF NOT EXISTS (469 milliseconds)
 - CTAS: persisted partitioned bucketed data source table (406 milliseconds)
```

```
HiveMetastoreCatalogSuite:
 - Persist non-partitioned parquet relation into metastore as managed table using CTAS (406 milliseconds)
 - Persist non-partitioned orc relation into metastore as managed table using CTAS (313 milliseconds)
```

```
HiveUDFSuite:
 - SPARK-11522 select input_file_name from non-parquet table (3 seconds, 144 milliseconds)
```

```
QueryPartitionSuite:
 - SPARK-13709: reading partitioned Avro table with nested schema (1 second, 67 milliseconds)
```

```
ParquetHiveCompatibilitySuite:
 - simple primitives (745 milliseconds)
 - SPARK-10177 timestamp (375 milliseconds)
 - array (407 milliseconds)
 - map (409 milliseconds)
 - struct (437 milliseconds)
 - SPARK-16344: array of struct with a single field named 'array_element' (391 milliseconds)
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16397 from HyukjinKwon/SPARK-18922-paths.
2016-12-30 11:16:03 +00:00
Kazuaki Ishizaki 93f35569fd [SPARK-16213][SQL] Reduce runtime overhead of a program that creates an primitive array in DataFrame
## What changes were proposed in this pull request?

This PR reduces runtime overhead of a program the creates an primitive array in DataFrame by using the similar approach to #15044. Generated code performs boxing operation in an assignment from InternalRow to an `Object[]` temporary array (at Lines 051 and 061 in the generated code before without this PR). If we know that type of array elements is primitive, we apply the following optimizations:
1. Eliminate a pair of `isNullAt()` and a null assignment
2. Allocate an primitive array instead of `Object[]` (eliminate boxing operations)
3. Create `UnsafeArrayData` by using `UnsafeArrayWriter` to keep a primitive array in a row format instead of doing non-lightweight operations in constructor of `GenericArrayData`
The PR also performs the same things for `CreateMap`.

Here are performance results of [DataFrame programs](6bf54ec5e2/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala (L83-L112)) by up to 17.9x over without this PR.

```
Without SPARK-16043
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Read a primitive array in DataFrame:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                           3805 / 4150          0.0      507308.9       1.0X
Double                                        3593 / 3852          0.0      479056.9       1.1X

With SPARK-16043
Read a primitive array in DataFrame:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            213 /  271          0.0       28387.5       1.0X
Double                                         204 /  223          0.0       27250.9       1.0X
```
Note : #15780 is enabled for these measurements

An motivating example

``` java
val df = sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF
df.selectExpr("Array(value + 1.1d, value + 2.2d)").show
```

Generated code without this PR

``` java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */   private Object[] project_values;
/* 013 */   private UnsafeRow project_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     inputadapter_input = inputs[0];
/* 026 */     serializefromobject_result = new UnsafeRow(1);
/* 027 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 028 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 029 */     this.project_values = null;
/* 030 */     project_result = new UnsafeRow(1);
/* 031 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */     this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 034 */
/* 035 */   }
/* 036 */
/* 037 */   protected void processNext() throws java.io.IOException {
/* 038 */     while (inputadapter_input.hasNext()) {
/* 039 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 041 */
/* 042 */       final boolean project_isNull = false;
/* 043 */       this.project_values = new Object[2];
/* 044 */       boolean project_isNull1 = false;
/* 045 */
/* 046 */       double project_value1 = -1.0;
/* 047 */       project_value1 = inputadapter_value + 1.1D;
/* 048 */       if (false) {
/* 049 */         project_values[0] = null;
/* 050 */       } else {
/* 051 */         project_values[0] = project_value1;
/* 052 */       }
/* 053 */
/* 054 */       boolean project_isNull4 = false;
/* 055 */
/* 056 */       double project_value4 = -1.0;
/* 057 */       project_value4 = inputadapter_value + 2.2D;
/* 058 */       if (false) {
/* 059 */         project_values[1] = null;
/* 060 */       } else {
/* 061 */         project_values[1] = project_value4;
/* 062 */       }
/* 063 */
/* 064 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 065 */       this.project_values = null;
/* 066 */       project_holder.reset();
/* 067 */
/* 068 */       project_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */       if (project_isNull) {
/* 071 */         project_rowWriter.setNullAt(0);
/* 072 */       } else {
/* 073 */         // Remember the current cursor so that we can calculate how many bytes are
/* 074 */         // written later.
/* 075 */         final int project_tmpCursor = project_holder.cursor;
/* 076 */
/* 077 */         if (project_value instanceof UnsafeArrayData) {
/* 078 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 079 */           // grow the global buffer before writing data.
/* 080 */           project_holder.grow(project_sizeInBytes);
/* 081 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 082 */           project_holder.cursor += project_sizeInBytes;
/* 083 */
/* 084 */         } else {
/* 085 */           final int project_numElements = project_value.numElements();
/* 086 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 087 */
/* 088 */           for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 089 */             if (project_value.isNullAt(project_index)) {
/* 090 */               project_arrayWriter.setNullDouble(project_index);
/* 091 */             } else {
/* 092 */               final double project_element = project_value.getDouble(project_index);
/* 093 */               project_arrayWriter.write(project_index, project_element);
/* 094 */             }
/* 095 */           }
/* 096 */         }
/* 097 */
/* 098 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 099 */       }
/* 100 */       project_result.setTotalSize(project_holder.totalSize());
/* 101 */       append(project_result);
/* 102 */       if (shouldStop()) return;
/* 103 */     }
/* 104 */   }
/* 105 */ }
```

Generated code with this PR

``` java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */   private UnsafeArrayData project_arrayData;
/* 013 */   private UnsafeRow project_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     inputadapter_input = inputs[0];
/* 026 */     serializefromobject_result = new UnsafeRow(1);
/* 027 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 028 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 029 */
/* 030 */     project_result = new UnsafeRow(1);
/* 031 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */     this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 034 */
/* 035 */   }
/* 036 */
/* 037 */   protected void processNext() throws java.io.IOException {
/* 038 */     while (inputadapter_input.hasNext()) {
/* 039 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 041 */
/* 042 */       byte[] project_array = new byte[32];
/* 043 */       project_arrayData = new UnsafeArrayData();
/* 044 */       Platform.putLong(project_array, 16, 2);
/* 045 */       project_arrayData.pointTo(project_array, 16, 32);
/* 046 */
/* 047 */       boolean project_isNull1 = false;
/* 048 */
/* 049 */       double project_value1 = -1.0;
/* 050 */       project_value1 = inputadapter_value + 1.1D;
/* 051 */       if (false) {
/* 052 */         project_arrayData.setNullAt(0);
/* 053 */       } else {
/* 054 */         project_arrayData.setDouble(0, project_value1);
/* 055 */       }
/* 056 */
/* 057 */       boolean project_isNull4 = false;
/* 058 */
/* 059 */       double project_value4 = -1.0;
/* 060 */       project_value4 = inputadapter_value + 2.2D;
/* 061 */       if (false) {
/* 062 */         project_arrayData.setNullAt(1);
/* 063 */       } else {
/* 064 */         project_arrayData.setDouble(1, project_value4);
/* 065 */       }
/* 066 */       project_holder.reset();
/* 067 */
/* 068 */       // Remember the current cursor so that we can calculate how many bytes are
/* 069 */       // written later.
/* 070 */       final int project_tmpCursor = project_holder.cursor;
/* 071 */
/* 072 */       if (project_arrayData instanceof UnsafeArrayData) {
/* 073 */         final int project_sizeInBytes = ((UnsafeArrayData) project_arrayData).getSizeInBytes();
/* 074 */         // grow the global buffer before writing data.
/* 075 */         project_holder.grow(project_sizeInBytes);
/* 076 */         ((UnsafeArrayData) project_arrayData).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 077 */         project_holder.cursor += project_sizeInBytes;
/* 078 */
/* 079 */       } else {
/* 080 */         final int project_numElements = project_arrayData.numElements();
/* 081 */         project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 082 */
/* 083 */         for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 084 */           if (project_arrayData.isNullAt(project_index)) {
/* 085 */             project_arrayWriter.setNullDouble(project_index);
/* 086 */           } else {
/* 087 */             final double project_element = project_arrayData.getDouble(project_index);
/* 088 */             project_arrayWriter.write(project_index, project_element);
/* 089 */           }
/* 090 */         }
/* 091 */       }
/* 092 */
/* 093 */       project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 094 */       project_result.setTotalSize(project_holder.totalSize());
/* 095 */       append(project_result);
/* 096 */       if (shouldStop()) return;
/* 097 */     }
/* 098 */   }
/* 099 */ }
```
## How was this patch tested?

Added unit tests into `DataFrameComplexTypeSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #13909 from kiszk/SPARK-16213.
2016-12-29 10:59:37 +08:00
Wenchen Fan 6ddbf467b4 [SPARK-18999][SQL][MINOR] simplify Literal codegen
## What changes were proposed in this pull request?

`Literal` can use `CodegenContex.addReferenceObj` to implement codegen, instead of `CodegenFallback`.  This can also simplify the generated code a little bit, before we will generate: `((Expression) references[1]).eval(null)`, now it's just `references[1]`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16402 from cloud-fan/minor.
2016-12-27 06:22:12 -08:00
Wenchen Fan 8a7db8a608 [SPARK-18980][SQL] implement Aggregator with TypedImperativeAggregate
## What changes were proposed in this pull request?

Currently we implement `Aggregator` with `DeclarativeAggregate`, which will serialize/deserialize the buffer object every time we process an input.

This PR implements `Aggregator` with `TypedImperativeAggregate` and avoids to serialize/deserialize buffer object many times. The benchmark shows we get about 2 times speed up.

For simple buffer object that doesn't need serialization, we still go with `DeclarativeAggregate`, to avoid performance regression.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16383 from cloud-fan/aggregator.
2016-12-26 22:10:20 +08:00
wangzhenhua 3cff816157 [SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and convert it to Statistics in relations
## What changes were proposed in this pull request?

Statistics in LogicalPlan should use attributes to refer to columns rather than column names, because two columns from two relations can have the same column name. But CatalogTable doesn't have the concepts of attribute or broadcast hint in Statistics. Therefore, putting Statistics in CatalogTable is confusing.

We define a different statistic structure in CatalogTable, which is only responsible for interacting with metastore, and is converted to statistics in LogicalPlan when it is used.

## How was this patch tested?

add test cases

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16323 from wzhfy/nameToAttr.
2016-12-24 15:34:44 +08:00
Reynold Xin 2615100055 [SPARK-18973][SQL] Remove SortPartitions and RedistributeData
## What changes were proposed in this pull request?
SortPartitions and RedistributeData logical operators are not actually used and can be removed. Note that we do have a Sort operator (with global flag false) that subsumed SortPartitions.

## How was this patch tested?
Also updated test cases to reflect the removal.

Author: Reynold Xin <rxin@databricks.com>

Closes #16381 from rxin/SPARK-18973.
2016-12-22 19:35:09 +01:00
Tathagata Das 83a6ace0d1 [SPARK-18234][SS] Made update mode public
## What changes were proposed in this pull request?

Made update mode public. As part of that here are the changes.
- Update DatastreamWriter to accept "update"
- Changed package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst
- Added update mode state removing with watermark to StateStoreSaveExec

## How was this patch tested?

Added new tests in changed modules

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16360 from tdas/SPARK-18234.
2016-12-21 16:43:17 -08:00
Ryan Williams afd9bc1d8a [SPARK-17807][CORE] split test-tags into test-JAR
Remove spark-tag's compile-scope dependency (and, indirectly, spark-core's compile-scope transitive-dependency) on scalatest by splitting test-oriented tags into spark-tags' test JAR.

Alternative to #16303.

Author: Ryan Williams <ryan.blake.williams@gmail.com>

Closes #16311 from ryan-williams/tt.
2016-12-21 16:37:20 -08:00
Wenchen Fan f923c849e5 [SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking when append data to an existing table
## What changes were proposed in this pull request?

When we append data to an existing table with `DataFrameWriter.saveAsTable`, we will do various checks to make sure the appended data is consistent with the existing data.

However, we get the information of the existing table by matching the table relation, instead of looking at the table metadata. This is error-prone, e.g. we only check the number of columns for `HadoopFsRelation`, we forget to check bucketing, etc.

This PR refactors the error checking by looking at the metadata of the existing table, and fix several bugs:
* SPARK-18899: We forget to check if the specified bucketing matched the existing table, which may lead to a problematic table that has different bucketing in different data files.
* SPARK-18912: We forget to check the number of columns for non-file-based data source table
* SPARK-18913: We don't support append data to a table with special column names.

## How was this patch tested?
new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16313 from cloud-fan/bug1.
2016-12-19 20:03:33 -08:00
jiangxingbo 70d495dcec [SPARK-18624][SQL] Implicit cast ArrayType(InternalType)
## What changes were proposed in this pull request?

Currently `ImplicitTypeCasts` doesn't handle casts between `ArrayType`s, this is not convenient, we should add a rule to enable casting from `ArrayType(InternalType)` to `ArrayType(newInternalType)`.

Goals:
1. Add a rule to `ImplicitTypeCasts` to enable casting between `ArrayType`s;
2. Simplify `Percentile` and `ApproximatePercentile`.

## How was this patch tested?

Updated test cases in `TypeCoercionSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #16057 from jiangxb1987/implicit-cast-complex-types.
2016-12-19 21:20:47 +01:00
Reynold Xin 172a52f5d3 [SPARK-18892][SQL] Alias percentile_approx approx_percentile
## What changes were proposed in this pull request?
percentile_approx is the name used in Hive, and approx_percentile is the name used in Presto. approx_percentile is actually more consistent with our approx_count_distinct. Given the cost to alias SQL functions is low (one-liner), it'd be better to just alias them so it is easier to use.

## How was this patch tested?
Technically I could add an end-to-end test to verify this one-line change, but it seemed too trivial to me.

Author: Reynold Xin <rxin@databricks.com>

Closes #16300 from rxin/SPARK-18892.
2016-12-15 21:58:27 -08:00
Tathagata Das 4f7292c875 [SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets
## What changes were proposed in this pull request?

Check whether Aggregation operators on a streaming subplan have aggregate expressions with isDistinct = true.

## How was this patch tested?

Added unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16289 from tdas/SPARK-18870.
2016-12-15 11:54:35 -08:00
jiangxingbo 01e14bf303 [SPARK-17910][SQL] Allow users to update the comment of a column
## What changes were proposed in this pull request?

Right now, once a user set the comment of a column with create table command, he/she cannot update the comment. It will be useful to provide a public interface (e.g. SQL) to do that.

This PR implements the following SQL statement:
```
ALTER TABLE table [PARTITION partition_spec]
CHANGE [COLUMN] column_old_name column_new_name column_dataType
[COMMENT column_comment]
[FIRST | AFTER column_name];
```

For further expansion, we could support alter `name`/`dataType`/`index` of a column too.

## How was this patch tested?

Add new test cases in `ExternalCatalogSuite` and `SessionCatalogSuite`.
Add sql file test for `ALTER TABLE CHANGE COLUMN` statement.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15717 from jiangxb1987/change-column.
2016-12-15 10:09:42 -08:00
Reynold Xin 5d510c693a [SPARK-18869][SQL] Add TreeNode.p that returns BaseType
## What changes were proposed in this pull request?
After the bug fix in SPARK-18854, TreeNode.apply now returns TreeNode[_] rather than a more specific type. It would be easier for interactive debugging to introduce a function that returns the BaseType.

## How was this patch tested?
N/A - this is a developer only feature used for interactive debugging. As long as it compiles, it should be good to go. I tested this in spark-shell.

Author: Reynold Xin <rxin@databricks.com>

Closes #16288 from rxin/SPARK-18869.
2016-12-14 21:08:45 -08:00
Reynold Xin ffdd1fcd1e [SPARK-18854][SQL] numberedTreeString and apply(i) inconsistent for subqueries
## What changes were proposed in this pull request?
This is a bug introduced by subquery handling. numberedTreeString (which uses generateTreeString under the hood) numbers trees including innerChildren (used to print subqueries), but apply (which uses getNodeNumbered) ignores innerChildren. As a result, apply(i) would return the wrong plan node if there are subqueries.

This patch fixes the bug.

## How was this patch tested?
Added a test case in SubquerySuite.scala to test both the depth-first traversal of numbering as well as making sure the two methods are consistent.

Author: Reynold Xin <rxin@databricks.com>

Closes #16277 from rxin/SPARK-18854.
2016-12-14 16:12:14 -08:00
Reynold Xin 5d79947369 [SPARK-18853][SQL] Project (UnaryNode) is way too aggressive in estimating statistics
## What changes were proposed in this pull request?
This patch reduces the default number element estimation for arrays and maps from 100 to 1. The issue with the 100 number is that when nested (e.g. an array of map), 100 * 100 would be used as the default size. This sounds like just an overestimation which doesn't seem that bad (since it is usually better to overestimate than underestimate). However, due to the way we assume the size output for Project (new estimated column size / old estimated column size), this overestimation can become underestimation. It is actually in general in this case safer to assume 1 default element.

## How was this patch tested?
This should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #16274 from rxin/SPARK-18853.
2016-12-14 21:22:49 +01:00
Nattavut Sutyanyong cccd64393e [SPARK-18814][SQL] CheckAnalysis rejects TPCDS query 32
## What changes were proposed in this pull request?
Move the checking of GROUP BY column in correlated scalar subquery from CheckAnalysis
to Analysis to fix a regression caused by SPARK-18504.

This problem can be reproduced with a simple script now.

Seq((1,1)).toDF("pk","pv").createOrReplaceTempView("p")
Seq((1,1)).toDF("ck","cv").createOrReplaceTempView("c")
sql("select * from p,c where p.pk=c.ck and c.cv = (select avg(c1.cv) from c c1 where c1.ck = p.pk)").show

The requirements are:
1. We need to reference the same table twice in both the parent and the subquery. Here is the table c.
2. We need to have a correlated predicate but to a different table. Here is from c (as c1) in the subquery to p in the parent.
3. We will then "deduplicate" c1.ck in the subquery to `ck#<n1>#<n2>` at `Project` above `Aggregate` of `avg`. Then when we compare `ck#<n1>#<n2>` and the original group by column `ck#<n1>` by their canonicalized form, which is #<n2> != #<n1>. That's how we trigger the exception added in SPARK-18504.

## How was this patch tested?

SubquerySuite and a simplified version of TPCDS-Q32

Author: Nattavut Sutyanyong <nsy.can@gmail.com>

Closes #16246 from nsyca/18814.
2016-12-14 11:09:31 +01:00
Wenchen Fan 3e307b4959 [SPARK-18566][SQL] remove OverwriteOptions
## What changes were proposed in this pull request?

`OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705, to carry the information of static partitions. However, after further refactor, this information becomes duplicated and we can remove `OverwriteOptions`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15995 from cloud-fan/overwrite.
2016-12-14 11:30:34 +08:00
Marcelo Vanzin 3ae63b808a [SPARK-18752][SQL] Follow-up: add scaladoc explaining isSrcLocal arg.
Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16257 from vanzin/SPARK-18752.2.
2016-12-13 17:55:38 -08:00
jiangxingbo 5572ccf86b [SPARK-17932][SQL][FOLLOWUP] Change statement SHOW TABLES EXTENDED to SHOW TABLE EXTENDED
## What changes were proposed in this pull request?

Change the statement `SHOW TABLES [EXTENDED] [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'] [PARTITION(partition_spec)]` to the following statements:

- SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards']
- SHOW TABLE EXTENDED [(IN|FROM) database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)]

After this change, the statements `SHOW TABLE/SHOW TABLES` have the same syntax with that HIVE has.

## How was this patch tested?
Modified the test sql file `show-tables.sql`;
Modified the test suite `DDLSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #16262 from jiangxb1987/show-table-extended.
2016-12-13 19:04:34 +01:00
Marcelo Vanzin f280ccf449 [SPARK-18835][SQL] Don't expose Guava types in the JavaTypeInference API.
This avoids issues during maven tests because of shading.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16260 from vanzin/SPARK-18835.
2016-12-13 10:02:19 -08:00
Andrew Ray 46d30ac484 [SPARK-18717][SQL] Make code generation for Scala Map work with immutable.Map also
## What changes were proposed in this pull request?

Fixes compile errors in generated code when user has case class with a `scala.collections.immutable.Map` instead of a `scala.collections.Map`. Since ArrayBasedMapData.toScalaMap returns the immutable version we can make it work with both.

## How was this patch tested?

Additional unit tests.

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #16161 from aray/fix-map-codegen.
2016-12-13 15:49:22 +08:00
Marcelo Vanzin 476b34c23a [SPARK-18752][HIVE] isSrcLocal" value should be set from user query.
The value of the "isSrcLocal" parameter passed to Hive's loadTable and
loadPartition methods needs to be set according to the user query (e.g.
"LOAD DATA LOCAL"), and not the current code that tries to guess what
it should be.

For existing versions of Hive the current behavior is probably ok, but
some recent changes in the Hive code changed the semantics slightly,
making code that sets "isSrcLocal" to "true" incorrectly to do the
wrong thing. It would end up moving the parent directory of the files
into the final location, instead of the file themselves, resulting
in a table that cannot be read.

I modified HiveCommandSuite so that existing "LOAD DATA" tests are run
both in local and non-local mode, since the semantics are slightly different.
The tests include a few new checks to make sure the semantics follow
what Hive describes in its documentation.

Tested with existing unit tests and also ran some Hive integration tests
with a version of Hive containing the changes that surfaced the problem.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16179 from vanzin/SPARK-18752.
2016-12-12 14:19:42 -08:00
Wenchen Fan 9abd05b6b9
[SQL][MINOR] simplify a test to fix the maven tests
## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/15620 , all of the Maven-based 2.0 Jenkins jobs time out consistently. As I pointed out in https://github.com/apache/spark/pull/15620#discussion_r91829129 , it seems that the regression test is an overkill and may hit constants pool size limitation, which is a known issue and hasn't been fixed yet.

Since #15620 only fix the code size limitation problem, we can simplify the test to avoid hitting constants pool size limitation.

## How was this patch tested?

test only change

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16244 from cloud-fan/minor.
2016-12-11 09:12:46 +00:00
wangzhenhua a29ee55aaa [SPARK-18815][SQL] Fix NPE when collecting column stats for string/binary column having only null values
## What changes were proposed in this pull request?

During column stats collection, average and max length will be null if a column of string/binary type has only null values. To fix this, I use default size when avg/max length is null.

## How was this patch tested?

Add a test for handling null columns

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #16243 from wzhfy/nullStats.
2016-12-10 21:25:29 -08:00
Huaxin Gao c5172568b5 [SPARK-17460][SQL] Make sure sizeInBytes in Statistics will not overflow
## What changes were proposed in this pull request?

1. In SparkStrategies.canBroadcast, I will add the check   plan.statistics.sizeInBytes >= 0
2. In LocalRelations.statistics, when calculate the statistics, I will change the size to BigInt so it won't overflow.

## How was this patch tested?

I will add a test case to make sure the statistics.sizeInBytes won't overflow.

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #16175 from huaxingao/spark-17460.
2016-12-10 22:41:40 +08:00
Jacek Laskowski b162cc0c28
[MINOR][CORE][SQL][DOCS] Typo fixes
## What changes were proposed in this pull request?

Typo fixes

## How was this patch tested?

Local build. Awaiting the official build.

Author: Jacek Laskowski <jacek@japila.pl>

Closes #16144 from jaceklaskowski/typo-fixes.
2016-12-09 18:45:57 +08:00
Nathan Howell bec0a9217b [SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter
## What changes were proposed in this pull request?

`makeRootConverter` is only called with a `StructType` value. By making this method less general we can remove pattern matches, which are never actually hit outside of the test suite.

## How was this patch tested?

The existing tests.

Author: Nathan Howell <nhowell@godaddy.com>

Closes #16084 from NathanHowell/SPARK-18654.
2016-12-07 16:52:05 -08:00
Andrew Ray f1fca81b16 [SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column is not attribute
## What changes were proposed in this pull request?

Fixes AnalysisException for pivot queries that have group by columns that are expressions and not attributes by substituting the expressions output attribute in the second aggregation and final projection.

## How was this patch tested?

existing and additional unit tests

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #16177 from aray/SPARK-17760.
2016-12-07 04:44:14 -08:00
Herman van Hovell 381ef4ea76 [SPARK-18634][SQL][TRIVIAL] Touch-up Generate
## What changes were proposed in this pull request?
I jumped the gun on merging https://github.com/apache/spark/pull/16120, and missed a tiny potential problem. This PR fixes that by changing a val into a def; this should prevent potential serialization/initialization weirdness from happening.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #16170 from hvanhovell/SPARK-18634.
2016-12-06 05:51:39 -08:00
Michael Allman 772ddbeaa6 [SPARK-18572][SQL] Add a method listPartitionNames to ExternalCatalog
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-18572)

## What changes were proposed in this pull request?

Currently Spark answers the `SHOW PARTITIONS` command by fetching all of the table's partition metadata from the external catalog and constructing partition names therefrom. The Hive client has a `getPartitionNames` method which is many times faster for this purpose, with the performance improvement scaling with the number of partitions in a table.

To test the performance impact of this PR, I ran the `SHOW PARTITIONS` command on two Hive tables with large numbers of partitions. One table has ~17,800 partitions, and the other has ~95,000 partitions. For the purposes of this PR, I'll call the former table `table1` and the latter table `table2`. I ran 5 trials for each table with before-and-after versions of this PR. The results are as follows:

Spark at bdc8153, `SHOW PARTITIONS table1`, times in seconds:
7.901
3.983
4.018
4.331
4.261

Spark at bdc8153, `SHOW PARTITIONS table2`
(Timed out after 10 minutes with a `SocketTimeoutException`.)

Spark at this PR, `SHOW PARTITIONS table1`, times in seconds:
3.801
0.449
0.395
0.348
0.336

Spark at this PR, `SHOW PARTITIONS table2`, times in seconds:
5.184
1.63
1.474
1.519
1.41

Taking the best times from each trial, we get a 12x performance improvement for a table with ~17,800 partitions and at least a 426x improvement for a table with ~95,000 partitions. More significantly, the latter command doesn't even complete with the current code in master.

This is actually a patch we've been using in-house at VideoAmp since Spark 1.1. It's made all the difference in the practical usability of our largest tables. Even with tables with about 1,000 partitions there's a performance improvement of about 2-3x.

## How was this patch tested?

I added a unit test to `VersionsSuite` which tests that the Hive client's `getPartitionNames` method returns the correct number of partitions.

Author: Michael Allman <michael@videoamp.com>

Closes #15998 from mallman/spark-18572-list_partition_names.
2016-12-06 11:33:35 +08:00
Liang-Chi Hsieh 3ba69b6485 [SPARK-18634][PYSPARK][SQL] Corruption and Correctness issues with exploding Python UDFs
## What changes were proposed in this pull request?

As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL.

The following test code can reproduce it. Notice: the following test code is reported to return wrong results in the Jira. However, as I tested on master branch, it causes exception and so can't return any result.

    >>> from pyspark.sql.functions import *
    >>> from pyspark.sql.types import *
    >>>
    >>> df = spark.range(10)
    >>>
    >>> def return_range(value):
    ...   return [(i, str(i)) for i in range(value - 1, value + 1)]
    ...
    >>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
    ...                                                     StructField("string_val", StringType())])))
    >>>
    >>> df.select("id", explode(range_udf(df.id))).show()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
        print(self._jdf.showString(n, 20))
      File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.: java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
        at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)

The cause of this issue is, in `ExtractPythonUDFs` we insert `BatchEvalPythonExec` to run PythonUDFs in batch. `BatchEvalPythonExec` will add extra outputs (e.g., `pythonUDF0`) to original plan. In above case, the original `Range` only has one output `id`. After `ExtractPythonUDFs`, the added `BatchEvalPythonExec` has two outputs `id` and `pythonUDF0`.

Because the output of `GenerateExec` is given after analysis phase, in above case, it is the combination of `id`, i.e., the output of `Range`, and `col`. But in planning phase, we change `GenerateExec`'s child plan to `BatchEvalPythonExec` with additional output attributes.

It will cause no problem in non wholestage codegen. Because when evaluating the additional attributes are projected out the final output of `GenerateExec`.

However, as `GenerateExec` now supports wholestage codegen, the framework will input all the outputs of the child plan to `GenerateExec`. Then when consuming `GenerateExec`'s output data (i.e., calling `consume`), the number of output attributes is different to the output variables in wholestage codegen.

To solve this issue, this patch only gives the generator's output to `GenerateExec` after analysis phase. `GenerateExec`'s output is the combination of its child plan's output and the generator's output. So when we change `GenerateExec`'s child, its output is still correct.

## How was this patch tested?

Added test cases to PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16120 from viirya/fix-py-udf-with-generator.
2016-12-05 17:50:43 -08:00
Wenchen Fan 01a7d33d08 [SPARK-18711][SQL] should disable subexpression elimination for LambdaVariable
## What changes were proposed in this pull request?

This is kind of a long-standing bug, it's hidden until https://github.com/apache/spark/pull/15780 , which may add `AssertNotNull` on top of `LambdaVariable` and thus enables subexpression elimination.

However, subexpression elimination will evaluate the common expressions at the beginning, which is invalid for `LambdaVariable`. `LambdaVariable` usually represents loop variable, which can't be evaluated ahead of the loop.

This PR skips expressions containing `LambdaVariable` when doing subexpression elimination.

## How was this patch tested?

updated test in `DatasetAggregatorSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16143 from cloud-fan/aggregator.
2016-12-05 11:37:13 -08:00