Commit graph

3616 commits

Author SHA1 Message Date
Wenchen Fan dacc382f0c [SPARK-19887][SQL] dynamic partition keys can be null or empty string
## What changes were proposed in this pull request?

When dynamic partition value is null or empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`, when we read the data back, we should respect this special directory name and treat it as null.

This is the same behavior of impala, see https://issues.apache.org/jira/browse/IMPALA-252

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17277 from cloud-fan/partition.
2017-03-15 08:24:41 +08:00
Takuya UESHIN 7ded39c223 [SPARK-19817][SQL] Make it clear that timeZone option is a general option in DataFrameReader/Writer.
## What changes were proposed in this pull request?

As timezone setting can also affect partition values, it works for all formats, we should make it clear.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #17281 from ueshin/issues/SPARK-19817.
2017-03-14 13:57:23 -07:00
Nattavut Sutyanyong 6eac96823c [SPARK-18966][SQL] NOT IN subquery with correlated expressions may return incorrect result
## What changes were proposed in this pull request?

This PR fixes the following problem:
````
Seq((1, 2)).toDF("a1", "a2").createOrReplaceTempView("a")
Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("b1", "b2").createOrReplaceTempView("b")

// The expected result is 1 row of (1,2) as shown in the next statement.
sql("select * from a where a1 not in (select b1 from b where b2 = a2)").show
+---+---+
| a1| a2|
+---+---+
+---+---+

sql("select * from a where a1 not in (select b1 from b where b2 = 2)").show
+---+---+
| a1| a2|
+---+---+
|  1|  2|
+---+---+
````
There are a number of scenarios to consider:

1. When the correlated predicate yields a match (i.e., B.B2 = A.A2)
1.1. When the NOT IN expression yields a match (i.e., A.A1 = B.B1)
1.2. When the NOT IN expression yields no match (i.e., A.A1 = B.B1 returns false)
1.3. When A.A1 is null
1.4. When B.B1 is null
1.4.1. When A.A1 is not null
1.4.2. When A.A1 is null

2. When the correlated predicate yields no match (i.e.,B.B2 = A.A2 is false or unknown)
2.1. When B.B2 is null and A.A2 is null
2.2. When B.B2 is null and A.A2 is not null
2.3. When the value of A.A2 does not match any of B.B2

````
 A.A1   A.A2      B.B1   B.B2
-----  -----     -----  -----
    1      1         1      1    (1.1)
    2      1                     (1.2)
 null      1                     (1.3)

    1      3      null      3    (1.4.1)
 null      3                     (1.4.2)

    1   null         1   null    (2.1)
 null      2                     (2.2 & 2.3)
````

We can divide the evaluation of the above correlated NOT IN subquery into 2 groups:-

Group 1: The rows in A when there is a match from the correlated predicate (A.A1 = B.B1)

In this case, the result of the subquery is not empty and the semantics of the NOT IN depends solely on the evaluation of the equality comparison of the columns of NOT IN, i.e., A1 = B1, which says

- If A.A1 is null, the row is filtered (1.3 and 1.4.2)
- If A.A1 = B.B1, the row is filtered (1.1)
- If B.B1 is null, any rows of A in the same group (A.A2 = B.B2) is filtered (1.4.1 & 1.4.2)
- Otherwise, the row is qualified.

Hence, in this group, the result is the row from (1.2).

Group 2: The rows in A when there is no match from the correlated predicate (A.A2 = B.B2)

In this case, all the rows in A, including the rows where A.A1, are qualified because the subquery returns an empty set and by the semantics of the NOT IN, all rows from the parent side qualifies as the result set, that is, the rows from (2.1, 2.2 and 2.3).

In conclusion, the correct result set of the above query is
````
 A.A1   A.A2
-----  -----
    2      1    (1.2)
    1   null    (2.1)
 null      2    (2.2 & 2.3)
````
## How was this patch tested?
unit tests, regression tests, and new test cases focusing on the problem being fixed.

Author: Nattavut Sutyanyong <nsy.can@gmail.com>

Closes #17294 from nsyca/18966.
2017-03-14 20:34:59 +01:00
Herman van Hovell e04c05cf41 [SPARK-19933][SQL] Do not change output of a subquery
## What changes were proposed in this pull request?
The `RemoveRedundantAlias` rule can change the output attributes (the expression id's to be precise) of a query by eliminating the redundant alias producing them. This is no problem for a regular query, but can cause problems for correlated subqueries: The attributes produced by the subquery are used in the parent plan; changing them will break the parent plan.

This PR fixes this by wrapping a subquery in a `Subquery` top level node when it gets optimized. The `RemoveRedundantAlias` rule now recognizes `Subquery` and makes sure that the output attributes of the `Subquery` node are retained.

## How was this patch tested?
Added a test case to `RemoveRedundantAliasAndProjectSuite` and added a regression test to `SubquerySuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17278 from hvanhovell/SPARK-19933.
2017-03-14 18:52:16 +01:00
jiangxingbo a02a0b1703 [SPARK-18961][SQL] Support SHOW TABLE EXTENDED ... PARTITION statement
## What changes were proposed in this pull request?

We should support the statement `SHOW TABLE EXTENDED LIKE 'table_identifier' PARTITION(partition_spec)`, just like that HIVE does.
When partition is specified, the `SHOW TABLE EXTENDED` command should output the information of the partitions instead of the tables.
Note that in this statement, we require exact matched partition spec. For example:
```
CREATE TABLE show_t1(a String, b Int) PARTITIONED BY (c String, d String);
ALTER TABLE show_t1 ADD PARTITION (c='Us', d=1) PARTITION (c='Us', d=22);

-- Output the extended information of Partition(c='Us', d=1)
SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(c='Us', d=1);
-- Throw an AnalysisException
SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(c='Us');
```

## How was this patch tested?
Add new test sqls in file `show-tables.sql`.
Add new test case in `DDLSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #16373 from jiangxb1987/show-partition-extended.
2017-03-14 10:13:50 -07:00
Herman van Hovell a0b92f73fe [SPARK-19850][SQL] Allow the use of aliases in SQL function calls
## What changes were proposed in this pull request?
We currently cannot use aliases in SQL function calls. This is inconvenient when you try to create a struct. This SQL query for example `select struct(1, 2) st`, will create a struct with column names `col1` and `col2`. This is even more problematic when we want to append a field to an existing struct. For example if we want to a field to struct `st` we would issue the following SQL query `select struct(st.*, 1) as st from src`, the result will be struct `st` with an a column with a non descriptive name `col3` (if `st` itself has 2 fields).

This PR proposes to change this by allowing the use of aliased expression in function parameters. For example `select struct(1 as a, 2 as b) st`, will create a struct with columns `a` & `b`.

## How was this patch tested?
Added a test to `ExpressionParserSuite` and added a test file for `SQLQueryTestSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17245 from hvanhovell/SPARK-19850.
2017-03-14 12:49:30 +01:00
Reynold Xin 0ee38a39e4 [SPARK-19944][SQL] Move SQLConf from sql/core to sql/catalyst
## What changes were proposed in this pull request?
This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, the patch used type alias to still keep CatalystConf (as a type alias) and SimpleCatalystConf (as a concrete class that extends SQLConf).

Motivation for the change is that it is pretty weird to have SQLConf only in sql/core and then we have to duplicate config options that impact optimizer/analyzer in sql/catalyst using CatalystConf.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #17285 from rxin/SPARK-19944.
2017-03-14 19:02:30 +08:00
Nattavut Sutyanyong 4ce970d714 [SPARK-18874][SQL] First phase: Deferring the correlated predicate pull up to Optimizer phase
## What changes were proposed in this pull request?
Currently Analyzer as part of ResolveSubquery, pulls up the correlated predicates to its
originating SubqueryExpression. The subquery plan is then transformed to remove the correlated
predicates after they are moved up to the outer plan. In this PR, the task of pulling up
correlated predicates is deferred to Optimizer. This is the initial work that will allow us to
support the form of correlated subqueries that we don't support today. The design document
from nsyca can be found in the following link :
[DesignDoc](https://docs.google.com/document/d/1QDZ8JwU63RwGFS6KVF54Rjj9ZJyK33d49ZWbjFBaIgU/edit#)

The brief description of code changes (hopefully to aid with code review) can be be found in the
following link:
[CodeChanges](https://docs.google.com/document/d/18mqjhL9V1An-tNta7aVE13HkALRZ5GZ24AATA-Vqqf0/edit#)

## How was this patch tested?
The test case PRs were submitted earlier using.
[16337](https://github.com/apache/spark/pull/16337) [16759](https://github.com/apache/spark/pull/16759) [16841](https://github.com/apache/spark/pull/16841) [16915](https://github.com/apache/spark/pull/16915) [16798](https://github.com/apache/spark/pull/16798) [16712](https://github.com/apache/spark/pull/16712) [16710](https://github.com/apache/spark/pull/16710) [16760](https://github.com/apache/spark/pull/16760) [16802](https://github.com/apache/spark/pull/16802)

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #16954 from dilipbiswal/SPARK-18874.
2017-03-14 10:37:10 +01:00
Xiao Li 415f9f3423 [SPARK-19921][SQL][TEST] Enable end-to-end testing using different Hive metastore versions.
### What changes were proposed in this pull request?

To improve the quality of our Spark SQL in different Hive metastore versions, this PR is to enable end-to-end testing using different versions. This PR allows the test cases in sql/hive to pass the existing Hive client to create a SparkSession.
- Since Derby does not allow concurrent connections, the pre-built Hive clients use different database from the TestHive's built-in 1.2.1 client.
- Since our test cases in sql/hive only can create a single Spark context in the same JVM, the newly created SparkSession share the same spark context with the existing TestHive's corresponding SparkSession.

### How was this patch tested?
Fixed the existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17260 from gatorsmile/versionSuite.
2017-03-14 14:19:02 +08:00
Wenchen Fan 05887fc3d8 [SPARK-19916][SQL] simplify bad file handling
## What changes were proposed in this pull request?

We should only have one centre place to try catch the exception for corrupted files.

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17253 from cloud-fan/bad-file.
2017-03-12 23:16:45 -07:00
uncleGen e29a74d5b1 [DOCS][SS] fix structured streaming python example
## What changes were proposed in this pull request?

- SS python example: `TypeError: 'xxx' object is not callable`
- some other doc issue.

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17257 from uncleGen/docs-ss-python.
2017-03-12 08:29:37 +00:00
windpiger f6fdf92d0d [SPARK-19723][SQL] create datasource table with an non-existent location should work
## What changes were proposed in this pull request?

This JIRA is a follow up work after [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583)

As we discussed in that [PR](https://github.com/apache/spark/pull/16938)

The following DDL for datasource table with an non-existent location should work:
```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path
```
Currently it will throw exception that path not exists for datasource table for datasource table

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #17055 from windpiger/CTDataSourcePathNotExists.
2017-03-10 20:59:32 -08:00
Wenchen Fan fb9beda546 [SPARK-19893][SQL] should not run DataFrame set oprations with map type
## What changes were proposed in this pull request?

In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17236 from cloud-fan/map.
2017-03-10 16:14:22 -08:00
Cheng Lian ffee4f1cef [SPARK-19905][SQL] Bring back Dataset.inputFiles for Hive SerDe tables
## What changes were proposed in this pull request?

`Dataset.inputFiles` works by matching `FileRelation`s in the query plan. In Spark 2.1, Hive SerDe tables are represented by `MetastoreRelation`, which inherits from `FileRelation`. However, in Spark 2.2, Hive SerDe tables are now represented by `CatalogRelation`, which doesn't inherit from `FileRelation` anymore, due to the unification of Hive SerDe tables and data source tables. This change breaks `Dataset.inputFiles` for Hive SerDe tables.

This PR tries to fix this issue by explicitly matching `CatalogRelation`s that are Hive SerDe tables in `Dataset.inputFiles`. Note that we can't make `CatalogRelation` inherit from `FileRelation` since not all `CatalogRelation`s are file based (e.g., JDBC data source tables).

## How was this patch tested?

New test case added in `HiveDDLSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #17247 from liancheng/spark-19905-hive-table-input-files.
2017-03-10 15:19:32 -08:00
Carson Wang dd9049e049 [SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physical plan
## What changes were proposed in this pull request?
When adaptive execution is enabled, an exchange coordinator is used in the Exchange operators. For Join, the same exchange coordinator is used for its two Exchanges. But the physical plan shows two different coordinator Ids which is confusing.

This PR is to fix the incorrect exchange coordinator id in the physical plan. The coordinator object instead of the `Option[ExchangeCoordinator]` should be used to generate the identity hash code of the same coordinator.

## How was this patch tested?
Before the patch, the physical plan shows two different exchange coordinator id for Join.
```
== Physical Plan ==
*Project [key1#3L, value2#12L]
+- *SortMergeJoin [key1#3L], [key2#11L], Inner
   :- *Sort [key1#3L ASC NULLS FIRST], false, 0
   :  +- Exchange(coordinator id: 1804587700) hashpartitioning(key1#3L, 10), coordinator[target post-shuffle partition size: 67108864]
   :     +- *Project [(id#0L % 500) AS key1#3L]
   :        +- *Filter isnotnull((id#0L % 500))
   :           +- *Range (0, 1000, step=1, splits=Some(10))
   +- *Sort [key2#11L ASC NULLS FIRST], false, 0
      +- Exchange(coordinator id: 793927319) hashpartitioning(key2#11L, 10), coordinator[target post-shuffle partition size: 67108864]
         +- *Project [(id#8L % 500) AS key2#11L, id#8L AS value2#12L]
            +- *Filter isnotnull((id#8L % 500))
               +- *Range (0, 1000, step=1, splits=Some(10))
```
After the patch, two exchange coordinator id are the same.

Author: Carson Wang <carson.wang@intel.com>

Closes #16952 from carsonwang/FixCoordinatorId.
2017-03-10 11:13:26 -08:00
Kazuaki Ishizaki fcb68e0f5d [SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range()
## What changes were proposed in this pull request?

This PR improves performance of operations with `range()` by changing Java code generated by Catalyst. This PR is inspired by the [blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).

This PR changes generated code in the following two points.
1. Replace a while-loop with long instance variables a for-loop with int local varibles
2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated).

These points facilitates compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance is improved by 7.6x.

Benchmark program:
```java
val N = 1 << 29
val iters = 2
val benchmark = new Benchmark("range.count", N * iters)
benchmark.addCase(s"with this PR") { i =>
  var n = 0
  var len = 0
  while (n < iters) {
    len += sparkSession.range(N).selectExpr("count(id)").collect.length
    n += 1
  }
}
benchmark.run
```

Performance result without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o this PR                                   1349 / 1356        796.2           1.3       1.0X
```

Performance result with this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
with this PR                                   177 /  271       6065.3           0.2       1.0X
```

Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed.

Generated code without this PR
```java

/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
/* 013 */   private boolean range_initRange;
/* 014 */   private long range_number;
/* 015 */   private TaskContext range_taskContext;
/* 016 */   private InputMetrics range_inputMetrics;
/* 017 */   private long range_batchEnd;
/* 018 */   private long range_numElementsTodo;
/* 019 */   private scala.collection.Iterator range_input;
/* 020 */   private UnsafeRow range_result;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
/* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 025 */   private UnsafeRow agg_result;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 028 */
/* 029 */   public GeneratedIterator(Object[] references) {
/* 030 */     this.references = references;
/* 031 */   }
/* 032 */
/* 033 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 034 */     partitionIndex = index;
/* 035 */     this.inputs = inputs;
/* 036 */     agg_initAgg = false;
/* 037 */
/* 038 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 039 */     this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 040 */     range_initRange = false;
/* 041 */     range_number = 0L;
/* 042 */     range_taskContext = TaskContext.get();
/* 043 */     range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
/* 044 */     range_batchEnd = 0;
/* 045 */     range_numElementsTodo = 0L;
/* 046 */     range_input = inputs[0];
/* 047 */     range_result = new UnsafeRow(1);
/* 048 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
/* 049 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
/* 050 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 051 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 052 */     agg_result = new UnsafeRow(1);
/* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 055 */
/* 056 */   }
/* 057 */
/* 058 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 059 */     // initialize aggregation buffer
/* 060 */     agg_bufIsNull = false;
/* 061 */     agg_bufValue = 0L;
/* 062 */
/* 063 */     // initialize Range
/* 064 */     if (!range_initRange) {
/* 065 */       range_initRange = true;
/* 066 */       initRange(partitionIndex);
/* 067 */     }
/* 068 */
/* 069 */     while (true) {
/* 070 */       while (range_number != range_batchEnd) {
/* 071 */         long range_value = range_number;
/* 072 */         range_number += 1L;
/* 073 */
/* 074 */         // do aggregate
/* 075 */         // common sub-expressions
/* 076 */
/* 077 */         // evaluate aggregate function
/* 078 */         boolean agg_isNull1 = false;
/* 079 */
/* 080 */         long agg_value1 = -1L;
/* 081 */         agg_value1 = agg_bufValue + 1L;
/* 082 */         // update aggregation buffer
/* 083 */         agg_bufIsNull = false;
/* 084 */         agg_bufValue = agg_value1;
/* 085 */
/* 086 */         if (shouldStop()) return;
/* 087 */       }
/* 088 */
/* 089 */       if (range_taskContext.isInterrupted()) {
/* 090 */         throw new TaskKilledException();
/* 091 */       }
/* 092 */
/* 093 */       long range_nextBatchTodo;
/* 094 */       if (range_numElementsTodo > 1000L) {
/* 095 */         range_nextBatchTodo = 1000L;
/* 096 */         range_numElementsTodo -= 1000L;
/* 097 */       } else {
/* 098 */         range_nextBatchTodo = range_numElementsTodo;
/* 099 */         range_numElementsTodo = 0;
/* 100 */         if (range_nextBatchTodo == 0) break;
/* 101 */       }
/* 102 */       range_numOutputRows.add(range_nextBatchTodo);
/* 103 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
/* 104 */
/* 105 */       range_batchEnd += range_nextBatchTodo * 1L;
/* 106 */     }
/* 107 */
/* 108 */   }
/* 109 */
/* 110 */   private void initRange(int idx) {
/* 111 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 112 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
/* 113 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
/* 114 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 115 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 117 */
/* 118 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 119 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 120 */       range_number = Long.MAX_VALUE;
/* 121 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 122 */       range_number = Long.MIN_VALUE;
/* 123 */     } else {
/* 124 */       range_number = st.longValue();
/* 125 */     }
/* 126 */     range_batchEnd = range_number;
/* 127 */
/* 128 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 129 */     .multiply(step).add(start);
/* 130 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 131 */       partitionEnd = Long.MAX_VALUE;
/* 132 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 133 */       partitionEnd = Long.MIN_VALUE;
/* 134 */     } else {
/* 135 */       partitionEnd = end.longValue();
/* 136 */     }
/* 137 */
/* 138 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 139 */       java.math.BigInteger.valueOf(range_number));
/* 140 */     range_numElementsTodo  = startToEnd.divide(step).longValue();
/* 141 */     if (range_numElementsTodo < 0) {
/* 142 */       range_numElementsTodo = 0;
/* 143 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 144 */       range_numElementsTodo++;
/* 145 */     }
/* 146 */   }
/* 147 */
/* 148 */   protected void processNext() throws java.io.IOException {
/* 149 */     while (!agg_initAgg) {
/* 150 */       agg_initAgg = true;
/* 151 */       long agg_beforeAgg = System.nanoTime();
/* 152 */       agg_doAggregateWithoutKey();
/* 153 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 154 */
/* 155 */       // output the result
/* 156 */
/* 157 */       agg_numOutputRows.add(1);
/* 158 */       agg_rowWriter.zeroOutNullBytes();
/* 159 */
/* 160 */       if (agg_bufIsNull) {
/* 161 */         agg_rowWriter.setNullAt(0);
/* 162 */       } else {
/* 163 */         agg_rowWriter.write(0, agg_bufValue);
/* 164 */       }
/* 165 */       append(agg_result);
/* 166 */     }
/* 167 */   }
/* 168 */ }
```

Generated code with this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
/* 013 */   private boolean range_initRange;
/* 014 */   private long range_number;
/* 015 */   private TaskContext range_taskContext;
/* 016 */   private InputMetrics range_inputMetrics;
/* 017 */   private long range_batchEnd;
/* 018 */   private long range_numElementsTodo;
/* 019 */   private scala.collection.Iterator range_input;
/* 020 */   private UnsafeRow range_result;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
/* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 025 */   private UnsafeRow agg_result;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 028 */
/* 029 */   public GeneratedIterator(Object[] references) {
/* 030 */     this.references = references;
/* 031 */   }
/* 032 */
/* 033 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 034 */     partitionIndex = index;
/* 035 */     this.inputs = inputs;
/* 036 */     agg_initAgg = false;
/* 037 */
/* 038 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 039 */     this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 040 */     range_initRange = false;
/* 041 */     range_number = 0L;
/* 042 */     range_taskContext = TaskContext.get();
/* 043 */     range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
/* 044 */     range_batchEnd = 0;
/* 045 */     range_numElementsTodo = 0L;
/* 046 */     range_input = inputs[0];
/* 047 */     range_result = new UnsafeRow(1);
/* 048 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
/* 049 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
/* 050 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 051 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 052 */     agg_result = new UnsafeRow(1);
/* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 055 */
/* 056 */   }
/* 057 */
/* 058 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 059 */     // initialize aggregation buffer
/* 060 */     agg_bufIsNull = false;
/* 061 */     agg_bufValue = 0L;
/* 062 */
/* 063 */     // initialize Range
/* 064 */     if (!range_initRange) {
/* 065 */       range_initRange = true;
/* 066 */       initRange(partitionIndex);
/* 067 */     }
/* 068 */
/* 069 */     while (true) {
/* 070 */       long range_range = range_batchEnd - range_number;
/* 071 */       if (range_range != 0L) {
/* 072 */         int range_localEnd = (int)(range_range / 1L);
/* 073 */         for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) {
/* 074 */           long range_value = ((long)range_localIdx * 1L) + range_number;
/* 075 */
/* 076 */           // do aggregate
/* 077 */           // common sub-expressions
/* 078 */
/* 079 */           // evaluate aggregate function
/* 080 */           boolean agg_isNull1 = false;
/* 081 */
/* 082 */           long agg_value1 = -1L;
/* 083 */           agg_value1 = agg_bufValue + 1L;
/* 084 */           // update aggregation buffer
/* 085 */           agg_bufIsNull = false;
/* 086 */           agg_bufValue = agg_value1;
/* 087 */
/* 088 */           // shouldStop check is eliminated
/* 089 */         }
/* 090 */         range_number = range_batchEnd;
/* 091 */       }
/* 092 */
/* 093 */       if (range_taskContext.isInterrupted()) {
/* 094 */         throw new TaskKilledException();
/* 095 */       }
/* 096 */
/* 097 */       long range_nextBatchTodo;
/* 098 */       if (range_numElementsTodo > 1000L) {
/* 099 */         range_nextBatchTodo = 1000L;
/* 100 */         range_numElementsTodo -= 1000L;
/* 101 */       } else {
/* 102 */         range_nextBatchTodo = range_numElementsTodo;
/* 103 */         range_numElementsTodo = 0;
/* 104 */         if (range_nextBatchTodo == 0) break;
/* 105 */       }
/* 106 */       range_numOutputRows.add(range_nextBatchTodo);
/* 107 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
/* 108 */
/* 109 */       range_batchEnd += range_nextBatchTodo * 1L;
/* 110 */     }
/* 111 */
/* 112 */   }
/* 113 */
/* 114 */   private void initRange(int idx) {
/* 115 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 116 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
/* 117 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
/* 118 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 119 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 120 */     long partitionEnd;
/* 121 */
/* 122 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 123 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 124 */       range_number = Long.MAX_VALUE;
/* 125 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 126 */       range_number = Long.MIN_VALUE;
/* 127 */     } else {
/* 128 */       range_number = st.longValue();
/* 129 */     }
/* 130 */     range_batchEnd = range_number;
/* 131 */
/* 132 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 133 */     .multiply(step).add(start);
/* 134 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 135 */       partitionEnd = Long.MAX_VALUE;
/* 136 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 137 */       partitionEnd = Long.MIN_VALUE;
/* 138 */     } else {
/* 139 */       partitionEnd = end.longValue();
/* 140 */     }
/* 141 */
/* 142 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 143 */       java.math.BigInteger.valueOf(range_number));
/* 144 */     range_numElementsTodo  = startToEnd.divide(step).longValue();
/* 145 */     if (range_numElementsTodo < 0) {
/* 146 */       range_numElementsTodo = 0;
/* 147 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 148 */       range_numElementsTodo++;
/* 149 */     }
/* 150 */   }
/* 151 */
/* 152 */   protected void processNext() throws java.io.IOException {
/* 153 */     while (!agg_initAgg) {
/* 154 */       agg_initAgg = true;
/* 155 */       long agg_beforeAgg = System.nanoTime();
/* 156 */       agg_doAggregateWithoutKey();
/* 157 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 158 */
/* 159 */       // output the result
/* 160 */
/* 161 */       agg_numOutputRows.add(1);
/* 162 */       agg_rowWriter.zeroOutNullBytes();
/* 163 */
/* 164 */       if (agg_bufIsNull) {
/* 165 */         agg_rowWriter.setNullAt(0);
/* 166 */       } else {
/* 167 */         agg_rowWriter.write(0, agg_bufValue);
/* 168 */       }
/* 169 */       append(agg_result);
/* 170 */     }
/* 171 */   }
/* 172 */ }
```

A part of suppressing `shouldStop()` was originally developed by inouehrs

## How was this patch tested?

Add new tests into `DataFrameRangeSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17122 from kiszk/SPARK-19786.
2017-03-10 18:04:37 +01:00
Tyson Condie 501b711199 [SPARK-19891][SS] Await Batch Lock notified on stream execution exit
## What changes were proposed in this pull request?

We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown.

## How was this patch tested?

Current tests that throw exceptions at runtime will finish faster as a result of this update.

zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>

Closes #17231 from tcondie/kafka-writer.
2017-03-09 23:02:13 -08:00
Kazuaki Ishizaki 5949e6c447 [SPARK-19008][SQL] Improve performance of Dataset.map by eliminating boxing/unboxing
## What changes were proposed in this pull request?

This PR improve performance of Dataset.map() for primitive types by removing boxing/unbox operations. This is based on [the discussion](https://github.com/apache/spark/pull/16391#discussion_r93788919) with cloud-fan.

Current Catalyst generates a method call to a `apply()` method of an anonymous function written in Scala. The types of an argument and return value are `java.lang.Object`. As a result, each method call for a primitive value involves a pair of unboxing and boxing for calling this `apply()` method and a pair of boxing and unboxing for returning from this `apply()` method.

This PR directly calls a specialized version of a `apply()` method without boxing and unboxing. For example, if types of an arguments ant return value is `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`.

The following is a benchmark result using [this program](https://github.com/apache/spark/pull/16391/files) with 4.7x. Here is a Dataset part of this program.

Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD                                           1923 / 1952         52.0          19.2       1.0X
DataFrame                                      526 /  548        190.2           5.3       3.7X
Dataset                                       3094 / 3154         32.3          30.9       0.6X
```

With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD                                           1883 / 1892         53.1          18.8       1.0X
DataFrame                                      502 /  642        199.1           5.0       3.7X
Dataset                                        657 /  784        152.2           6.6       2.9X
```

```java
  def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
    import spark.implicits._
    val rdd = spark.sparkContext.range(0, numRows)
    val ds = spark.range(0, numRows)
    val func = (l: Long) => l + 1
    val benchmark = new Benchmark("back-to-back map", numRows)
...
    benchmark.addCase("Dataset") { iter =>
      var res = ds.as[Long]
      var i = 0
      while (i < numChains) {
        res = res.map(func)
        i += 1
      }
      res.queryExecution.toRdd.foreach(_ => Unit)
    }
    benchmark
  }
```

A motivating example
```java
Seq(1, 2, 3).toDS.map(i => i * 7).show
```

Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           Object mapelements_funcResult = null;
/* 054 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
/* 055 */           if (mapelements_funcResult == null) {
/* 056 */             mapelements_isNull = true;
/* 057 */           } else {
/* 058 */             mapelements_value = (Integer) mapelements_funcResult;
/* 059 */           }
/* 060 */
/* 061 */         }
/* 062 */
/* 063 */       }
/* 064 */
/* 065 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 066 */
/* 067 */       if (mapelements_isNull) {
/* 068 */         serializefromobject_rowWriter.setNullAt(0);
/* 069 */       } else {
/* 070 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 071 */       }
/* 072 */       append(serializefromobject_result);
/* 073 */       if (shouldStop()) return;
/* 074 */     }
/* 075 */   }
/* 076 */ }
```

Generated code with this PR (lines 48-56 are changed)
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue);
/* 054 */         }
/* 055 */
/* 056 */       }
/* 057 */
/* 058 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 059 */
/* 060 */       if (mapelements_isNull) {
/* 061 */         serializefromobject_rowWriter.setNullAt(0);
/* 062 */       } else {
/* 063 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 064 */       }
/* 065 */       append(serializefromobject_result);
/* 066 */       if (shouldStop()) return;
/* 067 */     }
/* 068 */   }
/* 069 */ }
```

Java bytecode for methods for `i => i * 7`
```java
$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final int apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #18                 // Method apply$mcII$sp:(I)I
       5: ireturn

  public int apply$mcII$sp(int);
    Code:
       0: iload_1
       1: bipush        7
       3: imul
       4: ireturn

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #29                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #31                 // Method apply:(I)I
       8: invokestatic  #35                 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
      11: areturn

  public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
    Code:
       0: aload_0
       1: invokespecial #42                 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
       4: return
}
```
## How was this patch tested?

Added new test suites to `DatasetPrimitiveSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17172 from kiszk/SPARK-19008.
2017-03-09 22:58:52 -08:00
Budde f79371ad86 [SPARK-19611][SQL] Introduce configurable table schema inference
## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when schema can't
  successfully be read from Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is
  false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileForamt.meregeMetastoreParquetSchema() as
  HiveMetastoreCatalog.mergeWithMetastoreSchema
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API.

Author: Budde <budde@amazon.com>

Closes #16944 from budde/SPARK-19611.
2017-03-09 12:55:33 -08:00
Jeff Zhang cabe1df860 [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc
Beside the issue in spark api, also fix 2 minor issues in pyspark
- support read from multiple input paths for orc
- support read from multiple input paths for text

Author: Jeff Zhang <zjffdu@apache.org>

Closes #10307 from zjffdu/SPARK-12334.
2017-03-09 11:44:34 -08:00
uncleGen 30b18e6936 [SPARK-19861][SS] watermark should not be a negative time.
## What changes were proposed in this pull request?

`watermark` should not be negative. This behavior is invalid, check it before real run.

## How was this patch tested?

add new unit test.

Author: uncleGen <hustyugm@gmail.com>
Author: dylon <hustyugm@gmail.com>

Closes #17202 from uncleGen/SPARK-19861.
2017-03-09 11:07:31 -08:00
Liwei Lin 40da4d181d [SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource
## What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this would cause false negatives in the case where the path has changed in a cosmetic way (i.e. changing `s3n` to `s3a`).

This patch adds an option `fileNameOnly` that causes the new file check to be based only on the filename (but still store the whole path in the log).

## Usage

```scala
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...
```
## How was this patch tested?

Added a test case

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17120 from lw-lin/filename-only.
2017-03-09 11:02:44 -08:00
Jason White 206030bd12 [SPARK-19561][SQL] add int case handling for TimestampType
## What changes were proposed in this pull request?

Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int.

These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range.

Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3.

## How was this patch tested?

Added a new PySpark-side test that fails without the change.

The contribution is my original work and I license the work to the project under the project’s open source license.

Resubmission of https://github.com/apache/spark/pull/16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun

cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks.

Author: Jason White <jason.white@shopify.com>

Closes #17200 from JasonMWhite/SPARK-19561.
2017-03-09 10:34:54 -08:00
windpiger 274973d2a3 [SPARK-19763][SQL] qualified external datasource table location stored in catalog
## What changes were proposed in this pull request?

If we create a external datasource table with a non-qualified location , we should qualified it to store in catalog.

```
CREATE TABLE t(a string)
USING parquet
LOCATION '/path/xx'

CREATE TABLE t1(a string, b string)
USING parquet
PARTITIONED BY(b)
LOCATION '/path/xx'
```

when we get the table from catalog, the location should be qualified, e.g.'file:/path/xxx'
## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #17095 from windpiger/tablepathQualified.
2017-03-09 01:18:17 -08:00
uncleGen eeb1d6db87 [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
## What changes were proposed in this pull request?

A follow up to SPARK-19859:

- extract the calculation of `delayMs` and reuse it.
- update EventTimeWatermarkExec
- use the correct `delayMs` in EventTimeWatermark

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17221 from uncleGen/SPARK-19859.
2017-03-08 23:23:10 -08:00
Xiao Li 09829be621 [SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore
### What changes were proposed in this pull request?
So far, the test cases in DDLSuites only verify the behaviors of InMemoryCatalog. That means, they do not cover the scenarios using HiveExternalCatalog. Thus, we need to improve the existing test suite to run these cases using Hive metastore.

When porting these test cases, a bug of `SET LOCATION` is found. `path` is not set when the location is changed.

After this PR, a few changes are made, as summarized below,
- `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and `HiveCatalogedDDLSuite` extend it. `InMemoryCatalogedDDLSuite` is using `InMemoryCatalog`. `HiveCatalogedDDLSuite` is using `HiveExternalCatalog`.
- `InMemoryCatalogedDDLSuite` contains all the existing test cases in `DDLSuite`.
- `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following test cases are excluded:

1. The following test cases only make sense for `InMemoryCatalog`:
```
  test("desc table for parquet data source table using in-memory catalog")
  test("create a managed Hive source table") {
  test("create an external Hive source table")
  test("Create Hive Table As Select")
```

2. The following test cases are unable to be ported because we are unable to alter table provider when using Hive metastore. In the future PRs we need to improve the test cases so that altering table provider is not needed:
```
  test("alter table: set location (datasource table)")
  test("alter table: set properties (datasource table)")
  test("alter table: unset properties (datasource table)")
  test("alter table: set serde (datasource table)")
  test("alter table: set serde partition (datasource table)")
  test("alter table: change column (datasource table)")
  test("alter table: add partition (datasource table)")
  test("alter table: drop partition (datasource table)")
  test("alter table: rename partition (datasource table)")
  test("drop table - data source table")
```

**TODO** : in the future PRs, we need to remove `HiveDDLSuite` and move the test cases to either `DDLSuite`,  `InMemoryCatalogedDDLSuite` or `HiveCatalogedDDLSuite`.

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>
Author: gatorsmile <gatorsmile@gmail.com>

Closes #16592 from gatorsmile/refactorDDLSuite.
2017-03-08 23:12:10 -08:00
Dilip Biswal d809ceed97 [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer.
## What changes were proposed in this pull request?
In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However we also can throw AnalysisException from a few analyzer rules like ResolveSubquery.

I found that we fire up the analyzer rules twice for the queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix. We don't have to strictly fix it. I just got confused seeing the rule getting fired two times when i was not expecting it.

## How was this patch tested?

Tested manually.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #17214 from dilipbiswal/analyis_twice.
2017-03-08 17:33:49 -08:00
Burak Yavuz a3648b5d4f [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource
## What changes were proposed in this pull request?

**The Problem**
There is a file stream source option called maxFileAge which limits how old the files can be, relative the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This values is by default 7 days.
This causes a problem when both
latestFirst = true
maxFilesPerTrigger > total files to be processed.
Here is what happens in all combinations
1) latestFirst = false - Since files are processed in order, there wont be any unprocessed file older than the latest processed file. All files will be processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch initialize. If maxFilesPerTrigger is not, then all old files get processed in the first batch, and so no file is left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch process the latest X files. That sets the threshold latest file - maxFileAge, so files older than this threshold will never be considered for processing.
The bug is with case 3.

**The Solution**

Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.

## How was this patch tested?

Regression test in `FileStreamSourceSuite`

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17153 from brkyvz/maxFileAge.
2017-03-08 14:35:07 -08:00
hyukjinkwon 455129020c [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV
## What changes were proposed in this pull request?

This PR proposes to add an API that loads `DataFrame` from `Dataset[String]` storing csv.

It allows pre-processing before loading into CSV, which means allowing a lot of workarounds for many narrow cases, for example, as below:

- Case 1 - pre-processing

  ```scala
  val df = spark.read.text("...")
  // Pre-processing with this.
  spark.read.csv(df.as[String])
  ```

- Case 2 - use other input formats

  ```scala
  val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo",
    classOf[com.hadoop.mapreduce.LzoTextInputFormat],
    classOf[org.apache.hadoop.io.LongWritable],
    classOf[org.apache.hadoop.io.Text])
  val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength))

  spark.read.csv(stringRdd.toDS)
  ```

## How was this patch tested?

Added tests in `CSVSuite` and build with Scala 2.10.

```
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16854 from HyukjinKwon/SPARK-15463.
2017-03-08 13:43:09 -08:00
Kunal Khamar 6570cfd7ab [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState
Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions etc.) This change adds a method cloneSession() which creates a new SparkSession with a copy of the parent's SessionState.

Subsequent changes to base session are not propagated to cloned session, clone is independent after creation.
If the base is changed after clone has been created, say user registers new UDF, then the new UDF will not be available inside the clone. Same goes for configs and temp tables.

Unit tests

Author: Kunal Khamar <kkhamar@outlook.com>
Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16826 from kunalkhamar/fork-sparksession.
2017-03-08 13:20:45 -08:00
Shixiong Zhu 1bf9012380 [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow invalid cases
## What changes were proposed in this pull request?

Add a output mode parameter to `flatMapGroupsWithState` and just define `mapGroupsWithState` as `flatMapGroupsWithState(Update)`.

`UnsupportedOperationChecker` is modified to disallow unsupported cases.

- Batch mapGroupsWithState or flatMapGroupsWithState is always allowed.
- For streaming (map/flatMap)GroupsWithState, see the following table:

| Operators  | Supported Query Output Mode |
| ------------- | ------------- |
| flatMapGroupsWithState(Update) without aggregation  | Update |
| flatMapGroupsWithState(Update) with aggregation  | None |
| flatMapGroupsWithState(Append) without aggregation  | Append |
| flatMapGroupsWithState(Append) before aggregation  | Append, Update, Complete |
| flatMapGroupsWithState(Append) after aggregation  | None |
| Multiple flatMapGroupsWithState(Append)s  | Append |
| Multiple mapGroupsWithStates  | None |
| Mxing mapGroupsWithStates  and flatMapGroupsWithStates | None |
| Other cases of multiple flatMapGroupsWithState | None |

## How was this patch tested?

The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState:
```
[info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond)
[info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds)
[info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds)
[info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are in append mode: supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -  multiple flatMapGroupsWithStates on s streaming relation but some are not in append mode: not supported (7 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in append mode: not supported (3 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in complete mode: not supported (3 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Append mode: not supported (6 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported (3 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Complete mode: not supported (4 milliseconds)
[info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are in append mode: not supported (4 milliseconds)
[info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation: not supported (4 milliseconds)
```

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17197 from zsxwing/mapgroups-check.
2017-03-08 13:18:07 -08:00
Wojtek Szymanski e9e2c612d5 [SPARK-19727][SQL] Fix for round function that modifies original column
## What changes were proposed in this pull request?

Fix for SQL round function that modifies original column when underlying data frame is created from a local product.

    import org.apache.spark.sql.functions._

    case class NumericRow(value: BigDecimal)

    val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789"))))

    df.show()
    +--------------------+
    |               value|
    +--------------------+
    |1.234567890000000000|
    +--------------------+

    df.withColumn("value_rounded", round('value)).show()

    // before
    +--------------------+-------------+
    |               value|value_rounded|
    +--------------------+-------------+
    |1.000000000000000000|            1|
    +--------------------+-------------+

    // after
    +--------------------+-------------+
    |               value|value_rounded|
    +--------------------+-------------+
    |1.234567890000000000|            1|
    +--------------------+-------------+

## How was this patch tested?

New unit test added to existing suite `org.apache.spark.sql.MathFunctionsSuite`

Author: Wojtek Szymanski <wk.szymanski@gmail.com>

Closes #17075 from wojtek-szymanski/SPARK-19727.
2017-03-08 12:36:16 -08:00
windpiger f3387d9748 [SPARK-19864][SQL][TEST] provide a makeQualifiedPath functions to optimize some code
## What changes were proposed in this pull request?

Currently there are lots of places to make the path qualified, it is better to provide a function to do this, then the code will be more simple.

## How was this patch tested?
N/A

Author: windpiger <songjun@outlook.com>

Closes #17204 from windpiger/addQualifiledPathUtil.
2017-03-08 10:48:53 -08:00
Xiao Li 9a6ac7226f [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled Repartition
### What changes were proposed in this pull request?

Observed by felixcheung  in https://github.com/apache/spark/pull/16739, when users use the shuffle-enabled `repartition` API, they expect the partition they got should be the exact number they provided, even if they call shuffle-disabled `coalesce` later.

Currently, `CollapseRepartition` rule does not consider whether shuffle is enabled or not. Thus, we got the following unexpected result.

```Scala
    val df = spark.range(0, 10000, 1, 5)
    val df2 = df.repartition(10)
    assert(df2.coalesce(13).rdd.getNumPartitions == 5)
    assert(df2.coalesce(7).rdd.getNumPartitions == 5)
    assert(df2.coalesce(3).rdd.getNumPartitions == 3)
```

This PR is to fix the issue. We preserve shuffle-enabled Repartition.

### How was this patch tested?
Added a test case

Author: Xiao Li <gatorsmile@gmail.com>

Closes #16933 from gatorsmile/CollapseRepartition.
2017-03-08 09:36:01 -08:00
jiangxingbo 5f7d835d38 [SPARK-19865][SQL] remove the view identifier in SubqueryAlias
## What changes were proposed in this pull request?

Since we have a `View` node now, we can remove the view identifier in `SubqueryAlias`, which was used to indicate a view node before.

## How was this patch tested?

Update the related test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #17210 from jiangxb1987/SubqueryAlias.
2017-03-08 16:18:17 +01:00
wangzhenhua e44274870d [SPARK-17080][SQL] join reorder
## What changes were proposed in this pull request?

Reorder the joins using a dynamic programming algorithm (Selinger paper):
First we put all items (basic joined nodes) into level 1, then we build all two-way joins at level 2 from plans at level 1 (single items), then build all 3-way joins from plans at previous levels (two-way joins and single items), then 4-way joins ... etc, until we build all n-way joins and pick the best plan among them.

When building m-way joins, we only keep the best plan (with the lowest cost) for the same set of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, the plans maintained for each level when reordering four items A, B, C, D are as follows:
```
level 1: p({A}), p({B}), p({C}), p({D})
level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
level 4: p({A, B, C, D})
```
where p({A, B, C, D}) is the final output plan.

For cost evaluation, since physical costs for operators are not available currently, we use cardinalities and sizes to compute costs.

## How was this patch tested?
add test cases

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #17138 from wzhfy/joinReorder.
2017-03-08 16:01:28 +01:00
Yuming Wang 3f9f9180c2 [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions
## What changes were proposed in this pull request?
Make the `SET mapreduce.job.reduces` automatically converted to `spark.sql.shuffle.partitions`, it's similar to `SET mapred.reduce.tasks`.

## How was this patch tested?

unit tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #17020 from wangyum/SPARK-19693.
2017-03-08 11:31:01 +00:00
Shixiong Zhu d8830c5039 [SPARK-19859][SS] The new watermark should override the old one
## What changes were proposed in this pull request?

The new watermark should override the old one. Otherwise, we just pick up the first column which has a watermark, it may be unexpected.

## How was this patch tested?

The new test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17199 from zsxwing/SPARK-19859.
2017-03-07 20:34:55 -08:00
Shixiong Zhu ca849ac4e8 [SPARK-19841][SS] watermarkPredicate should filter based on keys
## What changes were proposed in this pull request?

`StreamingDeduplicateExec.watermarkPredicate` should filter based on keys. Otherwise, it may generate a wrong answer if the watermark column in `keyExpression` has a different position in the row.

`StateStoreSaveExec` has the same codes but its parent can makes sure the watermark column positions in `keyExpression` and `row` are the same.

## How was this patch tested?

The added test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17183 from zsxwing/SPARK-19841.
2017-03-07 20:32:51 -08:00
jiangxingbo b9783a92f7 [SPARK-18389][SQL] Disallow cyclic view reference
## What changes were proposed in this pull request?

Disallow cyclic view references, a cyclic view reference may be created by the following queries:
```
CREATE VIEW testView AS SELECT id FROM tbl
CREATE VIEW testView2 AS SELECT id FROM testView
ALTER VIEW testView AS SELECT * FROM testView2
```
In the above example, a reference cycle (testView -> testView2 -> testView) exsits.

We disallow cyclic view references by checking that in ALTER VIEW command, when the `analyzedPlan` contains the same `View` node with the altered view, we should prevent the behavior and throw an AnalysisException.

## How was this patch tested?

Test by `SQLViewSuite.test("correctly handle a cyclic view reference")`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #17152 from jiangxb1987/cyclic-view.
2017-03-07 20:25:38 -08:00
Wenchen Fan c05baabf10 [SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached plans that refer to this table
## What changes were proposed in this pull request?

When un-cache a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to this table.

This PR also includes some refactors:

1. use `java.util.LinkedList` to store the cache entries, so that it's safer to remove elements while iterating
2. rename `invalidateCache` to `recacheByPlan`, which is more obvious about what it does.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17097 from cloud-fan/cache.
2017-03-07 09:21:58 -08:00
Takeshi Yamamuro 030acdd1f0 [SPARK-19637][SQL] Add to_json in FunctionRegistry
## What changes were proposed in this pull request?
This pr added entries  in `FunctionRegistry` and supported `to_json` in SQL.

## How was this patch tested?
Added tests in `JsonFunctionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #16981 from maropu/SPARK-19637.
2017-03-07 09:00:14 -08:00
windpiger e52499ea9c [SPARK-19832][SQL] DynamicPartitionWriteTask get partitionPath should escape the partition name
## What changes were proposed in this pull request?

Currently in DynamicPartitionWriteTask, when we get the paritionPath of a parition, we just escape the partition value, not escape the partition name.

this will cause some problems for some  special partition name situation, for example :
1) if the partition name contains '%' etc,  there will be two partition path created in the filesytem, one is for escaped path like '/path/a%25b=1', another is for unescaped path like '/path/a%b=1'.
and the data inserted stored in unescaped path, while the show partitions table will return 'a%25b=1' which the partition name is escaped. So here it is not consist. And I think the data should be stored in the escaped path in filesystem, which Hive2.0.0 also have the same action.

2) if the partition name contains ':', there will throw exception that new Path("/path","a:b"), this is illegal which has a colon in the relative path.

```
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: a:b
  at org.apache.hadoop.fs.Path.initialize(Path.java:205)
  at org.apache.hadoop.fs.Path.<init>(Path.java:171)
  at org.apache.hadoop.fs.Path.<init>(Path.java:88)
  ... 48 elided
Caused by: java.net.URISyntaxException: Relative path in absolute URI: a:b
  at java.net.URI.checkPath(URI.java:1823)
  at java.net.URI.<init>(URI.java:745)
  at org.apache.hadoop.fs.Path.initialize(Path.java:202)
  ... 50 more
```
## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #17173 from windpiger/fixDatasourceSpecialCharPartitionName.
2017-03-06 22:36:43 -08:00
wangzhenhua 9909f6d361 [SPARK-19350][SQL] Cardinality estimation of Limit and Sample
## What changes were proposed in this pull request?

Before this pr, LocalLimit/GlobalLimit/Sample propagates the same row count and column stats from its child, which is incorrect.
We can get the correct rowCount in Statistics for GlobalLimit/Sample whether cbo is enabled or not.
We don't know the rowCount for LocalLimit because we don't know the partition number at that time. Column stats should not be propagated because we don't know the distribution of columns after Limit or Sample.

## How was this patch tested?

Added test cases.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #16696 from wzhfy/limitEstimation.
2017-03-06 21:45:36 -08:00
Wojtek Szymanski f6471dc0d5 [SPARK-19709][SQL] Read empty file with CSV data source
## What changes were proposed in this pull request?

Bugfix for reading empty file with CSV data source. Instead of throwing `NoSuchElementException`, an empty data frame is returned.

## How was this patch tested?

Added new unit test in `org.apache.spark.sql.execution.datasources.csv.CSVSuite`

Author: Wojtek Szymanski <wk.szymanski@gmail.com>

Closes #17068 from wojtek-szymanski/SPARK-19709.
2017-03-06 13:19:36 -08:00
jiangxingbo 9991c2dad6 [SPARK-19211][SQL] Explicitly prevent Insert into View or Create View As Insert
## What changes were proposed in this pull request?

Currently we don't explicitly forbid the following behaviors:
1. The statement CREATE VIEW AS INSERT INTO throws the following exception:
```
scala> spark.sql("CREATE VIEW testView AS INSERT INTO tab VALUES (1, \"a\")")
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: at least one column must be specified for the table;
 scala> spark.sql("CREATE VIEW testView(a, b) AS INSERT INTO tab VALUES (1, \"a\")")
org.apache.spark.sql.AnalysisException: The number of columns produced by the SELECT clause (num: `0`) does not match the number of column names specified by CREATE VIEW (num: `2`).;
```

2. The statement INSERT INTO view VALUES throws the following exception from checkAnalysis:
```
scala> spark.sql("INSERT INTO testView VALUES (1, \"a\")")
org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is not allowed.;;
'InsertIntoTable View (`default`.`testView`, [a#16,b#17]), false, false
+- LocalRelation [col1#14, col2#15]
```

After this PR, the behavior changes to:
```
scala> spark.sql("CREATE VIEW testView AS INSERT INTO tab VALUES (1, \"a\")")
org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: CREATE VIEW ... AS INSERT INTO;

scala> spark.sql("CREATE VIEW testView(a, b) AS INSERT INTO tab VALUES (1, \"a\")")
org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: CREATE VIEW ... AS INSERT INTO;

scala> spark.sql("INSERT INTO testView VALUES (1, \"a\")")
org.apache.spark.sql.AnalysisException: `default`.`testView` is a view, inserting into a view is not allowed;
```

## How was this patch tested?

Add a new test case in `SparkSqlParserSuite`;
Update the corresponding test case in `SQLViewSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #17125 from jiangxb1987/insert-with-view.
2017-03-06 12:35:03 -08:00
windpiger 096df6d933 [SPARK-19257][SQL] location for table/partition/database should be java.net.URI
## What changes were proposed in this pull request?

Currently we treat the location of table/partition/database as URI string.

It will be safer if we can make the type of location as java.net.URI.

In this PR, there are following classes changes:
**1. CatalogDatabase**
```
case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: String,
    properties: Map[String, String])
--->
case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: URI,
    properties: Map[String, String])
```
**2. CatalogStorageFormat**
```
case class CatalogStorageFormat(
    locationUri: Option[String],
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String],
    compressed: Boolean,
    properties: Map[String, String])
---->
case class CatalogStorageFormat(
    locationUri: Option[URI],
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String],
    compressed: Boolean,
    properties: Map[String, String])
```

Before and After this PR, it is transparent for user, there is no change that the user should concern. The `String` to `URI` just happened in SparkSQL internally.

Here list some operation related location:
**1. whitespace in the location**
   e.g.  `/a/b c/d`
   For both table location and partition location,
   After `CREATE TABLE  t... (PARTITIONED BY ...) LOCATION '/a/b c/d'` ,
   then `DESC EXTENDED t ` show the location is `/a/b c/d`,
   and the real path in the FileSystem also show `/a/b c/d`

**2. colon(:) in the location**
   e.g.  `/a/b:c/d`
   For both table location and partition location,
   when `CREATE TABLE  t... (PARTITIONED BY ...)  LOCATION '/a/b:c/d'` ,

  **In linux file system**
   `DESC EXTENDED t ` show the location is `/a/b:c/d`,
   and the real path in the FileSystem also show `/a/b:c/d`

  **in HDFS** throw exception:
  `java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.`

  **while** After `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1`
   then `DESC EXTENDED t ` show the location is `/xxx/a=a%3Ab`,
   and the real path in the FileSystem also show `/xxx/a=a%3Ab`

**3. percent sign(%) in the location**
   e.g.  `/a/b%c/d`
   For both table location and partition location,
   After `CREATE TABLE  t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'` ,
   then `DESC EXTENDED t ` show the location is `/a/b%c/d`,
   and the real path in the FileSystem also show `/a/b%c/d`

**4. encoded(%25) in the location**
   e.g.  `/a/b%25c/d`
   For both table location and partition location,
   After `CREATE TABLE  t... (PARTITIONED BY ...)  LOCATION '/a/b%25c/d'` ,
   then `DESC EXTENDED t ` show the location is `/a/b%25c/d`,
   and the real path in the FileSystem also show `/a/b%25c/d`

   **while** After `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1`
   then `DESC EXTENDED t ` show the location is `/xxx/a=%2525`,
   and the real path in the FileSystem also show `/xxx/a=%2525`

**Additionally**, except the location, there are two other factors will affect the location of the table/partition. one is the table name which does not allowed to have special characters, and the  other is `partition name` which have the same actions with `partition value`, and `partition name` with special character situation has add some testcase and resolve a bug in [PR](https://github.com/apache/spark/pull/17173)

### Summary:
After `CREATE TABLE  t... (PARTITIONED BY ...)  LOCATION path`,
the path which we get from `DESC TABLE` and `real path in FileSystem` are all the same with the `CREATE TABLE` command(different filesystem has different action that allow what kind of special character to create the path, e.g. HDFS does not allow colon, but linux filesystem allow it ).

`DataBase` also have the same logic with `CREATE TABLE`

while if the `partition value` has some special character like `%` `:` `#` etc, then we will get the path with encoded `partition value` like `/xxx/a=A%25B` from `DESC TABLE` and `real path in FileSystem`

In this PR, the core change code is using `new Path(str).toUri` and `new Path(uri).toString`
which transfrom `str to uri `or `uri to str`.
for example:
```
val str = '/a/b c/d'
val uri = new Path(str).toUri  --> '/a/b%20c/d'
val strFromUri = new Path(uri).toString -> '/a/b c/d'
```

when we restore table/partition from metastore, or get the location from `CREATE TABLE` command, we can use it as above to change string to uri `new Path(str).toUri `

## How was this patch tested?
unit test added.
The `current master branch` also `passed all the test cases` added in this PR by a litter change.
https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764
here `toURI` -> `toString` when test in master branch.

This can show that this PR  is transparent for user.

Author: windpiger <songjun@outlook.com>

Closes #17149 from windpiger/changeStringToURI.
2017-03-06 10:44:26 -08:00
hyukjinkwon 369a148e59 [SPARK-19595][SQL] Support json array in from_json
## What changes were proposed in this pull request?

This PR proposes to both,

**Do not allow json arrays with multiple elements and return null in `from_json` with `StructType` as the schema.**

Currently, it only reads the single row when the input is a json array. So, the codes below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(StructField("a", IntegerType) :: Nil)
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("struct").select(from_json(col("struct"), schema)).show()
```
prints

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                 [1]|
+--------------------+
```

This PR simply suggests to print this as `null` if the schema is `StructType` and input is json array.with multiple elements

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                null|
+--------------------+
```

**Support json arrays in `from_json` with `ArrayType` as the schema.**

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("array").select(from_json(col("array"), schema)).show()
```

prints

```
+-------------------+
|jsontostruct(array)|
+-------------------+
|         [[1], [2]]|
+-------------------+
```

## How was this patch tested?

Unit test in `JsonExpressionsSuite`, `JsonFunctionsSuite`, Python doctests and manual test.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16929 from HyukjinKwon/disallow-array.
2017-03-05 14:35:06 -08:00
Takeshi Yamamuro 14bb398fae [SPARK-19254][SQL] Support Seq, Map, and Struct in functions.lit
## What changes were proposed in this pull request?
This pr is to support Seq, Map, and Struct in functions.lit; it adds a new IF named `lit2` with `TypeTag` for avoiding type erasure.

## How was this patch tested?
Added tests in `LiteralExpressionSuite`

Author: Takeshi Yamamuro <yamamuro@apache.org>
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #16610 from maropu/SPARK-19254.
2017-03-05 03:53:19 -08:00
uncleGen f48461ab2b [SPARK-19805][TEST] Log the row type when query result dose not match
## What changes were proposed in this pull request?

improve the log message when query result does not match.

before pr:

```
== Results ==
!== Correct Answer - 3 ==   == Spark Answer - 3 ==
 [1]                        [1]
 [2]                        [2]
 [3]                        [3]

```

after pr:

~~== Results ==
!== Correct Answer - 3 ==   == Spark Answer - 3 ==
!RowType[string]            RowType[integer]
 [1]                        [1]
 [2]                        [2]
 [3]                        [3]~~

```
== Results ==
!== Correct Answer - 3 ==   == Spark Answer - 3 ==
!struct<value:string>       struct<value:int>
 [1]                        [1]
 [2]                        [2]
 [3]                        [3]
```

## How was this patch tested?

Jenkins

Author: uncleGen <hustyugm@gmail.com>

Closes #17145 from uncleGen/improve-test-result.
2017-03-05 03:35:42 -08:00
Shixiong Zhu fbc4058037 [SPARK-19816][SQL][TESTS] Fix an issue that DataFrameCallbackSuite doesn't recover the log level
## What changes were proposed in this pull request?

"DataFrameCallbackSuite.execute callback functions when a DataFrame action failed" sets the log level to "fatal" but doesn't recover it. Hence, tests running after it won't output any logs except fatal logs.

This PR uses `testQuietly` instead to avoid changing the log level.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17156 from zsxwing/SPARK-19816.
2017-03-03 19:00:35 -08:00
Shixiong Zhu a6a7a95e2f [SPARK-19718][SS] Handle more interrupt cases properly for Hadoop
## What changes were proposed in this pull request?

[SPARK-19617](https://issues.apache.org/jira/browse/SPARK-19617) changed `HDFSMetadataLog` to enable interrupts when using the local file system. However, now we hit [HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074): `Shell.runCommand` converts `InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. This is the Hadoop patch to fix HADOOP-1207: 95c73d49b1

This PR adds new logic to handle the following cases related to `InterruptedException`.
- Check if the message of IOException starts with `java.lang.InterruptedException`. If so, treat it as `InterruptedException`. This is for pre-Hadoop 2.8.
- Treat `InterruptedIOException` as `InterruptedException`. This is for Hadoop 2.8+ and other places that may throw `InterruptedIOException` when the thread is interrupted.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17044 from zsxwing/SPARK-19718.
2017-03-03 17:10:11 -08:00
Takuya UESHIN 2a7921a813 [SPARK-18939][SQL] Timezone support in partition values.
## What changes were proposed in this pull request?

This is a follow-up pr of #16308 and #16750.

This pr enables timezone support in partition values.

We should use `timeZone` option introduced at #16750 to parse/format partition values of the `TimestampType`.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT` which will be used for partition values, the values written by the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts")
df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp]

scala> df.show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+

scala> df.write.partitionBy("ts").save("/path/to/gmtpartition")
```

```sh
$ ls /path/to/gmtpartition/
_SUCCESS			ts=2016-01-01 00%3A00%3A00
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").partitionBy("ts").save("/path/to/pstpartition")
```

```sh
$ ls /path/to/pstpartition/
_SUCCESS			ts=2015-12-31 16%3A00%3A00
```

We can properly read the partition values if the session local timezone and the timezone of the partition values are the same:

```scala
scala> spark.read.load("/path/to/gmtpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

And even if the timezones are different, we can properly read the values with setting corrent timezone option:

```scala
// wrong result
scala> spark.read.load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2015-12-31 16:00:00|
+---+-------------------+

// correct result
scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

## How was this patch tested?

Existing tests and added some tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #17053 from ueshin/issues/SPARK-18939.
2017-03-03 16:35:54 -08:00
Burak Yavuz 9314c08377 [SPARK-19774] StreamExecution should call stop() on sources when a stream fails
## What changes were proposed in this pull request?

We call stop() on a Structured Streaming Source only when the stream is shutdown when a user calls streamingQuery.stop(). We should actually stop all sources when the stream fails as well, otherwise we may leak resources, e.g. connections to Kafka.

## How was this patch tested?

Unit tests in `StreamingQuerySuite`.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17107 from brkyvz/close-source.
2017-03-03 10:35:15 -08:00
Pete Robbins 37a1c0e461 [SPARK-19710][SQL][TESTS] Fix ordering of rows in query results
## What changes were proposed in this pull request?
Changes to SQLQueryTests to make the order of the results constant.
Where possible ORDER BY has been added to match the existing expected output

## How was this patch tested?
Test runs on x86, zLinux (big endian), ppc (big endian)

Author: Pete Robbins <robbinspg@gmail.com>

Closes #17039 from robbinspg/SPARK-19710.
2017-03-03 07:53:46 -08:00
Liang-Chi Hsieh 98bcc188f9 [SPARK-19758][SQL] Resolving timezone aware expressions with time zone when resolving inline table
## What changes were proposed in this pull request?

When we resolve inline tables in analyzer, we will evaluate the expressions of inline tables.

When it evaluates a `TimeZoneAwareExpression` expression, an error will happen because the `TimeZoneAwareExpression` is not associated with timezone yet.

So we need to resolve these `TimeZoneAwareExpression`s with time zone when resolving inline tables.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17114 from viirya/resolve-timeawareexpr-inline-table.
2017-03-03 07:14:37 -08:00
hyukjinkwon d556b31703 [SPARK-18699][SQL][FOLLOWUP] Add explanation in CSV parser and minor cleanup
## What changes were proposed in this pull request?

This PR suggests adding some comments in `UnivocityParser` logics to explain what happens. Also, it proposes, IMHO, a little bit cleaner (at least easy for me to explain).

## How was this patch tested?

Unit tests in `CSVSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17142 from HyukjinKwon/SPARK-18699.
2017-03-03 00:50:58 -08:00
windpiger 982f3223b4 [SPARK-18726][SQL] resolveRelation for FileFormat DataSource don't need to listFiles twice
## What changes were proposed in this pull request?

Currently when we resolveRelation for a `FileFormat DataSource` without providing user schema, it will execute `listFiles`  twice in `InMemoryFileIndex` during `resolveRelation`.

This PR add a `FileStatusCache` for DataSource, this can avoid listFiles twice.

But there is a bug in `InMemoryFileIndex` see:
 [SPARK-19748](https://github.com/apache/spark/pull/17079)
 [SPARK-19761](https://github.com/apache/spark/pull/17093),
so this pr should be after SPARK-19748/ SPARK-19761.

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.
2017-03-02 23:54:01 -08:00
guifeng e24f21b5f8 [SPARK-19779][SS] Delete needless tmp file after restart structured streaming job
## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

The PR (https://github.com/apache/spark/pull/17012) can to fix restart a Structured Streaming application using hdfs as fileSystem, but also exist a problem that a tmp file of delta file is still reserved in hdfs. And Structured Streaming don't delete the tmp file generated when restart streaming job in future.

## How was this patch tested?
 unit tests

Author: guifeng <guifengleaf@gmail.com>

Closes #17124 from gf53520/SPARK-19779.
2017-03-02 21:19:29 -08:00
Sunitha Kambhampati f37bb14302 [SPARK-19602][SQL][TESTS] Add tests for qualified column names
## What changes were proposed in this pull request?
- Add tests covering different scenarios with qualified column names
- Please see Section 2 in the design doc for the various test scenarios [here](https://issues.apache.org/jira/secure/attachment/12854681/Design_ColResolution_JIRA19602.pdf)
- As part of SPARK-19602, changes are made to support three part column name. In order to aid in the review and to reduce the diff, the test scenarios are separated out into this PR.

## How was this patch tested?
- This is a **test only** change. The individual test suites were run successfully.

Author: Sunitha Kambhampati <skambha@us.ibm.com>

Closes #17067 from skambha/colResolutionTests.
2017-03-02 21:19:22 -08:00
Felix Cheung 8d6ef895ee [SPARK-18352][DOCS] wholeFile JSON update doc and programming guide
## What changes were proposed in this pull request?

Update doc for R, programming guide. Clarify default behavior for all languages.

## How was this patch tested?

manually

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17128 from felixcheung/jsonwholefiledoc.
2017-03-02 01:02:38 -08:00
windpiger de2b53df4c [SPARK-19583][SQL] CTAS for data source table with a created location should succeed
## What changes were proposed in this pull request?

```
  spark.sql(
          s"""
             |CREATE TABLE t
             |USING parquet
             |PARTITIONED BY(a, b)
             |LOCATION '$dir'
             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
           """.stripMargin)
```

Failed with the error message:
```
path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.;
org.apache.spark.sql.AnalysisException: path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.;
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:102)
```
while hive table is ok ,so we should fix it for datasource table.

The reason is that the SaveMode check is put in  `InsertIntoHadoopFsRelationCommand` , and the SaveMode check actually use `path`, this is fine when we use `DataFrameWriter.save()`, because this situation of SaveMode act on `path`.

While when we use  `CreateDataSourceAsSelectCommand`, the situation of SaveMode act on table, and
we have already do SaveMode check in `CreateDataSourceAsSelectCommand` for table , so we should not do SaveMode check in the following logic in `InsertIntoHadoopFsRelationCommand` for path, this is redundant and wrong logic for `CreateDataSourceAsSelectCommand`

After this PR, the following DDL will succeed, when the location has been created we will append it or overwrite it.
```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path AS SELECT ...
```

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #16938 from windpiger/CTASDataSourceWitLocation.
2017-03-01 22:50:25 -08:00
windpiger 8aa560b75e [SPARK-19761][SQL] create InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero failed
## What changes were proposed in this pull request?

If we create a InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero, it will throw an  exception:

```
Positive number of slices required
java.lang.IllegalArgumentException: Positive number of slices required
        at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
        at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
        at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357)
        at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186)
        at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
        at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #17093 from windpiger/fixEmptiPathInBulkListFiles.
2017-03-01 08:16:29 -08:00
Stan Zhai 5502a9cf88 [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule
## What changes were proposed in this pull request?
This PR fixes the code in Optimizer phase where the constant alias columns of a `INNER JOIN` query are folded in Rule `FoldablePropagation`.

For the following query():

```
val sqlA =
  """
    |create temporary view ta as
    |select a, 'a' as tag from t1 union all
    |select a, 'b' as tag from t2
  """.stripMargin

val sqlB =
  """
    |create temporary view tb as
    |select a, 'a' as tag from t3 union all
    |select a, 'b' as tag from t4
  """.stripMargin

val sql =
  """
    |select tb.* from ta inner join tb on
    |ta.a = tb.a and
    |ta.tag = tb.tag
  """.stripMargin
```

The tag column is an constant alias column, it's folded by `FoldablePropagation` like this:

```
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
 Project [a#4, tag#14]                              Project [a#4, tag#14]
!+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, ((a#0 = a#4) && (a = a))
    :- Union                                           :- Union
    :  :- Project [a#0, a AS tag#8]                    :  :- Project [a#0, a AS tag#8]
    :  :  +- LocalRelation [a#0]                       :  :  +- LocalRelation [a#0]
    :  +- Project [a#2, b AS tag#9]                    :  +- Project [a#2, b AS tag#9]
    :     +- LocalRelation [a#2]                       :     +- LocalRelation [a#2]
    +- Union                                           +- Union
       :- Project [a#4, a AS tag#14]                      :- Project [a#4, a AS tag#14]
       :  +- LocalRelation [a#4]                          :  +- LocalRelation [a#4]
       +- Project [a#6, b AS tag#15]                      +- Project [a#6, b AS tag#15]
          +- LocalRelation [a#6]                             +- LocalRelation [a#6]
```

Finally the Result of Batch Operator Optimizations is:

```
Project [a#4, tag#14]                              Project [a#4, tag#14]
!+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, (a#0 = a#4)
!   :- SubqueryAlias ta, `ta`                          :- Union
!   :  +- Union                                        :  :- LocalRelation [a#0]
!   :     :- Project [a#0, a AS tag#8]                 :  +- LocalRelation [a#2]
!   :     :  +- SubqueryAlias t1, `t1`                 +- Union
!   :     :     +- Project [a#0]                          :- LocalRelation [a#4, tag#14]
!   :     :        +- SubqueryAlias grouping              +- LocalRelation [a#6, tag#15]
!   :     :           +- LocalRelation [a#0]
!   :     +- Project [a#2, b AS tag#9]
!   :        +- SubqueryAlias t2, `t2`
!   :           +- Project [a#2]
!   :              +- SubqueryAlias grouping
!   :                 +- LocalRelation [a#2]
!   +- SubqueryAlias tb, `tb`
!      +- Union
!         :- Project [a#4, a AS tag#14]
!         :  +- SubqueryAlias t3, `t3`
!         :     +- Project [a#4]
!         :        +- SubqueryAlias grouping
!         :           +- LocalRelation [a#4]
!         +- Project [a#6, b AS tag#15]
!            +- SubqueryAlias t4, `t4`
!               +- Project [a#6]
!                  +- SubqueryAlias grouping
!                     +- LocalRelation [a#6]
```

The condition `tag#8 = tag#14` of INNER JOIN has been removed. This leads to the data of inner join being wrong.

After fix:

```
=== Result of Batch LocalRelation ===
 GlobalLimit 21                                           GlobalLimit 21
 +- LocalLimit 21                                         +- LocalLimit 21
    +- Project [a#4, tag#11]                                 +- Project [a#4, tag#11]
       +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11))         +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11))
!         :- SubqueryAlias ta                                      :- Union
!         :  +- Union                                              :  :- LocalRelation [a#0, tag#8]
!         :     :- Project [a#0, a AS tag#8]                       :  +- LocalRelation [a#2, tag#9]
!         :     :  +- SubqueryAlias t1                             +- Union
!         :     :     +- Project [a#0]                                :- LocalRelation [a#4, tag#11]
!         :     :        +- SubqueryAlias grouping                    +- LocalRelation [a#6, tag#12]
!         :     :           +- LocalRelation [a#0]
!         :     +- Project [a#2, b AS tag#9]
!         :        +- SubqueryAlias t2
!         :           +- Project [a#2]
!         :              +- SubqueryAlias grouping
!         :                 +- LocalRelation [a#2]
!         +- SubqueryAlias tb
!            +- Union
!               :- Project [a#4, a AS tag#11]
!               :  +- SubqueryAlias t3
!               :     +- Project [a#4]
!               :        +- SubqueryAlias grouping
!               :           +- LocalRelation [a#4]
!               +- Project [a#6, b AS tag#12]
!                  +- SubqueryAlias t4
!                     +- Project [a#6]
!                        +- SubqueryAlias grouping
!                           +- LocalRelation [a#6]
```

## How was this patch tested?

add sql-tests/inputs/inner-join.sql
All tests passed.

Author: Stan Zhai <zhaishidan@haizhi.com>

Closes #17099 from stanzhai/fix-inner-join.
2017-03-01 07:52:35 -08:00
Liang-Chi Hsieh 38e7835347 [SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path
## What changes were proposed in this pull request?

`Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path.

However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17064 from viirya/fix-refreshByPath.
2017-03-01 00:19:57 -08:00
Liwei Lin 4913c92c2f [SPARK-19633][SS] FileSource read from FileSink
## What changes were proposed in this pull request?

Right now file source always uses `InMemoryFileIndex` to scan files from a given path.

But when reading the outputs from another streaming query, the file source should use `MetadataFileIndex` to list files from the sink log. This patch adds this support.

## `MetadataFileIndex` or `InMemoryFileIndex`
```scala
spark
  .readStream
  .format(...)
  .load("/some/path") // for a non-glob path:
                      //   - use `MetadataFileIndex` when `/some/path/_spark_meta` exists
                      //   - fall back to `InMemoryFileIndex` otherwise
```
```scala
spark
  .readStream
  .format(...)
  .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
```

## How was this patch tested?

two newly added tests

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16987 from lw-lin/source-read-from-sink.
2017-02-28 22:58:51 -08:00
Jeff Zhang 7315880568 [SPARK-19572][SPARKR] Allow to disable hive in sparkR shell
## What changes were proposed in this pull request?
SPARK-15236 do this for scala shell, this ticket is for sparkR shell. This is not only for sparkR itself, but can also benefit downstream project like livy which use shell.R for its interactive session. For now, livy has no control of whether enable hive or not.

## How was this patch tested?

Tested it manually, run `bin/sparkR --master local --conf spark.sql.catalogImplementation=in-memory` and verify hive is not enabled.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #16907 from zjffdu/SPARK-19572.
2017-02-28 22:21:29 -08:00
hyukjinkwon 7e5359be5c [SPARK-19610][SQL] Support parsing multiline CSV files
## What changes were proposed in this pull request?

This PR proposes the support for multiple lines for CSV by resembling the multiline supports in JSON datasource (in case of JSON, per file).

So, this PR introduces `wholeFile` option which makes the format not splittable and reads each whole file. Since Univocity parser can produces each row from a stream, it should be capable of parsing very large documents when the internal rows are fix in the memory.

## How was this patch tested?

Unit tests in `CSVSuite` and `tests.py`

Manual tests with a single 9GB CSV file in local file system, for example,

```scala
spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count()
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16976 from HyukjinKwon/SPARK-19610.
2017-02-28 13:34:33 -08:00
windpiger ce233f18e3 [SPARK-19463][SQL] refresh cache after the InsertIntoHadoopFsRelationCommand
## What changes were proposed in this pull request?

If we first cache a DataSource table, then we insert some data into the table, we should refresh the data in the cache after the insert command.

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #16809 from windpiger/refreshCacheAfterInsert.
2017-02-28 11:59:18 -08:00
Roberto Agostino Vitillo 9734a928a7 [SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS
## What changes were proposed in this pull request?

HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies:

> Destination exists and is a file
> Renaming a file atop an existing file is specified as failing, raising an exception.
>    - Local FileSystem : the rename succeeds; the destination file is replaced by the source file.
>    - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false.

This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output.

## How was this patch tested?

This patch was tested by running `StateStoreSuite`.

Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>

Closes #17012 from vitillo/fix_rename.
2017-02-28 10:49:07 -08:00
Wenchen Fan 7c7fc30b4a [SPARK-19678][SQL] remove MetastoreRelation
## What changes were proposed in this pull request?

`MetastoreRelation` is used to represent table relation for hive tables, and provides some hive related information. We will resolve `SimpleCatalogRelation` to `MetastoreRelation` for hive tables, which is unnecessary as these 2 are the same essentially. This PR merges `SimpleCatalogRelation` and `MetastoreRelation`

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17015 from cloud-fan/table-relation.
2017-02-28 09:24:36 -08:00
Yuming Wang 9b8eca65dc [SPARK-19660][CORE][SQL] Replace the configuration property names that are deprecated in the version of Hadoop 2.6
## What changes were proposed in this pull request?

Replace all the Hadoop deprecated configuration property names according to [DeprecatedProperties](https://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/DeprecatedProperties.html).

except:
https://github.com/apache/spark/blob/v2.1.0/python/pyspark/sql/tests.py#L1533
https://github.com/apache/spark/blob/v2.1.0/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala#L987
https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala#L45
https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L614

## How was this patch tested?

Existing tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #16990 from wangyum/HadoopDeprecatedProperties.
2017-02-28 10:13:42 +00:00
windpiger a350bc16d3 [SPARK-19748][SQL] refresh function has a wrong order to do cache invalidate and regenerate the inmemory var for InMemoryFileIndex with FileStatusCache
## What changes were proposed in this pull request?

If we refresh a InMemoryFileIndex with a FileStatusCache, it will first use the FileStatusCache to re-generate the cachedLeafFiles etc, then call FileStatusCache.invalidateAll.

While the order to do these two actions is wrong, this lead to the refresh action does not take effect.

```
  override def refresh(): Unit = {
    refresh0()
    fileStatusCache.invalidateAll()
  }

  private def refresh0(): Unit = {
    val files = listLeafFiles(rootPaths)
    cachedLeafFiles =
      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
    cachedPartitionSpec = null
  }
```
## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #17079 from windpiger/fixInMemoryFileIndexRefresh.
2017-02-28 00:16:49 -08:00
uncleGen 7353038353 [SPARK-19749][SS] Name socket source with a meaningful name
## What changes were proposed in this pull request?

Name socket source with a meaningful name

## How was this patch tested?

Jenkins

Author: uncleGen <hustyugm@gmail.com>

Closes #17082 from uncleGen/SPARK-19749.
2017-02-27 18:02:45 -08:00
hyukjinkwon 8a5a58506c [SPARK-15615][SQL][BUILD][FOLLOW-UP] Replace deprecated usage of json(RDD[String]) API
## What changes were proposed in this pull request?

This PR proposes to replace the deprecated `json(RDD[String])` usage to `json(Dataset[String])`.

This currently produces so many warnings.

## How was this patch tested?

Fixed tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17071 from HyukjinKwon/SPARK-15615-followup.
2017-02-27 14:33:02 -08:00
hyukjinkwon 4ba9c6c453 [MINOR][BUILD] Fix lint-java breaks in Java
## What changes were proposed in this pull request?

This PR proposes to fix the lint-breaks as below:

```
[ERROR] src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java:[29,8] (imports) UnusedImports: Unused import - org.apache.spark.network.buffer.ManagedBuffer.
[ERROR] src/main/java/org/apache/spark/unsafe/types/UTF8String.java:[156,10] (modifier) ModifierOrder: 'Nonnull' annotation modifier does not precede non-annotation modifiers.
[ERROR] src/main/java/org/apache/spark/SparkFirehoseListener.java:[122] (sizes) LineLength: Line is longer than 100 characters (found 105).
[ERROR] src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:[164,78] (coding) OneStatementPerLine: Only one statement per line allowed.
[ERROR] src/test/java/test/org/apache/spark/JavaAPISuite.java:[1157] (sizes) LineLength: Line is longer than 100 characters (found 121).
[ERROR] src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java:[149] (sizes) LineLength: Line is longer than 100 characters (found 113).
[ERROR] src/test/java/test/org/apache/spark/streaming/Java8APISuite.java:[146] (sizes) LineLength: Line is longer than 100 characters (found 122).
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[32,8] (imports) UnusedImports: Unused import - org.apache.spark.streaming.Time.
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[611] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[1317] (sizes) LineLength: Line is longer than 100 characters (found 102).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java:[91] (sizes) LineLength: Line is longer than 100 characters (found 102).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[113] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[164] (sizes) LineLength: Line is longer than 100 characters (found 110).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[212] (sizes) LineLength: Line is longer than 100 characters (found 114).
[ERROR] src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java:[36] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java:[26,8] (imports) UnusedImports: Unused import - com.amazonaws.regions.RegionUtils.
[ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java:[20,8] (imports) UnusedImports: Unused import - com.amazonaws.regions.RegionUtils.
[ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java:[94] (sizes) LineLength: Line is longer than 100 characters (found 103).
[ERROR] src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java:[30,8] (imports) UnusedImports: Unused import - org.apache.spark.sql.api.java.UDF1.
[ERROR] src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java:[72] (sizes) LineLength: Line is longer than 100 characters (found 104).
[ERROR] src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java:[121] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java:[28,8] (imports) UnusedImports: Unused import - org.apache.spark.api.java.JavaRDD.
[ERROR] src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java:[29,8] (imports) UnusedImports: Unused import - org.apache.spark.api.java.JavaSparkContext.
```

## How was this patch tested?

Manually via

```bash
./dev/lint-java
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17072 from HyukjinKwon/java-lint.
2017-02-27 08:44:26 +00:00
Eyal Zituny 9f8e392159 [SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists
## What changes were proposed in this pull request?

currently if multiple streaming queries listeners exists, when a QueryTerminatedEvent is triggered, only one of the listeners will be invoked while the rest of the listeners will ignore the event.
this is caused since the the streaming queries listeners bus holds a set of running queries ids and when a termination event is triggered, after the first listeners is handling the event, the terminated query id is being removed from the set.
in this PR, the query id will be removed from the set only after all the listeners handles the event

## How was this patch tested?

a test with multiple listeners has been added to StreamingQueryListenerSuite

Author: Eyal Zituny <eyal.zituny@equalum.io>

Closes #16991 from eyalzit/master.
2017-02-26 15:57:32 -08:00
Dilip Biswal 68f2142cfd [SQL] Duplicate test exception in SQLQueryTestSuite due to meta files(.DS_Store) on Mac
## What changes were proposed in this pull request?
After adding the tests for subquery, we now have multiple level of directories under "sql-tests/inputs".  Some times on Mac while using Finder application it creates the meta data files called ".DS_Store". When these files are present at different levels in directory hierarchy, we get duplicate test exception while running the tests  as we just use the file name as the test case name. In this PR, we use the relative file path from the base directory along with the test file as the test name. Also after this change, we can have the same test file name under different directory like exists/basic.sql , in/basic.sql. Here is the truncated output of the test run after the change.

```SQL
info] SQLQueryTestSuite:
[info] - arithmetic.sql (5 seconds, 235 milliseconds)
[info] - array.sql (536 milliseconds)
[info] - blacklist.sql !!! IGNORED !!!
[info] - cast.sql (550 milliseconds)
....
....
....
[info] - union.sql (315 milliseconds)
[info] - subquery/.DS_Store !!! IGNORED !!!
[info] - subquery/exists-subquery/.DS_Store !!! IGNORED !!!
[info] - subquery/exists-subquery/exists-aggregate.sql (2 seconds, 451 milliseconds)
....
....
[info] - subquery/in-subquery/in-group-by.sql (12 seconds, 264 milliseconds)
....
....
[info] - subquery/scalar-subquery/scalar-subquery-predicate.sql (7 seconds, 769 milliseconds)
[info] - subquery/scalar-subquery/scalar-subquery-select.sql (4 seconds, 119 milliseconds)
```
Since this is a simple change, i haven't created a JIRA for it.
## How was this patch tested?
Manually verified. This is change to test infrastructure

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #17060 from dilipbiswal/sqlquerytestsuite.
2017-02-25 23:56:57 -08:00
Herman van Hovell 8f0511ed49 [SPARK-19650] Commands should not trigger a Spark job
Spark executes SQL commands eagerly. It does this by creating an RDD which contains the command's results. The downside to this is that any action on this RDD triggers a Spark job which is expensive and is unnecessary.

This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it just materializes the result and puts them in a `LocalRelation`.

Added a regression test to `SQLQuerySuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17027 from hvanhovell/no-job-command.
2017-02-24 23:05:59 -08:00
Xiao Li 4cb025afaf [SPARK-19735][SQL] Remove HOLD_DDLTIME from Catalog APIs
### What changes were proposed in this pull request?
As explained in Hive JIRA https://issues.apache.org/jira/browse/HIVE-12224, HOLD_DDLTIME was broken as soon as it landed. Hive 2.0 removes HOLD_DDLTIME from the API. In Spark SQL, we always set it to FALSE. Like Hive, we should also remove it from our Catalog APIs.

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17063 from gatorsmile/removalHoldDDLTime.
2017-02-24 23:03:59 -08:00
wangzhenhua 69d0da6373 [SPARK-17078][SQL] Show stats when explain
## What changes were proposed in this pull request?

Currently we can only check the estimated stats in logical plans by debugging. We need to provide an easier and more efficient way for developers/users.

In this pr, we add EXPLAIN COST command to show stats in the optimized logical plan.
E.g.
```
spark-sql> EXPLAIN COST select count(1) from store_returns;

...
== Optimized Logical Plan ==
Aggregate [count(1) AS count(1)#24L], Statistics(sizeInBytes=16.0 B, rowCount=1, isBroadcastable=false)
+- Project, Statistics(sizeInBytes=4.3 GB, rowCount=5.76E+8, isBroadcastable=false)
   +- Relation[sr_returned_date_sk#3,sr_return_time_sk#4,sr_item_sk#5,sr_customer_sk#6,sr_cdemo_sk#7,sr_hdemo_sk#8,sr_addr_sk#9,sr_store_sk#10,sr_reason_sk#11,sr_ticket_number#12,sr_return_quantity#13,sr_return_amt#14,sr_return_tax#15,sr_return_amt_inc_tax#16,sr_fee#17,sr_return_ship_cost#18,sr_refunded_cash#19,sr_reversed_charge#20,sr_store_credit#21,sr_net_loss#22] parquet, Statistics(sizeInBytes=28.6 GB, rowCount=5.76E+8, isBroadcastable=false)
...
```

## How was this patch tested?

Add test cases.

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16594 from wzhfy/showStats.
2017-02-24 10:24:59 -08:00
windpiger 8f33731e79 [SPARK-19664][SQL] put hive.metastore.warehouse.dir in hadoopconf to overwrite its original value
## What changes were proposed in this pull request?

In [SPARK-15959](https://issues.apache.org/jira/browse/SPARK-15959), we bring back the `hive.metastore.warehouse.dir` , while in the logic, when use the value of  `spark.sql.warehouse.dir` to overwrite `hive.metastore.warehouse.dir` , it set it to `sparkContext.conf` which does not overwrite the value is hadoopConf, I think it should put in `sparkContext.hadoopConfiguration` and overwrite the original value of hadoopConf

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala#L64

## How was this patch tested?
N/A

Author: windpiger <songjun@outlook.com>

Closes #16996 from windpiger/hivemetawarehouseConf.
2017-02-23 22:57:23 -08:00
Carson Wang eff7b40890 [SPARK-19674][SQL] Ignore driver accumulator updates don't belong to the execution when merging all accumulator updates
## What changes were proposed in this pull request?
In SQLListener.getExecutionMetrics, driver accumulator updates don't belong to the execution should be ignored when merging all accumulator updates to prevent NoSuchElementException.

## How was this patch tested?
Updated unit test.

Author: Carson Wang <carson.wang@intel.com>

Closes #17009 from carsonwang/FixSQLMetrics.
2017-02-23 14:31:16 -08:00
Takeshi Yamamuro 09ed6e7711 [SPARK-18699][SQL] Put malformed tokens into a new field when parsing CSV data
## What changes were proposed in this pull request?
This pr added a logic to put malformed tokens into a new field when parsing CSV data  in case of permissive modes. In the current master, if the CSV parser hits these malformed ones, it throws an exception below (and then a job fails);
```
Caused by: java.lang.IllegalArgumentException
	at java.sql.Date.valueOf(Date.java:143)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
	at scala.util.Try.getOrElse(Try.scala:79)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269)
	at
```
In case that users load large CSV-formatted data, the job failure makes users get some confused. So, this fix set NULL for original columns and put malformed tokens in a new field.

## How was this patch tested?
Added tests in `CSVSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #16928 from maropu/SPARK-18699-2.
2017-02-23 12:09:36 -08:00
Shixiong Zhu 9bf4e2baad [SPARK-19497][SS] Implement streaming deduplication
## What changes were proposed in this pull request?

This PR adds a special streaming deduplication operator to support `dropDuplicates` with `aggregation` and watermark. It reuses the `dropDuplicates` API but creates new logical plan `Deduplication` and new physical plan `DeduplicationExec`.

The following cases are supported:

- one or multiple `dropDuplicates()` without aggregation (with or without watermark)
- `dropDuplicates` before aggregation

Not supported cases:

- `dropDuplicates` after aggregation

Breaking changes:
- `dropDuplicates` without aggregation doesn't work with `complete` or `update` mode.

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16970 from zsxwing/dedup.
2017-02-23 11:25:39 -08:00
Takeshi Yamamuro 93aa427159 [SPARK-19691][SQL] Fix ClassCastException when calculating percentile of decimal column
## What changes were proposed in this pull request?
This pr fixed a class-cast exception below;
```
scala> spark.range(10).selectExpr("cast (id as decimal) as x").selectExpr("percentile(x, 0.5)").collect()
 java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to java.lang.Number
	at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:141)
	at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:58)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:514)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:151)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
	at
```
This fix simply converts catalyst values (i.e., `Decimal`) into scala ones by using `CatalystTypeConverters`.

## How was this patch tested?
Added a test in `DataFrameSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17028 from maropu/SPARK-19691.
2017-02-23 16:28:36 +01:00
Takeshi Yamamuro 769aa0f1d2 [SPARK-19695][SQL] Throw an exception if a columnNameOfCorruptRecord field violates requirements in json formats
## What changes were proposed in this pull request?
This pr comes from #16928 and fixed a json behaviour along with the CSV one.

## How was this patch tested?
Added tests in `JsonSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17023 from maropu/SPARK-19695.
2017-02-22 21:39:20 -08:00
pj.fanning d3147502e7 [SPARK-15615][SQL] Add an API to load DataFrame from Dataset[String] storing JSON
## What changes were proposed in this pull request?

SPARK-15615 proposes replacing the sqlContext.read.json(rdd) with a dataset equivalent.
SPARK-15463 adds a CSV API for reading from Dataset[String] so this keeps the API consistent.
I am deprecating the existing RDD based APIs.

## How was this patch tested?

There are existing tests. I left most tests to use the existing APIs as they delegate to the new json API.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: pj.fanning <pj.fanning@workday.com>
Author: PJ Fanning <pjfanning@users.noreply.github.com>

Closes #16895 from pjfanning/SPARK-15615.
2017-02-22 18:03:25 -08:00
Xiao Li dc005ed53c [SPARK-19658][SQL] Set NumPartitions of RepartitionByExpression In Parser
### What changes were proposed in this pull request?

Currently, if `NumPartitions` is not set in RepartitionByExpression, we will set it using `spark.sql.shuffle.partitions` during Planner. However, this is not following the general resolution process. This PR is to set it in `Parser` and then `Optimizer` can use the value for plan optimization.

### How was this patch tested?

Added a test case.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #16988 from gatorsmile/resolveRepartition.
2017-02-22 17:26:56 -08:00
hyukjinkwon 37112fcfcd [SPARK-19666][SQL] Skip a property without getter in Java schema inference and allow empty bean in encoder creation
## What changes were proposed in this pull request?

This PR proposes to fix two.

**Skip a property without a getter in beans**

Currently, if we use a JavaBean without the getter as below:

```java
public static class BeanWithoutGetter implements Serializable {
  private String a;

  public void setA(String a) {
    this.a = a;
  }
}

BeanWithoutGetter bean = new BeanWithoutGetter();
List<BeanWithoutGetter> data = Arrays.asList(bean);
spark.createDataFrame(data, BeanWithoutGetter.class).show();
```

- Before

It throws an exception as below:

```
java.lang.NullPointerException
	at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
	at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
	at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
```

- After

```
++
||
++
||
++
```

**Supports empty bean in encoder creation**

```java
public static class EmptyBean implements Serializable {}

EmptyBean bean = new EmptyBean();
List<EmptyBean> data = Arrays.asList(bean);
spark.createDataset(data, Encoders.bean(EmptyBean.class)).show();
```

- Before

throws an exception as below:

```
java.lang.UnsupportedOperationException: Cannot infer type for class EmptyBean because it is not bean-compliant
	at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:436)
	at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:341)
```

- After

```
++
||
++
||
++
```

## How was this patch tested?

Unit test in `JavaDataFrameSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17013 from HyukjinKwon/SPARK-19666.
2017-02-22 12:42:23 -08:00
Xiao Li 1a45d2b2cc [SPARK-19670][SQL][TEST] Enable Bucketed Table Reading and Writing Testing Without Hive Support
### What changes were proposed in this pull request?
Bucketed table reading and writing does not need Hive support. We can move the test cases from `sql/hive` to `sql/core`. After this PR, we can improve the test case coverage. Bucket table reading and writing can be tested with and without Hive support.

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17004 from gatorsmile/mvTestCaseForBuckets.
2017-02-21 19:30:36 -08:00
hyukjinkwon 17b93b5feb
[SPARK-18922][TESTS] Fix new test failures on Windows due to path and resource not closed
## What changes were proposed in this pull request?

This PR proposes to fix new test failures on WIndows as below:

**Before**

```
KafkaRelationSuite:
 - test late binding start offsets *** FAILED *** (7 seconds, 679 milliseconds)
   Cause: java.nio.file.FileSystemException: C:\projects\spark\target\tmp\spark-4c4b0cd1-4cb7-4908-949d-1b0cc8addb50\topic-4-0\00000000000000000000.log -> C:\projects\spark\target\tmp\spark-4c4b0cd1-4cb7-4908-949d-1b0cc8addb50\topic-4-0\00000000000000000000.log.deleted: The process cannot access the file because it is being used by another process.

KafkaSourceSuite:
 - deserialization of initial offset with Spark 2.1.0 *** FAILED *** (3 seconds, 542 milliseconds)
   java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-97ef64fc-ae61-4ce3-ac59-287fd38bd824

 - deserialization of initial offset written by Spark 2.1.0 *** FAILED *** (60 milliseconds)
   java.nio.file.InvalidPathException: Illegal char <:> at index 2: /C:/projects/spark/external/kafka-0-10-sql/target/scala-2.11/test-classes/kafka-source-initial-offset-version-2.1.0.b

HiveDDLSuite:
 - partitioned table should always put partition columns at the end of table schema *** FAILED *** (657 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-f1b83d09-850a-4bba-8e43-a2a28dfaa757;

DDLSuite:
 - create a data source table without schema *** FAILED *** (94 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-a3f3c161-afae-4d6f-9182-e8642f77062b;

 - SET LOCATION for managed table *** FAILED *** (219 milliseconds)
   org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
 Exchange SinglePartit
 +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#99367L])
    +- *FileScan parquet default.tbl[] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:projectsspark	arget	mpspark-15be2f2f-4ea9-4c47-bfee-1b7b49363033], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

 - insert data to a data source table which has a not existed location should succeed *** FAILED *** (16 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-34987671-e8d1-4624-ba5b-db1012e1246b;

 - insert into a data source table with no existed partition location should succeed *** FAILED *** (16 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-4c6ccfbf-4091-4032-9fbc-3d40c58267d5;

 - read data from a data source table which has a not existed location should succeed *** FAILED *** (0 milliseconds)

 - read data from a data source table with no existed partition location should succeed *** FAILED *** (0 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-6af39e37-abd1-44e8-ac68-e2dfcf67a2f3;

InputOutputMetricsSuite:
 - output metrics on records written *** FAILED *** (0 milliseconds)
   java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-cd69ee77-88f2-4202-bed6-19c0ee05ef55\InputOutputMetricsSuite, expected: file:///

 - output metrics on records written - new Hadoop API *** FAILED *** (16 milliseconds)
   java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-b69e8fcb-047b-4de8-9cdf-5f026efb6762\InputOutputMetricsSuite, expected: file:///
```

**After**

```
KafkaRelationSuite:
 - test late binding start offsets !!! CANCELED !!! (62 milliseconds)

KafkaSourceSuite:
 - deserialization of initial offset with Spark 2.1.0 (5 seconds, 341 milliseconds)
 - deserialization of initial offset written by Spark 2.1.0 (910 milliseconds)

HiveDDLSuite:
 - partitioned table should always put partition columns at the end of table schema (2 seconds)

DDLSuite:
 - create a data source table without schema (828 milliseconds)
 - SET LOCATION for managed table (406 milliseconds)
 - insert data to a data source table which has a not existed location should succeed (406 milliseconds)
 - insert into a data source table with no existed partition location should succeed (453 milliseconds)
 - read data from a data source table which has a not existed location should succeed (94 milliseconds)
 - read data from a data source table with no existed partition location should succeed (265 milliseconds)

InputOutputMetricsSuite:
 - output metrics on records written (172 milliseconds)
 - output metrics on records written - new Hadoop API (297 milliseconds)
```

## How was this patch tested?

Fixed tests in `InputOutputMetricsSuite`, `KafkaRelationSuite`,  `KafkaSourceSuite`, `DDLSuite.scala` and `HiveDDLSuite`.

Manually tested via AppVeyor as below:

`InputOutputMetricsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/ex8nvwa6tsh7rmto
`KafkaRelationSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/h8dlcowew52y8ncw
`KafkaSourceSuite`: https://ci.appveyor.com/project/spark-test/spark/build/634-20170219-windows-test/job/9ybgjl7yeubxcre4
`DDLSuite`: https://ci.appveyor.com/project/spark-test/spark/build/635-20170219-windows-test
`HiveDDLSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/up6o9n47er087ltb

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16999 from HyukjinKwon/windows-fix.
2017-02-20 21:26:54 -08:00
windpiger 73f065569d [SPARK-19669][HOTFIX][SQL] sessionState access privileges compiled failed in TestSQLContext
## What changes were proposed in this pull request?

In [SPARK-19669](0733a54a45) change the sessionState access privileges from private to public, this lead to the compile failed in TestSQLContext

this pr is a hotfix for this.

## How was this patch tested?
N/A

Author: windpiger <songjun@outlook.com>

Closes #17008 from windpiger/hotfixcompile.
2017-02-20 19:20:23 -08:00
Reynold Xin 0733a54a45 [SPARK-19669][SQL] Open up visibility for sharedState, sessionState, and a few other functions
## What changes were proposed in this pull request?
To ease debugging, most of Spark SQL internals have public level visibility. Two of the most important internal states, sharedState and sessionState, however, are package private. It would make more sense to open these up as well with clear documentation that they are internal.

In addition, users currently have way to set active/default SparkSession, but no way to actually get them back. We should open those up as well.

## How was this patch tested?
N/A - only visibility change.

Author: Reynold Xin <rxin@databricks.com>

Closes #17002 from rxin/SPARK-19669.
2017-02-20 12:21:07 -08:00
Wenchen Fan 776b8f17cf [SPARK-19563][SQL] avoid unnecessary sort in FileFormatWriter
## What changes were proposed in this pull request?

In `FileFormatWriter`, we will sort the input rows by partition columns and bucket id and sort columns, if we want to write data out partitioned or bucketed.

However, if the data is already sorted, we will sort it again, which is unnecssary.

This PR removes the sorting logic in `FileFormatWriter` and use `SortExec` instead. We will not add `SortExec` if the data is already sorted.

## How was this patch tested?

I did a micro benchmark manually
```
val df = spark.range(10000000).select($"id", $"id" % 10 as "part").sort("part")
spark.time(df.write.partitionBy("part").parquet("/tmp/test"))
```
The result was about 6.4 seconds before this PR, and is 5.7 seconds afterwards.

close https://github.com/apache/spark/pull/16724

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16898 from cloud-fan/writer.
2017-02-19 18:13:12 -08:00
windpiger 65fe902e13 [SPARK-19598][SQL] Remove the alias parameter in UnresolvedRelation
## What changes were proposed in this pull request?

Remove the alias parameter in `UnresolvedRelation`, and use `SubqueryAlias` to replace it.
This can simplify some `match case` situations.

For example, the broadcast hint pull request can have one fewer case https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala#L57-L61

## How was this patch tested?
add some unit tests

Author: windpiger <songjun@outlook.com>

Closes #16956 from windpiger/removeUnresolveTableAlias.
2017-02-19 16:50:16 -08:00
Sean Owen 1487c9af20
[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request?

Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #16964 from srowen/SPARK-19534.
2017-02-19 09:42:50 -08:00
jinxing ba8912e5f3
[SPARK-19450] Replace askWithRetry with askSync.
## What changes were proposed in this pull request?

`askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and https://github.com/apache/spark/pull/16690#issuecomment-276850068) and `askWithRetry` is marked as deprecated.
As mentioned SPARK-18113(https://github.com/apache/spark/pull/16503#event-927953218):

>askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

Since `askWithRetry` is just used inside spark and not in user logic. It might make sense to replace all of them with `askSync`.

## How was this patch tested?
This PR doesn't change code logic, existing unit test can cover.

Author: jinxing <jinxing@meituan.com>

Closes #16790 from jinxing64/SPARK-19450.
2017-02-19 04:34:07 -08:00
Ala Luszczak b486ffc86d [SPARK-19447] Make Range operator generate "recordsRead" metric
## What changes were proposed in this pull request?

The Range was modified to produce "recordsRead" metric instead of "generated rows". The tests were updated and partially moved to SQLMetricsSuite.

## How was this patch tested?

Unit tests.

Author: Ala Luszczak <ala@databricks.com>

Closes #16960 from ala/range-records-read.
2017-02-18 07:51:41 -08:00
Shixiong Zhu 15b144d2bf [SPARK-19617][SS] Fix the race condition when starting and stopping a query quickly
## What changes were proposed in this pull request?

The streaming thread in StreamExecution uses the following ways to check if it should exit:
- Catch an InterruptException.
- `StreamExecution.state` is TERMINATED.

When starting and stopping a query quickly, the above two checks may both fail:
- Hit [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) and swallow InterruptException
- StreamExecution.stop is called before `state` becomes `ACTIVE`. Then [runBatches](dcc2d540a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala (L252)) changes the state from `TERMINATED` to `ACTIVE`.

If the above cases both happen, the query will hang forever.

This PR changes `state` to `AtomicReference` and uses`compareAndSet` to make sure we only change the state from `INITIALIZING` to `ACTIVE`. It also removes the `runUninterruptibly` hack from ``HDFSMetadata`, because HADOOP-14084 won't cause any problem after we fix the race condition.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16947 from zsxwing/SPARK-19617.
2017-02-17 19:04:45 -08:00