Commit graph

3095 commits

Author SHA1 Message Date
Herman van Hovell 600c0b69ca [SPARK-13713][SQL] Migrate parser from ANTLR3 to ANTLR4
### What changes were proposed in this pull request?
The current ANTLR3 parser is quite complex to maintain and suffers from code blow-ups. This PR introduces a new parser that is based on ANTLR4.

This parser is based on the [Presto's SQL parser](https://github.com/facebook/presto/blob/master/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4). The current implementation can parse and create Catalyst and SQL plans. Large parts of the HiveQl DDL and some of the DML functionality is currently missing, the plan is to add this in follow-up PRs.

This PR is a work in progress, and work needs to be done in the following area's:

- [x] Error handling should be improved.
- [x] Documentation should be improved.
- [x] Multi-Insert needs to be tested.
- [ ] Naming and package locations.

### How was this patch tested?

Catalyst and SQL unit tests.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #11557 from hvanhovell/ngParser.
2016-03-28 12:31:12 -07:00
Liang-Chi Hsieh 1528ff4c9a [SPARK-14156][SQL] Use executedPlan in HiveComparisonTest for the messages of computed tables
## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-14156

In HiveComparisonTest, when catalyst results are different to hive results, we will collect the messages for computed tables during the test. During creating the message, we use sparkPlan. But we actually run the query with executedPlan. So the error message is sometimes confusing.

For example, as wholestage codegen is enabled by default now. The shown spark plan for computed tables is the plan before wholestage codegen.

A concrete is the following error message shown before this patch. It is the error shown when running `HiveCompatibilityTest` `auto_join26`.

auto_join26 has one SQL to create table:

    INSERT OVERWRITE TABLE dest_j1
    SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;   (1)

Then a SQL to retrieve the result:

    select * from dest_j1 x order by x.key;   (2)

When the above SQL (2) to retrieve the result fails, In `HiveComparisonTest` we will try to collect and show the generated data from table `dest_j1` using the SQL (1)'s spark plan. The you will see this error:

    TungstenAggregate(key=[key#8804], functions=[(count(1),mode=Partial,isDistinct=false)], output=[key#8804,count#8834L])
    +- Project [key#8804]
       +- BroadcastHashJoin [key#8804], [key#8806], Inner, BuildRight, None
          :- Filter isnotnull(key#8804)
          :  +- InMemoryColumnarTableScan [key#8804], [isnotnull(key#8804)], InMemoryRelation [key#8804,value#8805], true, 5, StorageLevel(true, true, false, true, 1), HiveTableScan [key#8717,value#8718], MetastoreRelation default, src1, None, Some(src1)
          +- Filter isnotnull(key#8806)
             +- InMemoryColumnarTableScan [key#8806], [isnotnull(key#8806)], InMemoryRelation [key#8806,value#8807], true, 5, StorageLevel(true, true, false, true, 1), HiveTableScan [key#8760,value#8761], MetastoreRelation default, src, None, Some(src)

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:82)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:121)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:121)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:140)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:137)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:120)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:87)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:82)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
	... 70 more
    Caused by: java.lang.UnsupportedOperationException: Filter does not implement doExecuteBroadcast
	at org.apache.spark.sql.execution.SparkPlan.doExecuteBroadcast(SparkPlan.scala:221)

The message is confusing because it is not the plan actually run by SparkSQL engine to create the generated table. The plan actually run is no problem. But as before this patch, we run `e.sparkPlan.collect` to retrieve and show the generated data, spark plan is not the plan we can run. So the above error will be shown.

After this patch, we won't see the error because the executed plan is no problem and works.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11957 from viirya/use-executedplan.
2016-03-28 10:43:54 -07:00
Kazuaki Ishizaki 4a7636f2da [SPARK-13844] [SQL] Generate better code for filters with a non-nullable column
## What changes were proposed in this pull request?

This PR simplifies generated code with a non-nullable column. This PR addresses three items:
1. Generate simplified code for and / or
2. Generate better code for divide and remainder with non-zero dividend
3. Pass nullable information into BoundReference at WholeStageCodegen

I have attached the generated code with and without this PR

## How was this patch tested?

Tested by existing test suites in sql/core

Here is a motivating example
````
(0 to 6).map(i => (i.toString, i.toInt)).toDF("k", "v")
  .filter("v % 2 == 0").filter("v <= 4").filter("v > 1").show()
````

Generated code without this PR
````java
/* 032 */   protected void processNext() throws java.io.IOException {
/* 033 */     /*** PRODUCE: Project [_1#0 AS k#3,_2#1 AS v#4] */
/* 034 */
/* 035 */     /*** PRODUCE: Filter ((isnotnull((_2#1 % 2)) && ((_2#1 % 2) = 0)) && ((_2#1 <= 4) && (_2#1 > 1))) */
/* 036 */
/* 037 */     /*** PRODUCE: INPUT */
/* 038 */
/* 039 */     while (!shouldStop() && inputadapter_input.hasNext()) {
/* 040 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 041 */       /*** CONSUME: Filter ((isnotnull((_2#1 % 2)) && ((_2#1 % 2) = 0)) && ((_2#1 <= 4) && (_2#1 > 1))) */
/* 042 */       /* input[1, int] */
/* 043 */       int filter_value1 = inputadapter_row.getInt(1);
/* 044 */
/* 045 */       /* isnotnull((input[1, int] % 2)) */
/* 046 */       /* (input[1, int] % 2) */
/* 047 */       boolean filter_isNull3 = false;
/* 048 */       int filter_value3 = -1;
/* 049 */       if (false || 2 == 0) {
/* 050 */         filter_isNull3 = true;
/* 051 */       } else {
/* 052 */         if (false) {
/* 053 */           filter_isNull3 = true;
/* 054 */         } else {
/* 055 */           filter_value3 = (int)(filter_value1 % 2);
/* 056 */         }
/* 057 */       }
/* 058 */       if (!(!(filter_isNull3))) continue;
/* 059 */
/* 060 */       /* ((input[1, int] % 2) = 0) */
/* 061 */       boolean filter_isNull6 = true;
/* 062 */       boolean filter_value6 = false;
/* 063 */       /* (input[1, int] % 2) */
/* 064 */       boolean filter_isNull7 = false;
/* 065 */       int filter_value7 = -1;
/* 066 */       if (false || 2 == 0) {
/* 067 */         filter_isNull7 = true;
/* 068 */       } else {
/* 069 */         if (false) {
/* 070 */           filter_isNull7 = true;
/* 071 */         } else {
/* 072 */           filter_value7 = (int)(filter_value1 % 2);
/* 073 */         }
/* 074 */       }
/* 075 */       if (!filter_isNull7) {
/* 076 */         filter_isNull6 = false; // resultCode could change nullability.
/* 077 */         filter_value6 = filter_value7 == 0;
/* 078 */
/* 079 */       }
/* 080 */       if (filter_isNull6 || !filter_value6) continue;
/* 081 */
/* 082 */       /* (input[1, int] <= 4) */
/* 083 */       boolean filter_value11 = false;
/* 084 */       filter_value11 = filter_value1 <= 4;
/* 085 */       if (!filter_value11) continue;
/* 086 */
/* 087 */       /* (input[1, int] > 1) */
/* 088 */       boolean filter_value14 = false;
/* 089 */       filter_value14 = filter_value1 > 1;
/* 090 */       if (!filter_value14) continue;
/* 091 */
/* 092 */       filter_metricValue.add(1);
/* 093 */
/* 094 */       /*** CONSUME: Project [_1#0 AS k#3,_2#1 AS v#4] */
/* 095 */
/* 096 */       /* input[0, string] */
/* 097 */       /* input[0, string] */
/* 098 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 099 */       UTF8String filter_value = filter_isNull ? null : (inputadapter_row.getUTF8String(0));
/* 100 */       project_holder.reset();
/* 101 */
/* 102 */       project_rowWriter.zeroOutNullBytes();
/* 103 */
/* 104 */       if (filter_isNull) {
/* 105 */         project_rowWriter.setNullAt(0);
/* 106 */       } else {
/* 107 */         project_rowWriter.write(0, filter_value);
/* 108 */       }
/* 109 */
/* 110 */       project_rowWriter.write(1, filter_value1);
/* 111 */       project_result.setTotalSize(project_holder.totalSize());
/* 112 */       append(project_result.copy());
/* 113 */     }
/* 114 */   }
/* 115 */ }
````

Generated code with this PR
````java
/* 032 */   protected void processNext() throws java.io.IOException {
/* 033 */     /*** PRODUCE: Project [_1#0 AS k#3,_2#1 AS v#4] */
/* 034 */
/* 035 */     /*** PRODUCE: Filter (((_2#1 % 2) = 0) && ((_2#1 <= 5) && (_2#1 > 1))) */
/* 036 */
/* 037 */     /*** PRODUCE: INPUT */
/* 038 */
/* 039 */     while (!shouldStop() && inputadapter_input.hasNext()) {
/* 040 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 041 */       /*** CONSUME: Filter (((_2#1 % 2) = 0) && ((_2#1 <= 5) && (_2#1 > 1))) */
/* 042 */       /* input[1, int] */
/* 043 */       int filter_value1 = inputadapter_row.getInt(1);
/* 044 */
/* 045 */       /* ((input[1, int] % 2) = 0) */
/* 046 */       /* (input[1, int] % 2) */
/* 047 */       int filter_value3 = (int)(filter_value1 % 2);
/* 048 */
/* 049 */       boolean filter_value2 = false;
/* 050 */       filter_value2 = filter_value3 == 0;
/* 051 */       if (!filter_value2) continue;
/* 052 */
/* 053 */       /* (input[1, int] <= 5) */
/* 054 */       boolean filter_value7 = false;
/* 055 */       filter_value7 = filter_value1 <= 5;
/* 056 */       if (!filter_value7) continue;
/* 057 */
/* 058 */       /* (input[1, int] > 1) */
/* 059 */       boolean filter_value10 = false;
/* 060 */       filter_value10 = filter_value1 > 1;
/* 061 */       if (!filter_value10) continue;
/* 062 */
/* 063 */       filter_metricValue.add(1);
/* 064 */
/* 065 */       /*** CONSUME: Project [_1#0 AS k#3,_2#1 AS v#4] */
/* 066 */
/* 067 */       /* input[0, string] */
/* 068 */       /* input[0, string] */
/* 069 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 070 */       UTF8String filter_value = filter_isNull ? null : (inputadapter_row.getUTF8String(0));
/* 071 */       project_holder.reset();
/* 072 */
/* 073 */       project_rowWriter.zeroOutNullBytes();
/* 074 */
/* 075 */       if (filter_isNull) {
/* 076 */         project_rowWriter.setNullAt(0);
/* 077 */       } else {
/* 078 */         project_rowWriter.write(0, filter_value);
/* 079 */       }
/* 080 */
/* 081 */       project_rowWriter.write(1, filter_value1);
/* 082 */       project_result.setTotalSize(project_holder.totalSize());
/* 083 */       append(project_result.copy());
/* 084 */     }
/* 085 */   }
/* 086 */ }
````

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #11684 from kiszk/SPARK-13844.
2016-03-28 10:35:48 -07:00
Kousuke Saruta aac13fb48c [SPARK-14185][SQL][MINOR] Make indentation of debug log for generated code proper
## What changes were proposed in this pull request?

The indentation of debug log output by `CodeGenerator` is weird.
The first line of the generated code should be put on the next line of the first line of the log message.

```
16/03/28 11:10:24 DEBUG CodeGenerator: /* 001 */
/* 002 */ public java.lang.Object generate(Object[] references) {
/* 003 */   return new SpecificSafeProjection(references);
...
```

After this patch is applied, we get debug log like as follows.

```
16/03/28 10:45:50 DEBUG CodeGenerator:
/* 001 */
/* 002 */ public java.lang.Object generate(Object[] references) {
/* 003 */   return new SpecificSafeProjection(references);
...
```
## How was this patch tested?

Ran some jobs and checked debug logs.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #11990 from sarutak/fix-debuglog-indentation.
2016-03-27 23:50:23 -07:00
Dongjoon Hyun cfcca732b4 [MINOR][SQL] Fix substr/substring testcases.
## What changes were proposed in this pull request?

This PR fixes the following two testcases in order to test the correct usages.
```
checkSqlGeneration("SELECT substr('This is a test', 'is')")
checkSqlGeneration("SELECT substring('This is a test', 'is')")
```

Actually, the testcases works but tests on exceptional cases.
```
scala> sql("SELECT substr('This is a test', 'is')")
res0: org.apache.spark.sql.DataFrame = [substring(This is a test, CAST(is AS INT), 2147483647): string]

scala> sql("SELECT substr('This is a test', 'is')").collect()
res1: Array[org.apache.spark.sql.Row] = Array([null])
```

## How was this patch tested?

Pass the modified unit tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11963 from dongjoon-hyun/fix_substr_testcase.
2016-03-27 20:06:02 +01:00
gatorsmile a01b6a92b5 [SPARK-14177][SQL] Native Parsing for DDL Command "Describe Database" and "Alter Database"
#### What changes were proposed in this pull request?

This PR is to provide native parsing support for two DDL commands:  ```Describe Database``` and ```Alter Database Set Properties```

Based on the Hive DDL document:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

##### 1. ALTER DATABASE
**Syntax:**
```SQL
ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
```
 - `ALTER DATABASE` is to add new (key, value) pairs into `DBPROPERTIES`

##### 2. DESCRIBE DATABASE
**Syntax:**
```SQL
DESCRIBE DATABASE [EXTENDED] db_name
```
 - `DESCRIBE DATABASE` shows the name of the database, its comment (if one has been set), and its root location on the filesystem. When `extended` is true, it also shows the database's properties

#### How was this patch tested?
Added the related test cases to `DDLCommandSuite`

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

This patch had conflicts when merged, resolved by
Committer: Yin Huai <yhuai@databricks.com>

Closes #11977 from gatorsmile/parseAlterDatabase.
2016-03-26 20:12:30 -07:00
Liang-Chi Hsieh bc925b73a6 [SPARK-14157][SQL] Parse Drop Function DDL command
## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-14157

We only parse create function command. In order to support native drop function command, we need to parse it too.

From Hive [manual](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/ReloadFunction), the drop function command has syntax as:

DROP [TEMPORARY] FUNCTION [IF EXISTS] function_name;

## How was this patch tested?

Added test into `DDLCommandSuite`.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11959 from viirya/parse-drop-func.
2016-03-26 20:09:01 -07:00
Cheng Lian b547de8a60 [SPARK-14116][SQL] Implements buildReader() for ORC data source
## What changes were proposed in this pull request?

This PR implements `FileFormat.buildReader()` for our ORC data source. It also fixed several minor styling issues related to `HadoopFsRelation` planning code path.

Note that `OrcNewInputFormat` doesn't rely on `OrcNewSplit` for creating `OrcRecordReader`s, plain `FileSplit` is just fine. That's why we can simply create the record reader with the help of `OrcNewInputFormat` and `FileSplit`.

## How was this patch tested?

Existing test cases should do the work

Author: Cheng Lian <lian@databricks.com>

Closes #11936 from liancheng/spark-14116-build-reader-for-orc.
2016-03-26 16:10:35 -07:00
gatorsmile 8989d3a396 [SPARK-14161][SQL] Native Parsing for DDL Command Drop Database
### What changes were proposed in this pull request?
Based on the Hive DDL document https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

The syntax of DDL command for Drop Database is
```SQL
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE];
```
 - If `IF EXISTS` is not specified, the default behavior is to issue a warning message if `database_name` does't exist
 - `RESTRICT` is the default behavior.

This PR is to provide a native parsing support for `DROP DATABASE`.

#### How was this patch tested?

Added a test case `DDLCommandSuite`

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11962 from gatorsmile/parseDropDatabase.
2016-03-26 14:11:13 -07:00
Davies Liu bd94ea4c80 [SPARK-14175][SQL] whole stage codegen interface refactor
## What changes were proposed in this pull request?

1. merge consumeChild into consume()
2. always generate code for input variables and UnsafeRow, a plan can use eight of them.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #11975 from davies/gen_refactor.
2016-03-26 11:03:05 -07:00
Dongjoon Hyun 1808465855 [MINOR] Fix newly added java-lint errors
## What changes were proposed in this pull request?

This PR fixes some newly added java-lint errors(unused-imports, line-lengsth).

## How was this patch tested?

Pass the Jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11968 from dongjoon-hyun/SPARK-14167.
2016-03-26 11:55:49 +00:00
Tathagata Das 13945dd83b [SPARK-14109][SQL] Fix HDFSMetadataLog to fallback from FileContext to FileSystem API
## What changes were proposed in this pull request?

HDFSMetadataLog uses newer FileContext API to achieve atomic renaming. However, FileContext implementations may not exist for many scheme for which there may be FileSystem implementations. In those cases, rather than failing completely, we should fallback to the FileSystem based implementation, and log warning that there may be file consistency issues in case the log directory is concurrently modified.

In addition I have also added more tests to increase the code coverage.

## How was this patch tested?

Unit test.
Tested on cluster with custom file system.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #11925 from tdas/SPARK-14109.
2016-03-25 20:07:54 -07:00
Shixiong Zhu 24587ce433 [SPARK-14073][STREAMING][TEST-MAVEN] Move flume back to Spark
## What changes were proposed in this pull request?

This PR moves flume back to Spark as per the discussion in the dev mail-list.

## How was this patch tested?

Existing Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11895 from zsxwing/move-flume-back.
2016-03-25 17:37:16 -07:00
Shixiong Zhu b554b3c46b [SPARK-14131][SQL] Add a workaround for HADOOP-10622 to fix DataFrameReaderWriterSuite
## What changes were proposed in this pull request?

There is a potential dead-lock in Hadoop Shell.runCommand before 2.5.0 ([HADOOP-10622](https://issues.apache.org/jira/browse/HADOOP-10622)). If we interrupt some thread running Shell.runCommand, we may hit this issue.

This PR adds some protecion to prevent from interrupting the microBatchThread when we may run into Shell.runCommand. There are two places will call Shell.runCommand now:

- offsetLog.add
- FileStreamSource.getOffset

They will create a file using HDFS API and call Shell.runCommand to set the file permission.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11940 from zsxwing/workaround-for-HADOOP-10622.
2016-03-25 13:28:26 -07:00
Sameer Agarwal afd0debe07 [SPARK-14137] [SPARK-14150] [SQL] Infer IsNotNull constraints from non-nullable attributes
## What changes were proposed in this pull request?

This PR adds support for automatically inferring `IsNotNull` constraints from any non-nullable attributes that are part of an operator's output. This also fixes the issue that causes the optimizer to hit the maximum number of iterations for certain queries in https://github.com/apache/spark/pull/11828.

## How was this patch tested?

Unit test in `ConstraintPropagationSuite`

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11953 from sameeragarwal/infer-isnotnull.
2016-03-25 12:57:26 -07:00
Liang-Chi Hsieh ca003354da [SPARK-12443][SQL] encoderFor should support Decimal
## What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-12443

`constructorFor` will call `dataTypeFor` to determine if a type is `ObjectType` or not. If there is not case for `Decimal`, it will be recognized as `ObjectType` and causes the bug.

## How was this patch tested?

Test is added into `ExpressionEncoderSuite`.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #10399 from viirya/fix-encoder-decimal.
2016-03-25 12:07:56 -07:00
Tathagata Das 11fa8741ca [SQL][HOTFIX] Fix flakiness in StateStoreRDDSuite
## What changes were proposed in this pull request?
StateStoreCoordinator.reportActiveInstance is async, so subsequence state checks must be in eventually.
## How was this patch tested?
Jenkins tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #11924 from tdas/state-store-flaky-fix.
2016-03-25 12:04:47 -07:00
Sameer Agarwal b5f8c36e3c [SPARK-14144][SQL] Explicitly identify/catch UnsupportedOperationException during parquet reader initialization
## What changes were proposed in this pull request?

This PR is a minor cleanup task as part of https://issues.apache.org/jira/browse/SPARK-14008 to explicitly identify/catch the `UnsupportedOperationException` while initializing the vectorized parquet reader. Other exceptions will simply be thrown back to `SqlNewHadoopPartition`.

## How was this patch tested?

N/A (cleanup only; no new functionality added)

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11950 from sameeragarwal/parquet-cleanup.
2016-03-25 11:48:05 -07:00
Wenchen Fan 43b15e01c4 [SPARK-14061][SQL] implement CreateMap
## What changes were proposed in this pull request?

As we have `CreateArray` and `CreateStruct`, we should also have `CreateMap`.  This PR adds the `CreateMap` expression, and the DataFrame API, and python API.

## How was this patch tested?

various new tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11879 from cloud-fan/create_map.
2016-03-25 09:50:06 -07:00
Davies Liu 6603d9f7e2 [SPARK-13919] [SQL] fix column pruning through filter
## What changes were proposed in this pull request?

This PR fix the conflict between ColumnPruning and PushPredicatesThroughProject, because ColumnPruning will try to insert a Project before Filter, but PushPredicatesThroughProject will move the Filter before Project.This is fixed by remove the Project before Filter, if the Project only do column pruning.

The RuleExecutor will fail the test if reached max iterations.

Closes #11745

## How was this patch tested?

Existing tests.

This is a test case still failing, disabled for now, will be fixed by https://issues.apache.org/jira/browse/SPARK-14137

Author: Davies Liu <davies@databricks.com>

Closes #11828 from davies/fail_rule.
2016-03-25 09:05:23 -07:00
Wenchen Fan e9b6e7d857 [SPARK-13456][SQL][FOLLOW-UP] lazily generate the outer pointer for case class defined in REPL
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/11410, we missed a corner case: define the inner class and use it in `Dataset` at the same time by using paste mode. For this case, the inner class and the `Dataset` are inside same line object, when we build the `Dataset`, we try to get outer pointer from line object, and it will fail because the line object is not initialized yet.

https://issues.apache.org/jira/browse/SPARK-13456?focusedCommentId=15209174&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15209174 is an example for this corner case.

This PR make the process of getting outer pointer from line object lazy, so that we can successfully build the `Dataset` and finish initializing the line object.

## How was this patch tested?

new test in repl suite.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11931 from cloud-fan/repl.
2016-03-25 20:19:04 +08:00
Reynold Xin 70a6f0bb57 [SPARK-14149] Log exceptions in tryOrIOException
## What changes were proposed in this pull request?
We ran into a problem today debugging some class loading problem during deserialization, and JVM was masking the underlying exception which made it very difficult to debug. We can however log the exceptions using try/catch ourselves in serialization/deserialization. The good thing is that all these methods are already using Utils.tryOrIOException, so we can just put the try catch and logging in a single place.

## How was this patch tested?
A logging change with a manual test.

Author: Reynold Xin <rxin@databricks.com>

Closes #11951 from rxin/SPARK-14149.
2016-03-25 01:17:23 -07:00
Andrew Or 20ddf5fddf [SPARK-14014][SQL] Integrate session catalog (attempt #2)
## What changes were proposed in this pull request?

This reopens #11836, which was merged but promptly reverted because it introduced flaky Hive tests.

## How was this patch tested?

See `CatalogTestCases`, `SessionCatalogSuite` and `HiveContextSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #11938 from andrewor14/session-catalog-again.
2016-03-24 22:59:35 -07:00
Reynold Xin 1c70b7650f [SPARK-14145][SQL] Remove the untyped version of Dataset.groupByKey
## What changes were proposed in this pull request?
Dataset has two variants of groupByKey, one for untyped and the other for typed. It actually doesn't make as much sense to have an untyped API here, since apps that want to use untyped APIs should just use the groupBy "DataFrame" API.

## How was this patch tested?
This patch removes a method, and removes the associated tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #11949 from rxin/SPARK-14145.
2016-03-24 22:56:34 -07:00
Reynold Xin 3619fec1ec [SPARK-14142][SQL] Replace internal use of unionAll with union
## What changes were proposed in this pull request?
unionAll has been deprecated in SPARK-14088.

## How was this patch tested?
Should be covered by all existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #11946 from rxin/SPARK-14142.
2016-03-24 22:34:55 -07:00
gatorsmile 05f652d6c2 [SPARK-13957][SQL] Support Group By Ordinal in SQL
#### What changes were proposed in this pull request?
This PR is to support group by position in SQL. For example, when users input the following query
```SQL
select c1 as a, c2, c3, sum(*) from tbl group by 1, 3, c4
```
The ordinals are recognized as the positions in the select list. Thus, `Analyzer` converts it to
```SQL
select c1, c2, c3, sum(*) from tbl group by c1, c3, c4
```

This is controlled by the config option `spark.sql.groupByOrdinal`.
- When true, the ordinal numbers in group by clauses are treated as the position in the select list.
- When false, the ordinal numbers are ignored.
- Only convert integer literals (not foldable expressions). If found foldable expressions, ignore them.
- When the positions specified in the group by clauses correspond to the aggregate functions in select list, output an exception message.
- star is not allowed to use in the select list when users specify ordinals in group by

Note: This PR is taken from https://github.com/apache/spark/pull/10731. When merging this PR, please give the credit to zhichao-li

Also cc all the people who are involved in the previous discussion:  rxin cloud-fan marmbrus yhuai hvanhovell adrian-wang chenghao-intel tejasapatil

#### How was this patch tested?

Added a few test cases for both positive and negative test cases.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #11846 from gatorsmile/groupByOrdinal.
2016-03-25 12:55:58 +08:00
Andrew Or c44d140cae Revert "[SPARK-14014][SQL] Replace existing catalog with SessionCatalog"
This reverts commit 5dfc01976b.
2016-03-23 22:21:15 -07:00
gatorsmile f42eaf42bd [SPARK-14085][SQL] Star Expansion for Hash
#### What changes were proposed in this pull request?

This PR is to support star expansion in hash. For example,
```SQL
val structDf = testData2.select("a", "b").as("record")
structDf.select(hash($"*")
```

In addition, it refactors the codes for the rule `ResolveStar` and fixes a regression for star expansion in group by when using SQL API. For example,
```SQL
SELECT * FROM testData2 group by a, b
```

cc cloud-fan Now, the code for star resolution is much cleaner. The coverage is better. Could you check if this refactoring is good? Thanks!

#### How was this patch tested?
Added a few test cases to cover it.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11904 from gatorsmile/starResolution.
2016-03-24 11:13:36 +08:00
Andrew Or 5dfc01976b [SPARK-14014][SQL] Replace existing catalog with SessionCatalog
## What changes were proposed in this pull request?

`SessionCatalog`, introduced in #11750, is a catalog that keeps track of temporary functions and tables, and delegates metastore operations to `ExternalCatalog`. This functionality overlaps a lot with the existing `analysis.Catalog`.

As of this commit, `SessionCatalog` and `ExternalCatalog` will no longer be dead code. There are still things that need to be done after this patch, namely:
- SPARK-14013: Properly implement temporary functions in `SessionCatalog`
- SPARK-13879: Decide which DDL/DML commands to support natively in Spark
- SPARK-?????: Implement the ones we do want to support through `SessionCatalog`.
- SPARK-?????: Merge SQL/HiveContext

## How was this patch tested?

This is largely a refactoring task so there are no new tests introduced. The particularly relevant tests are `SessionCatalogSuite` and `ExternalCatalogSuite`.

Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #11836 from andrewor14/use-session-catalog.
2016-03-23 13:34:22 -07:00
Michael Armbrust 6bc4be64f8 [SPARK-14078] Streaming Parquet Based FileSink
This PR adds a new `Sink` implementation that writes out Parquet files.  In order to correctly handle partial failures while maintaining exactly once semantics, the files for each batch are written out to a unique directory and then atomically appended to a metadata log.  When a parquet based `DataSource` is initialized for reading, we first check for this log directory and use it instead of file listing when present.

Unit tests are added, as well as a stress test that checks the answer after non-deterministic injected failures.

Author: Michael Armbrust <michael@databricks.com>

Closes #11897 from marmbrus/fileSink.
2016-03-23 13:03:25 -07:00
Herman van Hovell 919bf32198 [SPARK-13325][SQL] Create a 64-bit hashcode expression
This PR introduces a 64-bit hashcode expression. Such an expression is especially usefull for HyperLogLog++ and other probabilistic datastructures.

I have implemented xxHash64 which is a 64-bit hashing algorithm created by Yann Colet and Mathias Westerdahl. This is a high speed (C implementation runs at memory bandwidth) and high quality hashcode. It exploits both Instruction Level Parralellism (for speed) and the multiplication and rotation techniques (for quality) like MurMurHash does.

The initial results are promising. I have added a CG'ed test to the `HashBenchmark`, and this results in the following results (running from SBT):

    Running benchmark: Hash For simple
      Running case: interpreted version
      Running case: codegen version
      Running case: codegen version 64-bit

    Intel(R) Core(TM) i7-4750HQ CPU  2.00GHz
    Hash For simple:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    interpreted version                      1011 / 1016        132.8           7.5       1.0X
    codegen version                          1864 / 1869         72.0          13.9       0.5X
    codegen version 64-bit                   1614 / 1644         83.2          12.0       0.6X

    Running benchmark: Hash For normal
      Running case: interpreted version
      Running case: codegen version
      Running case: codegen version 64-bit

    Intel(R) Core(TM) i7-4750HQ CPU  2.00GHz
    Hash For normal:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    interpreted version                      2467 / 2475          0.9        1176.1       1.0X
    codegen version                          2008 / 2115          1.0         957.5       1.2X
    codegen version 64-bit                    728 /  758          2.9         347.0       3.4X

    Running benchmark: Hash For array
      Running case: interpreted version
      Running case: codegen version
      Running case: codegen version 64-bit

    Intel(R) Core(TM) i7-4750HQ CPU  2.00GHz
    Hash For array:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    interpreted version                      1544 / 1707          0.1       11779.6       1.0X
    codegen version                          2728 / 2745          0.0       20815.5       0.6X
    codegen version 64-bit                   2508 / 2549          0.1       19132.8       0.6X

    Running benchmark: Hash For map
      Running case: interpreted version
      Running case: codegen version
      Running case: codegen version 64-bit

    Intel(R) Core(TM) i7-4750HQ CPU  2.00GHz
    Hash For map:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    interpreted version                      1819 / 1826          0.0      444014.3       1.0X
    codegen version                           183 /  194          0.0       44642.9       9.9X
    codegen version 64-bit                    173 /  174          0.0       42120.9      10.5X

This shows that algorithm is consistently faster than MurMurHash32 in all cases and up to 3x (!) in the normal case.

I have also added this to HyperLogLog++ and it cuts the processing time of the following code in half:

    val df = sqlContext.range(1<<25).agg(approxCountDistinct("id"))
    df.explain()
    val t = System.nanoTime()
    df.show()
    val ns = System.nanoTime() - t

    // Before
    ns: Long = 5821524302

    // After
    ns: Long = 2836418963

cc cloud-fan (you have been working on hashcodes) / rxin

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #11209 from hvanhovell/xxHash.
2016-03-23 20:51:01 +01:00
Tathagata Das 8c826880f5 [SPARK-13809][SQL] State store for streaming aggregations
## What changes were proposed in this pull request?

In this PR, I am implementing a new abstraction for management of streaming state data - State Store. It is a key-value store for persisting running aggregates for aggregate operations in streaming dataframes. The motivation and design is discussed here.

https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#

## How was this patch tested?
- [x] Unit tests
- [x] Cluster tests

**Coverage from unit tests**

<img width="952" alt="screen shot 2016-03-21 at 3 09 40 pm" src="https://cloud.githubusercontent.com/assets/663212/13935872/fdc8ba86-ef76-11e5-93e8-9fa310472c7b.png">

## TODO
- [x] Fix updates() iterator to avoid duplicate updates for same key
- [x] Use Coordinator in ContinuousQueryManager
- [x] Plugging in hadoop conf and other confs
- [x] Unit tests
  - [x] StateStore object lifecycle and methods
  - [x] StateStoreCoordinator communication and logic
  - [x] StateStoreRDD fault-tolerance
  - [x] StateStoreRDD preferred location using StateStoreCoordinator
- [ ] Cluster tests
  - [ ] Whether preferred locations are set correctly
  - [ ] Whether recovery works correctly with distributed storage
  - [x] Basic performance tests
- [x] Docs

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #11645 from tdas/state-store.
2016-03-23 12:48:05 -07:00
Sameer Agarwal 0a64294fcb [SPARK-14015][SQL] Support TimestampType in vectorized parquet reader
## What changes were proposed in this pull request?

This PR adds support for TimestampType in the vectorized parquet reader

## How was this patch tested?

1. `VectorizedColumnReader` initially had a gating condition on `primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96)` that made us fall back on parquet-mr for handling timestamps. This condition is now removed.
2. The `ParquetHadoopFsRelationSuite` (that tests for all supported hive types -- including `TimestampType`) fails when the gating condition is removed (https://github.com/apache/spark/pull/11808) and should now pass with this change. Similarly, the `ParquetHiveCompatibilitySuite.SPARK-10177 timestamp` test that fails when the gating condition is removed, should now pass as well.
3.  Added tests in `HadoopFsRelationTest` that test both the dictionary encoded and non-encoded versions across all supported datatypes.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11882 from sameeragarwal/timestamp-parquet.
2016-03-23 12:13:32 -07:00
Davies Liu 02d9c352c7 [SPARK-14092] [SQL] move shouldStop() to end of while loop
## What changes were proposed in this pull request?

This PR rollback some changes in #11274 , which introduced some performance regression when do a simple aggregation on parquet scan with one integer column.

Does not really understand how this change introduce this huge impact, maybe related show JIT compiler inline functions. (saw very different stats from profiling).

## How was this patch tested?

Manually run the parquet reader benchmark, before this change:
```
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
Int and String Scan:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized                   2391 / 3107         43.9          22.8       1.0X
```
After this change
```
Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
Int and String Scan:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized                   2032 / 2626         51.6          19.4       1.0X```

Author: Davies Liu <davies@databricks.com>

Closes #11912 from davies/fix_regression.
2016-03-23 11:58:43 -07:00
Josh Rosen 3de24ae2ed [SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.

- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager `is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests.
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
2016-03-23 10:15:23 -07:00
gatorsmile 6ce008ba46 [SPARK-13549][SQL] Refactor the Optimizer Rule CollapseProject
#### What changes were proposed in this pull request?

The PR https://github.com/apache/spark/pull/10541 changed the rule `CollapseProject` by enabling collapsing `Project` into `Aggregate`. It leaves a to-do item to remove the duplicate code. This PR is to finish this to-do item. Also added a test case for covering this change.

#### How was this patch tested?

Added a new test case.

liancheng Could you check if the code refactoring is fine? Thanks!

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11427 from gatorsmile/collapseProjectRefactor.
2016-03-24 00:51:31 +08:00
Cheng Lian cde086cb2a [SPARK-13817][SQL][MINOR] Renames Dataset.newDataFrame to Dataset.ofRows
## What changes were proposed in this pull request?

This PR does the renaming as suggested by marmbrus in [this comment][1].

## How was this patch tested?

Existing tests.

[1]: 6d37e1eb90 (commitcomment-16654694)

Author: Cheng Lian <lian@databricks.com>

Closes #11889 from liancheng/spark-13817-follow-up.
2016-03-24 00:42:13 +08:00
Shixiong Zhu abacf5f258 [HOTFIX][SQL] Don't stop ContinuousQuery in quietly
## What changes were proposed in this pull request?

Try to fix a flaky hang

## How was this patch tested?

Existing Jenkins test

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11909 from zsxwing/hotfix2.
2016-03-23 00:00:35 -07:00
Reynold Xin 926a93e54b [SPARK-14088][SQL] Some Dataset API touch-up
## What changes were proposed in this pull request?
1. Deprecated unionAll. It is pretty confusing to have both "union" and "unionAll" when the two do the same thing in Spark but are different in SQL.
2. Rename reduce in KeyValueGroupedDataset to reduceGroups so it is more consistent with rest of the functions in KeyValueGroupedDataset. Also makes it more obvious what "reduce" and "reduceGroups" mean. Previously it was confusing because it could be reducing a Dataset, or just reducing groups.
3. Added a "name" function, which is more natural to name columns than "as" for non-SQL users.
4. Remove "subtract" function since it is just an alias for "except".

## How was this patch tested?
All changes should be covered by existing tests. Also added couple test cases to cover "name".

Author: Reynold Xin <rxin@databricks.com>

Closes #11908 from rxin/SPARK-14088.
2016-03-22 23:43:09 -07:00
Dongjoon Hyun 1a22cf1e9b [MINOR][SQL][DOCS] Update sql/README.md and remove some unused imports in sql module.
## What changes were proposed in this pull request?

This PR updates `sql/README.md` according to the latest console output and removes some unused imports in `sql` module. This is done by manually, so there is no guarantee to remove all unused imports.

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11907 from dongjoon-hyun/update_sql_module.
2016-03-22 23:07:49 -07:00
Yong Tang 75dc29620e [SPARK-13401][SQL][TESTS] Fix SQL test warnings.
## What changes were proposed in this pull request?

This fix tries to fix several SQL test warnings under the sql/core/src/test directory. The fixed warnings includes "[unchecked]", "[rawtypes]", and "[varargs]".

## How was this patch tested?

All existing tests passed.

Author: Yong Tang <yong.tang.github@outlook.com>

Closes #11857 from yongtang/SPARK-13401.
2016-03-22 21:08:11 -07:00
Davies Liu 4700adb98e [SPARK-13806] [SQL] fix rounding mode of negative float/double
## What changes were proposed in this pull request?

Round() in database usually round the number up (away from zero), it's different than Math.round() in Java.

For example:
```
scala> java.lang.Math.round(-3.5)
res3: Long = -3
```
In Database, we should return -4.0 in this cases.

This PR remove the buggy special case for scale=0.

## How was this patch tested?

Add tests for negative values with tie.

Author: Davies Liu <davies@databricks.com>

Closes #11894 from davies/fix_round.
2016-03-22 16:45:20 -07:00
Shixiong Zhu d16710b4c9 [HOTFIX][SQL] Add a timeout for 'cq.stop'
## What changes were proposed in this pull request?

Fix an issue that DataFrameReaderWriterSuite may hang forever.

## How was this patch tested?

Existing tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11902 from zsxwing/hotfix.
2016-03-22 16:41:55 -07:00
Reynold Xin b2b1ad7d4c [SPARK-14060][SQL] Move StringToColumn implicit class into SQLImplicits
## What changes were proposed in this pull request?
This patch moves StringToColumn implicit class into SQLImplicits. This was kept in SQLContext.implicits object for binary backward compatibility, in the Spark 1.x series. It makes more sense for this API to be in SQLImplicits since that's the single class that defines all the SQL implicits.

## How was this patch tested?
Should be covered by existing unit tests.

Author: Reynold Xin <rxin@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #11878 from rxin/SPARK-14060.
2016-03-22 13:48:03 -07:00
Reynold Xin 297c20226d [SPARK-14063][SQL] SQLContext.range should return Dataset[java.lang.Long]
## What changes were proposed in this pull request?
This patch changed the return type for SQLContext.range from `Dataset[Long]` (Scala primitive) to `Dataset[java.lang.Long]` (Java boxed long).

Previously, SPARK-13894 changed the return type of range from `Dataset[Row]` to `Dataset[Long]`. The problem is that due to https://issues.scala-lang.org/browse/SI-4388, Scala compiles primitive types in generics into just Object, i.e. range at bytecode level now just returns `Dataset[Object]`. This is really bad for Java users because they are losing type safety and also need to add a type cast every time they use range.

Talked to Jason Zaugg from Lightbend (Typesafe) who suggested the best approach is to return `Dataset[java.lang.Long]`. The downside is that when Scala users want to explicitly type a closure used on the dataset returned by range, they would need to use `java.lang.Long` instead of the Scala `Long`.

## How was this patch tested?
The signature change should be covered by existing unit tests and API tests. I also added a new test case in DatasetSuite for range.

Author: Reynold Xin <rxin@databricks.com>

Closes #11880 from rxin/SPARK-14063.
2016-03-22 11:37:37 -07:00
Michael Armbrust caea152145 [SPARK-13985][SQL] Deterministic batches with ids
This PR relaxes the requirements of a `Sink` for structured streaming to only require idempotent appending of data.  Previously the `Sink` needed to be able to transactionally append data while recording an opaque offset indicated how far in a stream we have processed.

In order to do this, a new write-ahead-log has been added to stream execution, which records the offsets that will are present in each batch.  The log is created in the newly added `checkpointLocation`, which defaults to `${spark.sql.streaming.checkpointLocation}/${queryName}` but can be overriden by setting `checkpointLocation` in `DataFrameWriter`.

In addition to making sinks easier to write the addition of batchIds and a checkpoint location is done in anticipation of integration with the the `StateStore` (#11645).

Author: Michael Armbrust <michael@databricks.com>

Closes #11804 from marmbrus/batchIds.
2016-03-22 10:18:42 -07:00
Dongjoon Hyun c632bdc01f [SPARK-14029][SQL] Improve BooleanSimplification optimization by implementing Not canonicalization.
## What changes were proposed in this pull request?

Currently, **BooleanSimplification** optimization can handle the following cases.
* a && (!a || b ) ==> a && b
* a && (b || !a ) ==> a && b

However, it can not handle the followings cases since those equations fail at the comparisons between their canonicalized forms.
* a < 1 && (!(a < 1) || b)     ==> (a < 1) && b
* a <= 1 && (!(a <= 1) || b) ==> (a <= 1) && b
* a > 1 && (!(a > 1) || b)     ==> (a > 1) && b
* a >= 1 && (!(a >= 1) || b) ==> (a >= 1) && b

This PR implements the above cases and also the followings, too.
* a < 1 && ((a >= 1) || b )   ==> (a < 1) && b
* a <= 1 && ((a > 1) || b )   ==> (a <= 1) && b
* a > 1 && ((a <= 1) || b)  ==> (a > 1) && b
* a >= 1 && ((a < 1) || b)  ==> (a >= 1) && b

## How was this patch tested?

Pass the Jenkins tests including new test cases in BooleanSimplicationSuite.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11851 from dongjoon-hyun/SPARK-14029.
2016-03-22 10:17:08 -07:00
Sunitha Kambhampati 0ce01635cc [SPARK-13774][SQL] - Improve error message for non-existent paths and add tests
SPARK-13774: IllegalArgumentException: Can not create a Path from an empty string for incorrect file path

**Overview:**
-	If a non-existent path is given in this call
``
scala> sqlContext.read.format("csv").load("file-path-is-incorrect.csv")
``
it throws the following error:
`java.lang.IllegalArgumentException: Can not create a Path from an empty string` …..
`It gets called from inferSchema call in org.apache.spark.sql.execution.datasources.DataSource.resolveRelation`

-	The purpose of this JIRA is to throw a better error message.
-	With the fix, you will now get a _Path does not exist_ error message.
```
scala> sqlContext.read.format("csv").load("file-path-is-incorrect.csv")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/ksunitha/trunk/spark/file-path-is-incorrect.csv;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:215)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:204)
  ...
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:204)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:131)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:141)
  ... 49 elided
```

**Details**
_Changes include:_
-	Check if path exists or not in resolveRelation in DataSource, and throw an AnalysisException with message like “Path does not exist: $path”
-	AnalysisException is thrown similar to the exceptions thrown in resolveRelation.
-	The glob path and the non glob path is checked with minimal calls to path exists. If the globPath is empty, then it is a nonexistent glob pattern and an error will be thrown. In the scenario that it is not globPath, it is necessary to only check if the first element in the Seq is valid or not.

_Test modifications:_
-	Changes went in for 3 tests to account for this error checking.
-	SQLQuerySuite:test("run sql directly on files") – Error message needed to be updated.
-	2 tests failed in MetastoreDataSourcesSuite because they had a dummy path and so test is modified to give a tempdir and allow it to move past so it can continue to test the codepath it meant to test

_New Tests:_
2 new tests are added to DataFrameSuite to validate that glob and non-glob path will throw the new error message.

_Testing:_
Unit tests were run with the fix.

**Notes/Questions to reviewers:**
-	There is some code duplication in DataSource.scala in resolveRelation method and also createSource with respect to getting the paths.  I have not made any changes to the createSource codepath.  Should we make the change there as well ?

-	From other JIRAs, I know there is restructuring and changes going on in this area, not sure how that will affect these changes, but since this seemed like a starter issue, I looked into it.  If we prefer not to add the overhead of the checks, or if there is a better place to do so, let me know.

I would appreciate your review. Thanks for your time and comments.

Author: Sunitha Kambhampati <skambha@us.ibm.com>

Closes #11775 from skambha/improve_errmsg.
2016-03-22 20:47:57 +08:00
hyukjinkwon 4e09a0d5ea [SPARK-13953][SQL] Specifying the field name for corrupted record via option at JSON datasource
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13953

Currently, JSON data source creates a new field in `PERMISSIVE` mode for storing malformed string.
This field can be renamed via `spark.sql.columnNameOfCorruptRecord` option but it is a global configuration.

This PR make that option can be applied per read and can be specified via `option()`. This will overwrites `spark.sql.columnNameOfCorruptRecord` if it is set.

## How was this patch tested?

Unit tests were used and `./dev/run_tests` for coding style tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11881 from HyukjinKwon/SPARK-13953.
2016-03-22 20:30:48 +08:00
Cheng Lian f2e855fba8 [SPARK-13473][SQL] Simplifies PushPredicateThroughProject
## What changes were proposed in this pull request?

This is a follow-up of PR #11348.

After PR #11348, a predicate is never pushed through a project as long as the project contains any non-deterministic fields. Thus, it's impossible that the candidate filter condition can reference any non-deterministic projected fields, and related logic can be safely cleaned up.

To be more specific, the following optimization is allowed:

```scala
// From:
df.select('a, 'b).filter('c > rand(42))
// To:
df.filter('c > rand(42)).select('a, 'b)
```

while this isn't:

```scala
// From:
df.select('a, rand('b) as 'rb, 'c).filter('c > 'rb)
// To:
df.filter('c > rand('b)).select('a, rand('b) as 'rb, 'c)
```

## How was this patch tested?

Existing test cases should do the work.

Author: Cheng Lian <lian@databricks.com>

Closes #11864 from liancheng/spark-13473-cleanup.
2016-03-22 19:20:56 +08:00