Commit graph

2720 commits

Author SHA1 Message Date
anabranch 1f6ded6455 [SPARK-19127][DOCS] Update Rank Function Documentation
## What changes were proposed in this pull request?

- [X] Fix inconsistencies in the function reference for `dense_rank` and `rank`
- [X] Make all languages equivalent in their reference to `dense_rank` and `rank` (a brief illustration follows below).
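
For context, a minimal illustration of the two window functions being documented (the DataFrame `df` and its columns are made up for this example):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank, rank}

// Assumes a DataFrame `df` with columns (dept, salary).
val w = Window.partitionBy("dept").orderBy(col("salary").desc)

// rank() leaves gaps after ties; dense_rank() does not.
df.select(col("dept"), col("salary"),
  rank().over(w).as("rank"),
  dense_rank().over(w).as("dense_rank")).show()
```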

## How was this patch tested?

N/A for docs.

Author: anabranch <wac.chambers@gmail.com>

Closes #16505 from anabranch/SPARK-19127.
2017-01-08 17:53:53 -08:00
Dilip Biswal 4351e62207 [SPARK-19093][SQL] Cached tables are not used in SubqueryExpression
## What changes were proposed in this pull request?
Consider the plans inside subquery expressions when looking up the cache manager to make use of cached data. Currently, CacheManager.useCachedData does not consider the subquery expressions in the plan.

SQL
```
select * from rows where not exists (select * from rows)
```
Before the fix
```
== Optimized Logical Plan ==
Join LeftAnti
:- InMemoryRelation [_1#3775, _2#3776], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:     +- *FileScan parquet [_1#3775,_2#3776] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
+- Project [_1#3775 AS _1#3775#4001, _2#3776 AS _2#3776#4002]
   +- Relation[_1#3775,_2#3776] parquet
```

After
```
== Optimized Logical Plan ==
Join LeftAnti
:- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:     +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
+- Project [_1#256 AS _1#256#298, _2#257 AS _2#257#299]
   +- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
```

Query2
```
 SELECT * FROM t1
 WHERE
 c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
```
Before
```
== Analyzed Logical Plan ==
c1: int
Project [c1#3]
+- Filter predicate-subquery#47 [(c1#3 = c1#10)]
   :  +- Project [c1#10]
   :     +- Filter predicate-subquery#46 [(c1#10 = c1#17)]
   :        :  +- Project [c1#17]
   :        :     +- Filter (c1#17 = 1)
   :        :        +- SubqueryAlias t3, `t3`
   :        :           +- Project [value#15 AS c1#17]
   :        :              +- LocalRelation [value#15]
   :        +- SubqueryAlias t2, `t2`
   :           +- Project [value#8 AS c1#10]
   :              +- LocalRelation [value#8]
   +- SubqueryAlias t1, `t1`
      +- Project [value#1 AS c1#3]
         +- LocalRelation [value#1]

== Optimized Logical Plan ==
Join LeftSemi, (c1#3 = c1#10)
:- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1
:     +- LocalTableScan [c1#3]
+- Project [value#8 AS c1#10]
   +- Join LeftSemi, (value#8 = c1#17)
      :- LocalRelation [value#8]
      +- Project [value#15 AS c1#17]
         +- Filter (value#15 = 1)
            +- LocalRelation [value#15]

```
After
```
== Analyzed Logical Plan ==
c1: int
Project [c1#3]
+- Filter predicate-subquery#47 [(c1#3 = c1#10)]
   :  +- Project [c1#10]
   :     +- Filter predicate-subquery#46 [(c1#10 = c1#17)]
   :        :  +- Project [c1#17]
   :        :     +- Filter (c1#17 = 1)
   :        :        +- SubqueryAlias t3, `t3`
   :        :           +- Project [value#15 AS c1#17]
   :        :              +- LocalRelation [value#15]
   :        +- SubqueryAlias t2, `t2`
   :           +- Project [value#8 AS c1#10]
   :              +- LocalRelation [value#8]
   +- SubqueryAlias t1, `t1`
      +- Project [value#1 AS c1#3]
         +- LocalRelation [value#1]

== Optimized Logical Plan ==
Join LeftSemi, (c1#3 = c1#10)
:- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1
:     +- LocalTableScan [c1#3]
+- Join LeftSemi, (c1#10 = c1#17)
   :- InMemoryRelation [c1#10], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t2
   :     +- LocalTableScan [c1#10]
   +- Filter (c1#17 = 1)
      +- InMemoryRelation [c1#17], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1
            +- LocalTableScan [c1#3]
```
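
For context, a minimal way to set up the first scenario in the Spark shell (the path and view name are illustrative, not taken from the patch):

```scala
// Cache a view, then check whether a query whose subquery references the same
// view picks up the InMemoryRelation on both sides of its optimized plan.
spark.read.parquet("/tmp/rows").createOrReplaceTempView("rows")
spark.catalog.cacheTable("rows")

val q = spark.sql("select * from rows where not exists (select * from rows)")
q.explain(true)  // before the fix, the subquery side still showed the raw FileScan
```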
## How was this patch tested?
Added new tests in CachedTableSuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #16493 from dilipbiswal/SPARK-19093.
2017-01-08 23:09:07 +01:00
Wenchen Fan b3d39620c5 [SPARK-19085][SQL] cleanup OutputWriterFactory and OutputWriter
## What changes were proposed in this pull request?

`OutputWriterFactory`/`OutputWriter` are internal interfaces and we can remove some unnecessary APIs:
1. `OutputWriterFactory.newWriter(path: String)`: no one calls it and no one implements it.
2. `OutputWriter.write(row: Row)`: during execution we only call `writeInternal`, which is weird as `OutputWriter` is already an internal interface. We should rename `writeInternal` to `write`, remove `def write(row: Row)` and its related converter code, and have all implementations just implement `def write(row: InternalRow)`.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16479 from cloud-fan/hive-writer.
2017-01-08 00:42:09 +08:00
Tathagata Das b59cddaba0 [SPARK-19074][SS][DOCS] Updated Structured Streaming Programming Guide for update mode and source/sink options
## What changes were proposed in this pull request?

Updates
- Updated the Late Data Handling section by adding a figure for Update Mode. It's more intuitive to explain late data handling with Update Mode, so I added the new figure before the Append Mode figure.
- Updated the Output Modes section with Update mode
- Added options for all the sources and sinks (a small usage sketch follows the figures below)

---------------------------
---------------------------

![image](https://cloud.githubusercontent.com/assets/663212/21665176/f150b224-d29f-11e6-8372-14d32da21db9.png)

---------------------------
---------------------------
<img width="931" alt="screen shot 2017-01-03 at 6 09 11 pm" src="https://cloud.githubusercontent.com/assets/663212/21629740/d21c9bb8-d1df-11e6-915b-488a59589fa6.png">
<img width="933" alt="screen shot 2017-01-03 at 6 10 00 pm" src="https://cloud.githubusercontent.com/assets/663212/21629749/e22bdabe-d1df-11e6-86d3-7e51d2f28dbc.png">

---------------------------
---------------------------
![image](https://cloud.githubusercontent.com/assets/663212/21665200/108e18fc-d2a0-11e6-8640-af598cab090b.png)
![image](https://cloud.githubusercontent.com/assets/663212/21665148/cfe414fa-d29f-11e6-9baa-4124ccbab093.png)
![image](https://cloud.githubusercontent.com/assets/663212/21665226/2e8f39e4-d2a0-11e6-85b1-7657e2df5491.png)
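
For reference, a minimal sketch of a query using the newly documented Update output mode and console sink options (the streaming DataFrame `events` and the option values are illustrative):

```scala
// Only rows updated since the last trigger are written out in Update mode.
val query = events.writeStream
  .outputMode("update")        // newly documented output mode
  .format("console")           // console sink
  .option("numRows", "20")     // sink-specific options, as now listed in the guide
  .option("truncate", "false")
  .start()
```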

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16468 from tdas/SPARK-19074.
2017-01-06 11:29:01 -08:00
Michal Senkyr 903bb8e8a2 [SPARK-16792][SQL] Dataset containing a Case Class with a List type causes a CompileException (converting sequence to list)
## What changes were proposed in this pull request?

Added a `to` call at the end of the code generated by `ScalaReflection.deserializerFor` if the requested type is not a supertype of `WrappedArray[_]`. It uses `CanBuildFrom[_, _, _]` to convert the result into an arbitrary subtype of `Seq[_]`.

Care was taken to preserve the original deserialization where possible, to avoid the overhead of conversion in cases where it is not needed.

`ScalaReflection.serializerFor` could already be used to serialize any `Seq[_]`, so it was not altered.

`SQLImplicits` had to be altered and new implicit encoders added to permit serialization of other sequence types.

Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException.

## How was this patch tested?
```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```

Also manual execution of the following sets of commands in the Spark shell:
```scala
case class TestCC(key: Int, letters: List[String])

val ds1 = sc.makeRDD(Seq(
(List("D")),
(List("S","H")),
(List("F","H")),
(List("D","L","L"))
)).map(x=>(x.length,x)).toDF("key","letters").as[TestCC]

val test1=ds1.map{_.key}
test1.show
```

```scala
case class X(l: List[String])
spark.createDataset(Seq(List("A"))).map(X).show
```

```scala
spark.sqlContext.createDataset(sc.parallelize(List(1) :: Nil)).collect
```

After adding arbitrary sequence support also tested with the following commands:

```scala
case class QueueClass(q: scala.collection.immutable.Queue[Int])

spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
```

Author: Michal Senkyr <mike.senkyr@gmail.com>

Closes #16240 from michalsenkyr/sql-caseclass-list-fix.
2017-01-06 15:05:20 +08:00
Wenchen Fan cca945b6aa [SPARK-18885][SQL] unify CREATE TABLE syntax for data source and hive serde tables
## What changes were proposed in this pull request?

Today we have different syntax for creating data source tables and Hive serde tables. We should unify them so that users are not confused, and as a step toward making Hive just another data source.

Please read https://issues.apache.org/jira/secure/attachment/12843835/CREATE-TABLE.pdf for details.
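
As a rough illustration of the direction (the authoritative grammar is in the linked CREATE-TABLE.pdf; the statements below are only a hedged sketch):

```scala
// One CREATE TABLE shape covering both kinds of tables:
// a data source table via USING, and a Hive serde table via STORED AS.
spark.sql("CREATE TABLE ds_table (id INT, name STRING) USING parquet")
spark.sql("CREATE TABLE hive_table (id INT, name STRING) STORED AS parquet")
```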

TODO(for follow-up PRs):
1. TBLPROPERTIES is not added to the new syntax, we should decide if we wanna add it later.
2. `SHOW CREATE TABLE` should be updated to use the new syntax.
3. we should decide if we wanna change the behavior of `SET LOCATION`.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16296 from cloud-fan/create-table.
2017-01-05 17:40:27 -08:00
Wenchen Fan 30345c43b7 [SPARK-19058][SQL] fix partition related behaviors with DataFrameWriter.saveAsTable
## What changes were proposed in this pull request?

When we append data to a partitioned table with `DataFrameWriter.saveAsTable`, there are 2 issues:
1. it doesn't work when the partition has a custom location.
2. it recovers all partitions of the table.

This PR fixes them by moving the special partition handling code from `DataSourceAnalysis` to `InsertIntoHadoopFsRelationCommand`, so that the `DataFrameWriter.saveAsTable` code path can also benefit from it.
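
For context, the user-facing code path being fixed looks roughly like this (table and DataFrame names are illustrative):

```scala
// Assumes `partitioned_events` is an existing partitioned table and `df` matches its schema.
// After this patch, the append honours custom partition locations and no longer
// recovers every partition of the table.
df.write.mode("append").saveAsTable("partitioned_events")
```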

## How was this patch tested?

newly added regression tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16460 from cloud-fan/append.
2017-01-05 14:11:05 +08:00
Herman van Hovell 4262fb0d55 [SPARK-19070] Clean-up dataset actions
## What changes were proposed in this pull request?
Dataset actions currently spin off a new `Dataframe` only to track query execution. This PR simplifies this code path by using the `Dataset.queryExecution` directly. This PR also merges the typed and untyped action evaluation paths.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #16466 from hvanhovell/SPARK-19070.
2017-01-04 23:47:58 +08:00
Niranjan Padmanabhan a1e40b1f5d
[MINOR][DOCS] Remove consecutive duplicated words/typo in Spark Repo
## What changes were proposed in this pull request?
There are many locations in the Spark repo where the same word occurs consecutively. Sometimes they are appropriately placed, but many times they are not. This PR removes the inappropriately duplicated words.

## How was this patch tested?
N/A since only docs or comments were updated.

Author: Niranjan Padmanabhan <niranjan.padmanabhan@gmail.com>

Closes #16455 from neurons/np.structure_streaming_doc.
2017-01-04 15:07:29 +00:00
Wenchen Fan 101556d0fa [SPARK-19060][SQL] remove the supportsPartial flag in AggregateFunction
## What changes were proposed in this pull request?

Now that all aggregation functions support partial aggregation, we can remove the `supportsPartial` flag in `AggregateFunction`.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16461 from cloud-fan/partial.
2017-01-04 12:46:30 +01:00
gatorsmile b67b35f76b [SPARK-19048][SQL] Delete Partition Location when Dropping Managed Partitioned Tables in InMemoryCatalog
### What changes were proposed in this pull request?
The data in a managed table should be deleted after the table is dropped. However, if the partition location is not under the location of the partitioned table, it is not deleted as expected. Users can specify any location for a partition when they add it.

This PR is to delete partition location when dropping managed partitioned tables stored in `InMemoryCatalog`.
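
For illustration, the kind of setup this fix targets (table name and paths are made up; a hedged sketch, not code from the patch):

```scala
// A managed, partitioned data source table with a partition at a custom location.
spark.sql("CREATE TABLE logs (msg STRING, day STRING) USING parquet PARTITIONED BY (day)")
spark.sql("ALTER TABLE logs ADD PARTITION (day = '2016-12-01') LOCATION '/tmp/custom/day=2016-12-01'")

// With this patch, dropping the managed table also removes the custom partition
// directory when the catalog is InMemoryCatalog.
spark.sql("DROP TABLE logs")
```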

### How was this patch tested?
Added test cases for both HiveExternalCatalog and InMemoryCatalog

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16448 from gatorsmile/unsetSerdeProp.
2017-01-03 11:43:47 -08:00
Dongjoon Hyun 7a2b5f93bc [SPARK-18877][SQL] CSVInferSchema.inferField on DecimalType should find a common type with typeSoFar
## What changes were proposed in this pull request?

CSV type inferencing causes `IllegalArgumentException` on decimal numbers with heterogeneous precisions and scales because the current logic uses the last decimal type in a **partition**. Specifically, `inferRowType`, the **seqOp** of **aggregate**, returns the last decimal type. This PR fixes it to use `findTightestCommonType`.

**decimal.csv**
```
9.03E+12
1.19E+11
```

**BEFORE**
```scala
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").printSchema
root
 |-- _c0: decimal(3,-9) (nullable = true)

scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").show
16/12/16 14:32:49 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException: requirement failed: Decimal precision 4 exceeds max precision 3
```

**AFTER**
```scala
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").printSchema
root
 |-- _c0: decimal(4,-9) (nullable = true)

scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").show
+---------+
|      _c0|
+---------+
|9.030E+12|
| 1.19E+11|
+---------+
```

## How was this patch tested?

Pass the newly add test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #16320 from dongjoon-hyun/SPARK-18877.
2017-01-03 23:06:50 +08:00
Zhenhua Wang ae83c21125 [SPARK-18998][SQL] Add a cbo conf to switch between default statistics and estimated statistics
## What changes were proposed in this pull request?

We add a cbo configuration to switch between default stats and estimated stats.
We also define a new statistics method `planStats` in LogicalPlan with conf as its parameter, in order to pass the cbo switch and other estimation-related configurations in the future. `planStats` is used on the caller side (i.e. in Optimizer and Strategies) to make transformation decisions based on stats.
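
For illustration only, toggling the switch at the session level; the flag name `spark.sql.cbo.enabled` is assumed here, not quoted from the patch:

```scala
// Use estimated (cbo) statistics for plan decisions.
spark.conf.set("spark.sql.cbo.enabled", "true")

// Fall back to the default statistics.
spark.conf.set("spark.sql.cbo.enabled", "false")
```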

## How was this patch tested?

Add a test case using a dummy LogicalPlan.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16401 from wzhfy/cboSwitch.
2017-01-03 12:19:52 +08:00
Dongjoon Hyun b85e29437d [SPARK-18123][SQL] Use db column names instead of RDD column ones during JDBC Writing
## What changes were proposed in this pull request?

Apache Spark supports the following cases **by quoting RDD column names** while saving through JDBC.
- Allow a reserved keyword as a column name, e.g., 'order'.
- Allow mixed-case column names like the following, e.g., `[a: int, A: int]`.

  ``` scala
  scala> val df = sql("select 1 a, 1 A")
  df: org.apache.spark.sql.DataFrame = [a: int, A: int]
  ...
  scala> df.write.mode("overwrite").format("jdbc").options(option).save()
  scala> df.write.mode("append").format("jdbc").options(option).save()
  ```

This PR aims to use **database column names** instead of RDD column names in order to additionally support the following case.
Note that this case succeeds with `MySQL`, but previously failed on `Postgres`/`Oracle`.

``` scala
val df1 = sql("select 1 a")
val df2 = sql("select 1 A")
...
df1.write.mode("overwrite").format("jdbc").options(option).save()
df2.write.mode("append").format("jdbc").options(option).save()
```
## How was this patch tested?

Pass the Jenkins tests with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>
Author: gatorsmile <gatorsmile@gmail.com>

Closes #15664 from dongjoon-hyun/SPARK-18123.
2016-12-30 10:27:14 -08:00
Dongjoon Hyun 752d9eeb9b [SPARK-19012][SQL] Fix createTempViewCommand to throw AnalysisException instead of ParseException
## What changes were proposed in this pull request?

Currently, `createTempView`, `createOrReplaceTempView`, and `createGlobalTempView` show `ParseException`s on invalid table names. We should show a better error message instead. This PR also adds and updates the missing descriptions in the API docs.

**BEFORE**
```
scala> spark.range(10).createOrReplaceTempView("11111")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '11111' expecting {'SELECT', 'FROM', 'ADD', ...}(line 1, pos 0)

== SQL ==
11111
...
```

**AFTER**
```
scala> spark.range(10).createOrReplaceTempView("11111")
org.apache.spark.sql.AnalysisException: Invalid view name: 11111;
...
```

## How was this patch tested?

Pass the Jenkins tests with an updated test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #16427 from dongjoon-hyun/SPARK-19012.
2016-12-29 21:22:13 +01:00
Wenchen Fan 7d19b6ab7d [SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelectCommand
## What changes were proposed in this pull request?

The `CreateDataSourceTableAsSelectCommand` is quite complex now, as it has a lot of work to do if the table already exists:

1. throw an exception if we don't want to ignore it.
2. do some checks and adjust the schema if we want to append data.
3. drop the table and create it again if we want to overwrite.

Items 2 and 3 should be done by the analyzer, so that we can also apply them to Hive tables.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15996 from cloud-fan/append.
2016-12-28 21:50:21 -08:00
Kazuaki Ishizaki 93f35569fd [SPARK-16213][SQL] Reduce runtime overhead of a program that creates an primitive array in DataFrame
## What changes were proposed in this pull request?

This PR reduces the runtime overhead of a program that creates a primitive array in a DataFrame, using an approach similar to #15044. The generated code performs a boxing operation in an assignment from InternalRow to an `Object[]` temporary array (at lines 051 and 061 in the generated code without this PR). If we know that the type of the array elements is primitive, we apply the following optimizations:
1. Eliminate a pair of `isNullAt()` and a null assignment
2. Allocate a primitive array instead of `Object[]` (eliminate boxing operations)
3. Create `UnsafeArrayData` by using `UnsafeArrayWriter` to keep a primitive array in a row format, instead of doing non-lightweight operations in the constructor of `GenericArrayData`
The PR also does the same for `CreateMap`.

Here are performance results of [DataFrame programs](6bf54ec5e2/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala (L83-L112)); this PR improves performance by up to 17.9x.

```
Without SPARK-16043
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Read a primitive array in DataFrame:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                           3805 / 4150          0.0      507308.9       1.0X
Double                                        3593 / 3852          0.0      479056.9       1.1X

With SPARK-16043
Read a primitive array in DataFrame:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            213 /  271          0.0       28387.5       1.0X
Double                                         204 /  223          0.0       27250.9       1.0X
```
Note: #15780 is enabled for these measurements.

A motivating example:

``` scala
val df = sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF
df.selectExpr("Array(value + 1.1d, value + 2.2d)").show
```

Generated code without this PR

``` java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */   private Object[] project_values;
/* 013 */   private UnsafeRow project_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     inputadapter_input = inputs[0];
/* 026 */     serializefromobject_result = new UnsafeRow(1);
/* 027 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 028 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 029 */     this.project_values = null;
/* 030 */     project_result = new UnsafeRow(1);
/* 031 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */     this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 034 */
/* 035 */   }
/* 036 */
/* 037 */   protected void processNext() throws java.io.IOException {
/* 038 */     while (inputadapter_input.hasNext()) {
/* 039 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 041 */
/* 042 */       final boolean project_isNull = false;
/* 043 */       this.project_values = new Object[2];
/* 044 */       boolean project_isNull1 = false;
/* 045 */
/* 046 */       double project_value1 = -1.0;
/* 047 */       project_value1 = inputadapter_value + 1.1D;
/* 048 */       if (false) {
/* 049 */         project_values[0] = null;
/* 050 */       } else {
/* 051 */         project_values[0] = project_value1;
/* 052 */       }
/* 053 */
/* 054 */       boolean project_isNull4 = false;
/* 055 */
/* 056 */       double project_value4 = -1.0;
/* 057 */       project_value4 = inputadapter_value + 2.2D;
/* 058 */       if (false) {
/* 059 */         project_values[1] = null;
/* 060 */       } else {
/* 061 */         project_values[1] = project_value4;
/* 062 */       }
/* 063 */
/* 064 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 065 */       this.project_values = null;
/* 066 */       project_holder.reset();
/* 067 */
/* 068 */       project_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */       if (project_isNull) {
/* 071 */         project_rowWriter.setNullAt(0);
/* 072 */       } else {
/* 073 */         // Remember the current cursor so that we can calculate how many bytes are
/* 074 */         // written later.
/* 075 */         final int project_tmpCursor = project_holder.cursor;
/* 076 */
/* 077 */         if (project_value instanceof UnsafeArrayData) {
/* 078 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 079 */           // grow the global buffer before writing data.
/* 080 */           project_holder.grow(project_sizeInBytes);
/* 081 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 082 */           project_holder.cursor += project_sizeInBytes;
/* 083 */
/* 084 */         } else {
/* 085 */           final int project_numElements = project_value.numElements();
/* 086 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 087 */
/* 088 */           for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 089 */             if (project_value.isNullAt(project_index)) {
/* 090 */               project_arrayWriter.setNullDouble(project_index);
/* 091 */             } else {
/* 092 */               final double project_element = project_value.getDouble(project_index);
/* 093 */               project_arrayWriter.write(project_index, project_element);
/* 094 */             }
/* 095 */           }
/* 096 */         }
/* 097 */
/* 098 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 099 */       }
/* 100 */       project_result.setTotalSize(project_holder.totalSize());
/* 101 */       append(project_result);
/* 102 */       if (shouldStop()) return;
/* 103 */     }
/* 104 */   }
/* 105 */ }
```

Generated code with this PR

``` java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */   private UnsafeArrayData project_arrayData;
/* 013 */   private UnsafeRow project_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     inputadapter_input = inputs[0];
/* 026 */     serializefromobject_result = new UnsafeRow(1);
/* 027 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 028 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 029 */
/* 030 */     project_result = new UnsafeRow(1);
/* 031 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */     this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 034 */
/* 035 */   }
/* 036 */
/* 037 */   protected void processNext() throws java.io.IOException {
/* 038 */     while (inputadapter_input.hasNext()) {
/* 039 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 041 */
/* 042 */       byte[] project_array = new byte[32];
/* 043 */       project_arrayData = new UnsafeArrayData();
/* 044 */       Platform.putLong(project_array, 16, 2);
/* 045 */       project_arrayData.pointTo(project_array, 16, 32);
/* 046 */
/* 047 */       boolean project_isNull1 = false;
/* 048 */
/* 049 */       double project_value1 = -1.0;
/* 050 */       project_value1 = inputadapter_value + 1.1D;
/* 051 */       if (false) {
/* 052 */         project_arrayData.setNullAt(0);
/* 053 */       } else {
/* 054 */         project_arrayData.setDouble(0, project_value1);
/* 055 */       }
/* 056 */
/* 057 */       boolean project_isNull4 = false;
/* 058 */
/* 059 */       double project_value4 = -1.0;
/* 060 */       project_value4 = inputadapter_value + 2.2D;
/* 061 */       if (false) {
/* 062 */         project_arrayData.setNullAt(1);
/* 063 */       } else {
/* 064 */         project_arrayData.setDouble(1, project_value4);
/* 065 */       }
/* 066 */       project_holder.reset();
/* 067 */
/* 068 */       // Remember the current cursor so that we can calculate how many bytes are
/* 069 */       // written later.
/* 070 */       final int project_tmpCursor = project_holder.cursor;
/* 071 */
/* 072 */       if (project_arrayData instanceof UnsafeArrayData) {
/* 073 */         final int project_sizeInBytes = ((UnsafeArrayData) project_arrayData).getSizeInBytes();
/* 074 */         // grow the global buffer before writing data.
/* 075 */         project_holder.grow(project_sizeInBytes);
/* 076 */         ((UnsafeArrayData) project_arrayData).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 077 */         project_holder.cursor += project_sizeInBytes;
/* 078 */
/* 079 */       } else {
/* 080 */         final int project_numElements = project_arrayData.numElements();
/* 081 */         project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 082 */
/* 083 */         for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 084 */           if (project_arrayData.isNullAt(project_index)) {
/* 085 */             project_arrayWriter.setNullDouble(project_index);
/* 086 */           } else {
/* 087 */             final double project_element = project_arrayData.getDouble(project_index);
/* 088 */             project_arrayWriter.write(project_index, project_element);
/* 089 */           }
/* 090 */         }
/* 091 */       }
/* 092 */
/* 093 */       project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 094 */       project_result.setTotalSize(project_holder.totalSize());
/* 095 */       append(project_result);
/* 096 */       if (shouldStop()) return;
/* 097 */     }
/* 098 */   }
/* 099 */ }
```
## How was this patch tested?

Added unit tests into `DataFrameComplexTypeSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #13909 from kiszk/SPARK-16213.
2016-12-29 10:59:37 +08:00
Carson Wang 2a5f52a714
[MINOR][DOC] Fix doc of ForeachWriter to use writeStream
## What changes were proposed in this pull request?

Fix the documentation of `ForeachWriter` to use `writeStream` instead of `write` for a streaming Dataset.
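
For reference, the corrected usage pattern on a streaming Dataset (a minimal sketch; `streamingDF` and the sink logic are placeholders):

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

val query = streamingDF.writeStream   // writeStream, not write, for a streaming Dataset
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(value: Row): Unit = println(value)       // placeholder sink logic
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()
```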

## How was this patch tested?
Docs only.

Author: Carson Wang <carson.wang@intel.com>

Closes #16419 from carsonwang/FixDoc.
2016-12-28 12:12:44 +00:00
uncleGen 76e9bd7488
[SPARK-18960][SQL][SS] Avoid double reading file which is being copied.
## What changes were proposed in this pull request?

In HDFS, when we copy a file into a target directory, there will be a temporary `._COPY_` file for a period of time. The duration depends on the file size. If we do not skip this file, we may read the same data twice.
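
A minimal sketch of the idea (illustrative only, not the actual FileStreamSource change; the helper name and the default suffix value are assumptions):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Skip files that are still being copied into the directory (exposed with a
// temporary copy suffix, as described above) so a batch never reads them twice.
def listFinishedFiles(fs: FileSystem, dir: Path, copySuffix: String = "._COPY_"): Seq[Path] =
  fs.listStatus(dir).toSeq.map(_.getPath).filterNot(_.getName.endsWith(copySuffix))
```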

## How was this patch tested?
update unit test

Author: uncleGen <hustyugm@gmail.com>

Closes #16370 from uncleGen/SPARK-18960.
2016-12-28 10:42:47 +00:00
gatorsmile 5ac62043cf [SPARK-18992][SQL] Move spark.sql.hive.thriftServer.singleSession to SQLConf
### What changes were proposed in this pull request?

Since `spark.sql.hive.thriftServer.singleSession` is a configuration of the SQL component, this conf can be moved from `SparkConf` to `StaticSQLConf`.

When we introduced `spark.sql.hive.thriftServer.singleSession`, all the SQL configurations were session-specific and could be modified in different sessions.

In Spark 2.1, static SQL configuration was added. It is a perfect fit for `spark.sql.hive.thriftServer.singleSession`. Previously, we did the same move for `spark.sql.warehouse.dir` from `SparkConf` to `StaticSQLConf`.
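
For illustration, since the flag is now a static SQL conf it has to be set before the session (and hence the Thrift server) is created, e.g. via the builder or `--conf`; a hedged sketch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.hive.thriftServer.singleSession", "true")  // static conf: set at startup
  .enableHiveSupport()
  .getOrCreate()
```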

### How was this patch tested?
Added test cases in HiveThriftServer2Suites.scala

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16392 from gatorsmile/hiveThriftServerSingleSession.
2016-12-28 10:16:22 +08:00
Wenchen Fan 8a7db8a608 [SPARK-18980][SQL] implement Aggregator with TypedImperativeAggregate
## What changes were proposed in this pull request?

Currently we implement `Aggregator` with `DeclarativeAggregate`, which will serialize/deserialize the buffer object every time we process an input.

This PR implements `Aggregator` with `TypedImperativeAggregate` and avoids serializing/deserializing the buffer object many times. The benchmark shows about a 2x speedup.

For simple buffer objects that don't need serialization, we still go with `DeclarativeAggregate` to avoid a performance regression.
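
For context, the user-facing `Aggregator` API whose execution path this changes (a minimal typed average; names are illustrative):

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class AvgBuffer(var sum: Long, var count: Long)

object TypedAverage extends Aggregator[Long, AvgBuffer, Double] {
  def zero: AvgBuffer = AvgBuffer(0L, 0L)
  def reduce(b: AvgBuffer, a: Long): AvgBuffer = { b.sum += a; b.count += 1; b }
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = { b1.sum += b2.sum; b1.count += b2.count; b1 }
  def finish(b: AvgBuffer): Double = if (b.count == 0) 0.0 else b.sum.toDouble / b.count
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Usage on a Dataset[Long]: ds.select(TypedAverage.toColumn).show()
```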

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16383 from cloud-fan/aggregator.
2016-12-26 22:10:20 +08:00
hyukjinkwon d6cbec7598 [SPARK-18943][SQL] Avoid per-record type dispatch in CSV when reading
## What changes were proposed in this pull request?

`CSVRelation.csvParser` does type dispatch for each value in each row. We can prevent this because the schema is already kept in `CSVRelation`.

So, this PR proposes that converters are created first according to the schema and then applied to each value.

I just ran some small benchmarks as below, after reproducing the logic in 7c33b0fd05/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala (L170-L178) to test the updated logic.

```scala
test("Benchmark for CSV converter") {
  var numMalformedRecords = 0
  val N = 500 << 12
  val schema = StructType(
    StructField("a", StringType) ::
    StructField("b", StringType) ::
    StructField("c", StringType) ::
    StructField("d", StringType) :: Nil)

  val row = Array("1.0", "test", "2015-08-20 14:57:00", "FALSE")
  val data = spark.sparkContext.parallelize(List.fill(N)(row))
  val parser = CSVRelation.csvParser(schema, schema.fieldNames, CSVOptions())

  val benchmark = new Benchmark("CSV converter", N)
  benchmark.addCase("cast CSV string tokens", 10) { _ =>
    data.flatMap { recordTokens =>
      parser(recordTokens, numMalformedRecords)
    }.collect()
  }
  benchmark.run()
}
```

**Before**

```
CSV converter:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
cast CSV string tokens                        1061 / 1130          1.9         517.9       1.0X
```

**After**

```
CSV converter:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
cast CSV string tokens                         940 / 1011          2.2         459.2       1.0X
```

## How was this patch tested?

Tests in `CSVTypeCastSuite` and `CSVRelation`

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16351 from HyukjinKwon/type-dispatch.
2016-12-24 23:28:34 +08:00
Liang-Chi Hsieh 07fcbea516
[SPARK-18800][SQL] Correct the assert in UnsafeKVExternalSorter which ensures array size
## What changes were proposed in this pull request?

`UnsafeKVExternalSorter` uses `UnsafeInMemorySorter` to sort the records of `BytesToBytesMap` if it is given a map.

Currently we use the number of keys in `BytesToBytesMap` to determine if the array used for sorting is big enough. We have an assert that ensures the size of the array is enough: `map.numKeys() <= map.getArray().size() / 2`.

However, each record in the map takes two entries in the array: one is the record pointer, the other is the key prefix. So the correct assert should be `map.numKeys() * 2 <= map.getArray().size() / 2`.

## How was this patch tested?

N/A

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16232 from viirya/SPARK-18800-fix-UnsafeKVExternalSorter.
2016-12-24 12:05:49 +00:00
wangzhenhua 3cff816157 [SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and convert it to Statistics in relations
## What changes were proposed in this pull request?

Statistics in LogicalPlan should use attributes to refer to columns rather than column names, because two columns from two relations can have the same column name. But CatalogTable doesn't have the concepts of attribute or broadcast hint in Statistics. Therefore, putting Statistics in CatalogTable is confusing.

We define a different statistic structure in CatalogTable, which is only responsible for interacting with metastore, and is converted to statistics in LogicalPlan when it is used.

## How was this patch tested?

add test cases

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16323 from wzhfy/nameToAttr.
2016-12-24 15:34:44 +08:00
Shixiong Zhu 2246ce88ae [SPARK-18985][SS] Add missing @InterfaceStability.Evolving for Structured Streaming APIs
## What changes were proposed in this pull request?

Add missing InterfaceStability.Evolving for Structured Streaming APIs

## How was this patch tested?

Compiling the codes.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16385 from zsxwing/SPARK-18985.
2016-12-22 16:21:09 -08:00
Reynold Xin 2615100055 [SPARK-18973][SQL] Remove SortPartitions and RedistributeData
## What changes were proposed in this pull request?
SortPartitions and RedistributeData logical operators are not actually used and can be removed. Note that we do have a Sort operator (with global flag false) that subsumed SortPartitions.

## How was this patch tested?
Also updated test cases to reflect the removal.

Author: Reynold Xin <rxin@databricks.com>

Closes #16381 from rxin/SPARK-18973.
2016-12-22 19:35:09 +01:00
hyukjinkwon 76622c661f [SPARK-16975][SQL][FOLLOWUP] Do not duplicately check file paths in data sources implementing FileFormat
## What changes were proposed in this pull request?

This PR cleans up the duplicated checking of file paths in the implemented data sources and prevents attempting to list files twice in the ORC data source.

https://github.com/apache/spark/pull/14585 handles a problem for partition column names containing `_`, and the issue itself is resolved correctly. However, it seems the data sources implementing `FileFormat` validate the paths redundantly. Judging from the comment in `CSVFileFormat`, `// TODO: Move filtering.`, I guess we don't have to check this twice.

   Currently, this seems to be filtered in `PartitioningAwareFileIndex.shouldFilterOut` and `PartitioningAwareFileIndex.isDataPath`, so `FileFormat.inferSchema` will always receive leaf files. For example, running the code below:

   ``` scala
   spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
   spark.read.parquet("/tmp/parquet")
   ```

   gives the paths below without directories but just valid data files:

   ``` bash
   /tmp/parquet/_col=0/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
   /tmp/parquet/_col=1/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
   /tmp/parquet/_col=2/part-r-00000-25de2b50-225a-4bcf-a2bc-9eb9ed407ef6.snappy.parquet
   ...
   ```

   to `FileFormat.inferSchema`.

## How was this patch tested?

Unit test added in `HadoopFsRelationTest` and related existing tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14627 from HyukjinKwon/SPARK-16975.
2016-12-22 10:00:20 -08:00
Reynold Xin 2e861df96e [DOC] bucketing is applicable to all file-based data sources
## What changes were proposed in this pull request?
Starting with Spark 2.1.0, the bucketing feature is available for all file-based data sources. This patch fixes some function docs that haven't yet been updated to reflect that.
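
For reference, a bucketed write against a non-Parquet file-based source, which the updated docs now cover (names and values are illustrative):

```scala
// Bucketed (and optionally sorted) output; bucketed writes go through saveAsTable.
df.write
  .format("csv")
  .option("header", "true")
  .bucketBy(8, "user_id")
  .sortBy("user_id")
  .saveAsTable("bucketed_users_csv")
```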

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #16349 from rxin/ds-doc.
2016-12-21 23:46:33 -08:00
Reynold Xin 7c5b7b3a2e [SQL] Minor readability improvement for partition handling code
## What changes were proposed in this pull request?
This patch includes minor changes to improve readability for the partition handling code. I'm in the middle of implementing a new feature and found some of the naming / implicit type inference not as intuitive as it could be.

## How was this patch tested?
This patch should have no semantic change and the changes should be covered by existing test cases.

Author: Reynold Xin <rxin@databricks.com>

Closes #16378 from rxin/minor-fix.
2016-12-22 15:29:56 +08:00
Shixiong Zhu ff7d82a207 [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created
## What changes were proposed in this pull request?

This PR audits places using `logicalPlan` in StreamExecution and ensures they all handles the case that `logicalPlan` cannot be created.

In addition, this PR also fixes the following issues in `StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` are cyclically dependent: `StreamingQueryException`'s constructor calls `StreamExecution`'s `toDebugString`, which uses `StreamingQueryException`. Hence it will output a `null` value in the error message.
- The stack trace is duplicated when calling `Throwable.printStackTrace`, because `StreamingQueryException`'s `toString` contains the stack trace.

## How was this patch tested?

The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real codes to verify the failure in this test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16322 from zsxwing/SPARK-18907.
2016-12-21 22:02:57 -08:00
Takeshi YAMAMURO b41ec99778 [SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer
## What changes were proposed in this pull request?
This PR fixes a `NullPointerException` caused by the following `limit + aggregate` query:
```
scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
scala> df.limit(2).groupBy("id").count().show
WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
```
The root cause is that [`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596) skips the initialization of [the buffer iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603); `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle without the initialization.

## How was this patch tested?
Added a test to check if no exception happens for limit + aggregates in `DataFrameAggregateSuite.scala`.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #15980 from maropu/SPARK-18528.
2016-12-22 01:53:33 +01:00
Tathagata Das 83a6ace0d1 [SPARK-18234][SS] Made update mode public
## What changes were proposed in this pull request?

Made Update mode public. As part of that, here are the changes:
- Updated `DataStreamWriter` to accept "update" (see the sketch after this list)
- Moved `InternalOutputModes` from o.a.s.sql to o.a.s.sql.catalyst
- Added Update-mode state removal with watermark to `StateStoreSaveExec`
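
A minimal sketch of the now-public mode in use (the streaming DataFrame `events` and column names are illustrative):

```scala
import org.apache.spark.sql.functions.{col, window}

// With a watermark, Update mode emits only changed counts each trigger and
// lets state older than the watermark be dropped.
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()

counts.writeStream.outputMode("update").format("console").start()
```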

## How was this patch tested?

Added new tests in changed modules

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16360 from tdas/SPARK-18234.
2016-12-21 16:43:17 -08:00
Reynold Xin 354e936187 [SPARK-18775][SQL] Limit the max number of records written per file
## What changes were proposed in this pull request?
Currently, Spark writes a single file out per task, sometimes leading to very large files. It would be great to have an option to limit the max number of records written per file in a task, to avoid humongous files.

This patch introduces a new write config option `maxRecordsPerFile` (default to a session-wide setting `spark.sql.files.maxRecordsPerFile`) that limits the max number of records written to a single file. A non-positive value indicates there is no limit (same behavior as not having this flag).
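
For illustration, the two ways to set the new limit (the values and path are arbitrary):

```scala
// Per write: cap each output file at 10,000 records.
df.write.option("maxRecordsPerFile", "10000").parquet("/tmp/out")

// Or session-wide, applying to writes that don't override it.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 10000L)
```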

## How was this patch tested?
Added test cases in PartitionedWriteSuite for both dynamic partition insert and non-dynamic partition insert.

Author: Reynold Xin <rxin@databricks.com>

Closes #16204 from rxin/SPARK-18775.
2016-12-21 23:50:35 +01:00
Tathagata Das 607a1e63db [SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
## What changes were proposed in this pull request?

Two changes
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show the watermark when there is no watermarking in the query

## How was this patch tested?
Updated and new unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16304 from tdas/SPARK-18834-1.
2016-12-21 10:44:20 -08:00
Wenchen Fan b7650f11c7 [SPARK-18947][SQL] SQLContext.tableNames should not call Catalog.listTables
## What changes were proposed in this pull request?

It's a huge waste to call `Catalog.listTables` in `SQLContext.tableNames`, which only needs the table names, while `Catalog.listTables` fetches the table metadata for each table name.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16352 from cloud-fan/minor.
2016-12-21 19:39:00 +08:00
gatorsmile 24c0c94128 [SPARK-18949][SQL] Add recoverPartitions API to Catalog
### What changes were proposed in this pull request?

Currently, we only have a SQL interface for recovering all the partitions in the directory of a table and updating the catalog: `MSCK REPAIR TABLE` or `ALTER TABLE table RECOVER PARTITIONS`. (Actually, it is very hard for me to remember `MSCK`, and I have no clue what it means.)

After the new "Scalable Partition Handling", table repair becomes much more important for making the data in a newly created, partitioned data source table visible.

Thus, this PR is to add it into the Catalog interface. After this PR, users can repair the table by
```Scala
spark.catalog.recoverPartitions("testTable")
```

### How was this patch tested?
Modified the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16356 from gatorsmile/repairTable.
2016-12-20 23:40:02 -08:00
Burak Yavuz caed89321f [SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf
## What changes were proposed in this pull request?

The checkpoint location can be defined for a Structured Streaming query on a per-query basis through the `DataStreamWriter` options, but it can also be provided through SparkSession configurations. A MemorySink should be able to recover in both cases when the OutputMode is Complete.
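
For reference, the two ways a checkpoint location can be supplied (paths and query name are illustrative; the session-wide key is assumed to be the standard `spark.sql.streaming.checkpointLocation` setting):

```scala
// Per query, via the writer option:
aggDF.writeStream
  .format("memory")
  .queryName("agg_table")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/checkpoints/agg_table")
  .start()

// Or session-wide, via the SparkSession configuration; recovery should now work here too.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")
```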

## How was this patch tested?

Unit tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16342 from brkyvz/chk-rec.
2016-12-20 14:19:35 -08:00
Reynold Xin 150d26cad4 Tiny style improvement. 2016-12-19 22:50:23 -08:00
Wenchen Fan f923c849e5 [SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking when append data to an existing table
## What changes were proposed in this pull request?

When we append data to an existing table with `DataFrameWriter.saveAsTable`, we will do various checks to make sure the appended data is consistent with the existing data.

However, we get the information about the existing table by matching the table relation, instead of looking at the table metadata. This is error-prone: e.g. we only check the number of columns for `HadoopFsRelation` and forget to check bucketing, etc.

This PR refactors the error checking by looking at the metadata of the existing table, and fix several bugs:
* SPARK-18899: We forget to check if the specified bucketing matched the existing table, which may lead to a problematic table that has different bucketing in different data files.
* SPARK-18912: We forget to check the number of columns for non-file-based data source table
* SPARK-18913: We don't support append data to a table with special column names.

## How was this patch tested?
new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16313 from cloud-fan/bug1.
2016-12-19 20:03:33 -08:00
Josh Rosen 5857b9ac2d [SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter
## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators, but in some cases this would carry a performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.
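
A rough sketch of the inlined check (illustrative; not the actual FileScanRDD/JDBCRDD code):

```scala
import org.apache.spark.{TaskContext, TaskKilledException}

// Wrap an iterator so task cancellation is observed on every hasNext call,
// instead of letting a killed task run to completion as a zombie.
class CancellableIterator[T](context: TaskContext, delegate: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = {
    if (context.isInterrupted()) throw new TaskKilledException
    delegate.hasNext
  }
  override def next(): T = delegate.next()
}
```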

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16340 from JoshRosen/sql-task-interruption.
2016-12-20 01:19:38 +01:00
hyukjinkwon ed84cd0684
[MINOR][BUILD] Fix lint-check failures and javadoc8 break
## What changes were proposed in this pull request?

This PR proposes to fix lint-check failures and javadoc8 break.

A few errors were introduced, as below:

**lint-check failures**

```
[ERROR] src/test/java/org/apache/spark/network/TransportClientFactorySuite.java:[45,1] (imports) RedundantImport: Duplicate import to line 43 - org.apache.spark.network.util.MapConfigProvider.
[ERROR] src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java:[255,10] (modifier) RedundantModifier: Redundant 'final' modifier.
```

**javadoc8**

```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:19: error: bad use of '>'
[error]  *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
[error]                             ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:20: error: bad use of '>'
[error]  *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
[error]                             ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:21: error: bad use of '>'
[error]  *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
[error]                             ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:22: error: bad use of '>'
[error]  *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
[error]
```

## How was this patch tested?

Manually checked as below:

**lint-check failures**

```
./dev/lint-java
Checkstyle checks passed.
```

**javadoc8**

This seems hidden in the API doc, but I manually checked after removing the access modifier, as below.

It does not render properly (scaladoc):

![2016-12-16 3 40 34](https://cloud.githubusercontent.com/assets/6477701/21255175/8df1fe6e-c3ad-11e6-8cda-ce7f76c6677a.png)

After this PR, it renders as below:

- scaladoc
  ![2016-12-16 3 40 23](https://cloud.githubusercontent.com/assets/6477701/21255135/4a11dab6-c3ad-11e6-8ab2-b091c4f45029.png)

- javadoc
  ![2016-12-16 3 41 10](https://cloud.githubusercontent.com/assets/6477701/21255137/4bba1d9c-c3ad-11e6-9b88-62f1f697b56a.png)

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16307 from HyukjinKwon/lint-javadoc8.
2016-12-16 17:49:43 +00:00
Takeshi YAMAMURO dc2a4d4ad4 [SPARK-18108][SQL] Fix a schema inconsistent bug that makes a parquet reader fail to read data
## What changes were proposed in this pull request?
A vectorized parquet reader fails to read column data if the data schema and partition schema overlap with each other and the inferred types in the partition schema differ from the ones in the data schema. Example code to reproduce this bug is as follows:

```
scala> case class A(a: Long, b: Int)
scala> val as = Seq(A(1, 2))
scala> spark.createDataFrame(as).write.parquet("/data/a=1/")
scala> val df = spark.read.parquet("/data/")
scala> df.printSchema
root
 |-- a: long (nullable = true)
 |-- b: integer (nullable = true)
scala> df.collect
java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:283)
        at org.apache.spark.sql.execution.vectorized.ColumnarBatch$Row.getLong(ColumnarBatch.java:191)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```
The root cause is that a logical layer (`HadoopFsRelation`) and a physical layer (`VectorizedParquetRecordReader`) have different assumptions about the partition schema: the logical layer trusts the data schema to infer the types of the overlapped partition columns, while the physical layer trusts the partition schema, which is inferred from the path string. To fix this bug, this PR simply updates `HadoopFsRelation.schema` to respect the partition columns' position in the data schema and their types in the partition schema.

## How was this patch tested?
Add tests in `ParquetPartitionDiscoverySuite`

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #16030 from maropu/SPARK-18108.
2016-12-16 22:44:42 +08:00
Shixiong Zhu d7f3058e17 [SPARK-18850][SS] Make StreamExecution and progress classes serializable
## What changes were proposed in this pull request?

This PR adds StreamingQueryWrapper to make StreamExecution and the progress classes serializable, because it is too easy for them to get captured in normal usage. If StreamingQueryWrapper gets captured in a closure but nothing calls its methods, it should not fail the Spark tasks. However, if its methods are called, this PR will throw a better error message.

## How was this patch tested?

`test("StreamingQuery should be Serializable but cannot be used in executors")`
`test("progress classes should be Serializable")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16272 from zsxwing/SPARK-18850.
2016-12-16 00:42:39 -08:00
Shixiong Zhu 68a6dc974b [SPARK-18826][SS] Add 'latestFirst' option to FileStreamSource
## What changes were proposed in this pull request?

When starting a stream with a lot of backfill and maxFilesPerTrigger, the user often wants to start with the most recent files first. This lets you keep latency low for recent data while slowly backfilling historical data.

This PR adds a new option `latestFirst` to control this behavior. When it's true, `FileStreamSource` will sort the files by the modified time from latest to oldest, and take the first `maxFilesPerTrigger` files as a new batch.
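
For illustration, enabling the new option together with `maxFilesPerTrigger` (format, path, and values are illustrative):

```scala
// Process the newest files first while backfilling, keeping recent data low-latency.
val stream = spark.readStream
  .format("text")
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", "10")
  .load("/data/incoming")
```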

## How was this patch tested?

The added test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16251 from zsxwing/newest-first.
2016-12-15 13:17:51 -08:00
jiangxingbo 01e14bf303 [SPARK-17910][SQL] Allow users to update the comment of a column
## What changes were proposed in this pull request?

Right now, once a user sets the comment of a column with the CREATE TABLE command, he/she cannot update the comment. It would be useful to provide a public interface (e.g. SQL) to do that.

This PR implements the following SQL statement:
```
ALTER TABLE table [PARTITION partition_spec]
CHANGE [COLUMN] column_old_name column_new_name column_dataType
[COMMENT column_comment]
[FIRST | AFTER column_name];
```

For further expansion, we could support alter `name`/`dataType`/`index` of a column too.

## How was this patch tested?

Add new test cases in `ExternalCatalogSuite` and `SessionCatalogSuite`.
Add sql file test for `ALTER TABLE CHANGE COLUMN` statement.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15717 from jiangxb1987/change-column.
2016-12-15 10:09:42 -08:00
Wenchen Fan d6f11a12a1 [SPARK-18856][SQL] non-empty partitioned table should not report zero size
## What changes were proposed in this pull request?

In `DataSource`, if the table is not analyzed, we will use 0 as the default value for the table size. This is dangerous: we may broadcast a large table and cause an OOM. We should use `defaultSizeInBytes` instead.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16280 from cloud-fan/bug.
2016-12-14 21:03:56 -08:00
Reynold Xin ffdd1fcd1e [SPARK-18854][SQL] numberedTreeString and apply(i) inconsistent for subqueries
## What changes were proposed in this pull request?
This is a bug introduced by subquery handling. numberedTreeString (which uses generateTreeString under the hood) numbers trees including innerChildren (used to print subqueries), but apply (which uses getNodeNumbered) ignores innerChildren. As a result, apply(i) would return the wrong plan node if there are subqueries.

This patch fixes the bug.

## How was this patch tested?
Added a test case in SubquerySuite.scala to test both the depth-first traversal of numbering as well as making sure the two methods are consistent.

Author: Reynold Xin <rxin@databricks.com>

Closes #16277 from rxin/SPARK-18854.
2016-12-14 16:12:14 -08:00
Shixiong Zhu 1ac6567bdb [SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty
## What changes were proposed in this pull request?

Right now `StreamingQuery.lastProgress` throws NoSuchElementException, which is hard to handle from Python since a Python user will just see a Py4JError.

This PR just makes it return null instead.
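
For reference, the calling pattern this enables (a minimal sketch; `query` is an active StreamingQuery):

```scala
// lastProgress is now null, rather than throwing, until the first progress is reported.
val progress = query.lastProgress
if (progress != null) {
  println(progress.prettyJson)
}
```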

## How was this patch tested?

`test("lastProgress should be null when recentProgress is empty")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16273 from zsxwing/SPARK-18852.
2016-12-14 13:36:41 -08:00
hyukjinkwon 89ae26dcdb [SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side post-filter for FileFormat datasources
## What changes were proposed in this pull request?

Currently, `FileSourceStrategy` does not handle the case where the pushed-down filter is `Literal(null)`, and removes it from the Spark-side post-filter.

For example, the codes below:

```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```

shows it keeps `null` properly.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)       << Here `null` is there
+- LocalTableScan [_1#17]
```

However, when we read it back from Parquet,

```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```

`null` is removed at the post-filter.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)       << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

This PR fixes it to keep it properly. In more details,

```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```

This keeps the `null` in `partitionKeyFilters`, because a `Literal` never has `children`, so its `references` set is empty and is therefore always a subset of `partitionSet`.

And then in

```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```

`null` is always removed from the post-filter. So, if the referenced fields are empty, the filter should be applied to the data columns too.

After this PR, it becomes as below:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

## How was this patch tested?

Unit test in `FileSourceStrategySuite`

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16184 from HyukjinKwon/SPARK-18753.
2016-12-14 11:29:11 -08:00
Wenchen Fan 3e307b4959 [SPARK-18566][SQL] remove OverwriteOptions
## What changes were proposed in this pull request?

`OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705 to carry the information about static partitions. However, after further refactoring, this information became duplicated and we can remove `OverwriteOptions`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15995 from cloud-fan/overwrite.
2016-12-14 11:30:34 +08:00