Commit graph

4461 commits

Author SHA1 Message Date
guoxiaolong 42a1a15d73 [SPARK-22999][SQL] 'SHOW DATABASES LIKE' command can omit the LIKE keyword
## What changes were proposed in this pull request?

The grammar is `SHOW DATABASES (LIKE? pattern=STRING)?`, so the LIKE keyword can be omitted when using this command.
This mirrors the SHOW TABLES command: both SHOW TABLES 'test*' and SHOW TABLES LIKE 'test*' are accepted.
Similarly, both SHOW DATABASES 'test*' and SHOW DATABASES LIKE 'test*' can be used.
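
For illustration, a minimal spark-shell sketch of the two forms this change makes equivalent (database names and patterns are illustrative):

```scala
// Both statements should now be equivalent; the LIKE keyword is optional,
// mirroring SHOW TABLES.
spark.sql("SHOW DATABASES LIKE 'test*'").show()
spark.sql("SHOW DATABASES 'test*'").show()
```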

## How was this patch tested?
Unit tests and manual tests.

Author: guoxiaolong <guo.xiaolong1@zte.com.cn>

Closes #20194 from guoxiaolongzte/SPARK-22999.
2018-01-15 02:02:49 +08:00
Takeshi Yamamuro 0066d6f6fa [SPARK-21213][SQL][FOLLOWUP] Use compatible types for comparisons in compareAndGetNewStats
## What changes were proposed in this pull request?
This PR fixes the code that compares values in `compareAndGetNewStats`.
The test below fails in the current master:
```
    val oldStats2 = CatalogStatistics(sizeInBytes = BigInt(Long.MaxValue) * 2)
    val newStats5 = CommandUtils.compareAndGetNewStats(
      Some(oldStats2), newTotalSize = BigInt(Long.MaxValue) * 2, None)
    assert(newStats5.isEmpty)
```

## How was this patch tested?
Added some tests in `CommandUtilsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20245 from maropu/SPARK-21213-FOLLOWUP.
2018-01-14 05:39:38 +08:00
CodingCat ba891ec993 [SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size
## What changes were proposed in this pull request?

as per discussion in https://github.com/apache/spark/pull/19864#discussion_r156847927

The current `HadoopFsRelation` size estimate is based purely on the underlying file size, which is not accurate and makes the execution vulnerable to errors like OOM.

Users can enable CBO with the functionalities in https://github.com/apache/spark/pull/19864 to avoid this issue.

This JIRA proposes to add a configurable factor to the `sizeInBytes` method in the `HadoopFsRelation` class so that users can mitigate this problem without CBO.
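
As a rough sketch of the idea (not the actual implementation; the helper name is hypothetical):

```scala
// Hypothetical sketch: scale the raw on-disk file size by a configurable
// factor so that sizeInBytes better reflects the in-memory footprint and
// broadcast decisions become less OOM-prone.
def estimatedSizeInBytes(rawFileSizeInBytes: Long, compressionFactor: Double): Long =
  (rawFileSizeInBytes * compressionFactor).toLong

// e.g. a 1 GiB compressed file estimated as ~3 GiB for planning purposes
estimatedSizeInBytes(1L << 30, 3.0)
```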

## How was this patch tested?

Existing tests

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <nanzhu@uber.com>

Closes #20072 from CodingCat/SPARK-22790.
2018-01-14 02:36:32 +08:00
xubo245 bd4a21b482 [SPARK-23036][SQL][TEST] Add withGlobalTempView for testing
## What changes were proposed in this pull request?

Add `withGlobalTempView` for tests that create global temp views, like the existing `withTempView` and `withView`, and correct some improper usages.
Please see the JIRA for details.
There are other similar places like that; I will fix them if the community needs it. Please confirm.
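
A minimal sketch of what such a helper could look like (the exact signature in the patch may differ; this assumes a SparkSession named `spark` is in scope):

```scala
// Hypothetical helper: run a test body and always drop the listed global temp
// views afterwards, mirroring the existing withTempView pattern.
def withGlobalTempView(viewNames: String*)(f: => Unit): Unit = {
  try f finally {
    viewNames.foreach(spark.catalog.dropGlobalTempView)
  }
}
```
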
## How was this patch tested?

No new tests.

Author: xubo245 <601450868@qq.com>

Closes #20228 from xubo245/DropTempView.
2018-01-14 02:28:57 +08:00
Sameer Agarwal 55dbfbca37 Revert "[SPARK-22908] Add kafka source and sink for continuous processing."
This reverts commit 6f7aaed805.
2018-01-12 15:00:00 -08:00
Marco Gaido 54277398af [SPARK-22975][SS] MetricsReporter should not throw exception when there was no progress reported
## What changes were proposed in this pull request?

`MetricsReporter` assumes that there has been some progress for the query, i.e. `lastProgress` is not null. If this is not true, as can happen in particular conditions, a `NullPointerException` can be thrown.

The PR checks whether there is a `lastProgress` and, if not, returns a default value for the metrics.
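
The guard amounts to a null check before reading the metric, roughly like this sketch (names are illustrative, not the patch itself):

```scala
import org.apache.spark.sql.streaming.StreamingQueryProgress

// Illustrative: fall back to a default when no progress has been reported yet,
// instead of dereferencing a null lastProgress.
def inputRowsPerSecond(lastProgress: StreamingQueryProgress): Double =
  if (lastProgress != null) lastProgress.inputRowsPerSecond else 0.0
```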

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20189 from mgaido91/SPARK-22975.
2018-01-12 11:25:37 -08:00
Dongjoon Hyun 7bd14cfd40 [MINOR][BUILD] Fix Java linter errors
## What changes were proposed in this pull request?

This PR cleans up the java-lint errors (for v2.3.0-rc1 tag). Hopefully, this will be the final one.

```
$ dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java:[85] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/launcher/InProcessAppHandle.java:[20,8] (imports) UnusedImports: Unused import - java.io.IOException.
[ERROR] src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java:[41,9] (modifier) ModifierOrder: 'private' modifier out of order with the JLS suggestions.
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java:[464] (sizes) LineLength: Line is longer than 100 characters (found 102).
```

## How was this patch tested?

Manual.

```
$ dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks passed.
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20242 from dongjoon-hyun/fix_lint_java_2.3_rc1.
2018-01-12 10:18:42 -08:00
gatorsmile 651f76153f [SPARK-23028] Bump master branch version to 2.4.0-SNAPSHOT
## What changes were proposed in this pull request?
This patch bumps the master branch version to `2.4.0-SNAPSHOT`.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20222 from gatorsmile/bump24.
2018-01-13 00:37:59 +08:00
Marco Gaido 5050868069 [SPARK-23025][SQL] Support Null type in scala reflection
## What changes were proposed in this pull request?

Add support for `Null` type in the `schemaFor` method for Scala reflection.
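
A minimal sketch of what this enables (assuming the change maps Scala's `Null` to Catalyst's `NullType`):

```scala
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.NullType

// Assumed behavior after this change: schemaFor resolves Null to NullType
// instead of failing as an unsupported type.
val schema = ScalaReflection.schemaFor[Null]
assert(schema.dataType == NullType)
```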

## How was this patch tested?

Added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20219 from mgaido91/SPARK-23025.
2018-01-12 18:04:44 +08:00
Jose Torres 6f7aaed805 [SPARK-22908] Add kafka source and sink for continuous processing.
## What changes were proposed in this pull request?

Add kafka source and sink for continuous processing. This involves two small changes to the execution engine:

* Bring data reader close() into the normal data reader thread to avoid thread safety issues.
* Fix up the semantics of the RECONFIGURING StreamExecution state. State updates are now atomic, and we don't have to deal with swallowing an exception.

## How was this patch tested?

new unit tests

Author: Jose Torres <jose@databricks.com>

Closes #20096 from jose-torres/continuous-kafka.
2018-01-11 10:52:12 -08:00
Feng Liu 9b33dfc408 [SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames
## What changes were proposed in this pull request?

(courtesy of liancheng)

Spark SQL supports both global aggregation and grouping aggregation. Global aggregation always returns a single row with the initial aggregation state as the output, even when there are zero input rows. Spark implements this by simply checking the number of grouping keys, treating an aggregation as a global aggregation if it has zero grouping keys.

However, this simple principle drops the ball in the following case:

```scala
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// | c |
// +---+
// | 1 |
// +---+
```

The reason is that:

1. `df.dropDuplicates()` is roughly translated into something equivalent to:

```scala
val allColumns = df.columns.map { col }
df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
```

This translation is implemented in the rule `ReplaceDeduplicateWithAggregate`.

2. `spark.emptyDataFrame` contains zero columns and zero rows.

Therefore, rule `ReplaceDeduplicateWithAggregate` makes a confusing transformation roughly equivalent to the following one:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
```

The above transformation is confusing because the resulting aggregate operator contains no grouping keys (because `emptyDataFrame` contains no columns), and gets recognized as a global aggregation. As a result, Spark SQL allocates a single row filled by the initial aggregation state and uses it as the output, and returns a wrong result.

To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by appending a literal `1` to the grouping key list of the resulting `Aggregate` operator when the input plan contains zero output columns. In this way, `spark.emptyDataFrame.dropDuplicates()` is now translated into a grouping aggregation, roughly depicted as:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
```

This is now properly treated as a grouping aggregation and returns the correct answer.

## How was this patch tested?

New unit tests added

Author: Feng Liu <fengliu@databricks.com>

Closes #20174 from liufengdb/fix-duplicate.
2018-01-10 14:25:04 -08:00
Wenchen Fan eaac60a1e2 [SPARK-16060][SQL][FOLLOW-UP] add a wrapper solution for vectorized orc reader
## What changes were proposed in this pull request?

This is mostly from https://github.com/apache/spark/pull/13775

The wrapper solution is pretty good for string/binary types, as the ORC column vector doesn't keep bytes in a continuous memory region and has a significant overhead when copying the data to a Spark columnar batch. For other cases, the wrapper solution is almost the same as the current solution.

I think we can treat the wrapper solution as a baseline and keep improving the write-to-Spark solution.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20205 from cloud-fan/orc.
2018-01-10 15:16:27 +08:00
Wenchen Fan 6f169ca9e1 [MINOR] fix a typo in BroadcastJoinSuite
## What changes were proposed in this pull request?

`BroadcastNestedLoopJoinExec` should be `BroadcastHashJoinExec`

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20202 from cloud-fan/typo.
2018-01-10 10:20:34 +08:00
Wang Gengliang 96ba217a06 [SPARK-23005][CORE] Improve RDD.take on small number of partitions
## What changes were proposed in this pull request?
In the current implementation of RDD.take, we overestimate the number of partitions we need to try by 50%:
`(1.5 * num * partsScanned / buf.size).toInt`
However, when the number is small, the result of `.toInt` is not what we want.
E.g., 2.9 will become 2, when it should be 3.
Use Math.ceil to fix the problem.

Also clean up the code in RDD.scala.
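
Concretely, picking values where the estimate works out to the 2.9 from the example above:

```scala
// num = 29, partsScanned = 1, buf.size = 15
val estimate = 1.5 * 29 * 1 / 15         // 2.9
// Before: truncation underestimates the number of partitions to scan.
val before   = estimate.toInt            // 2
// After: rounding up gives the intended count.
val after    = math.ceil(estimate).toInt // 3
```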

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #20200 from gengliangwang/Take.
2018-01-10 10:15:27 +08:00
Dongjoon Hyun f44ba910f5 [SPARK-16060][SQL] Support Vectorized ORC Reader
## What changes were proposed in this pull request?

This PR adds an ORC columnar-batch reader to native `OrcFileFormat`. Since both Spark `ColumnarBatch` and ORC `RowBatch` are used together, it is faster than the current Spark implementation. This replaces the prior PR, #17924.

Also, this PR adds `OrcReadBenchmark` to show the performance improvement.

## How was this patch tested?

Pass the existing test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19943 from dongjoon-hyun/SPARK-16060.
2018-01-09 21:48:14 +08:00
xubo245 68ce792b58 [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc
## What changes were proposed in this pull request?
Fix the warning: Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc.

## How was this patch tested?
 test("SPARK-22972: hive orc source")
    assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
      .equals(HiveSerDe.sourceToSerDe("orc")))

Author: xubo245 <601450868@qq.com>

Closes #20165 from xubo245/HiveSerDe.
2018-01-09 10:15:01 +08:00
Jose Torres 4f7e758834 [SPARK-22912] v2 data source support in MicroBatchExecution
## What changes were proposed in this pull request?

Support for v2 data sources in microbatch streaming.

## How was this patch tested?

A very basic new unit test on the toy v2 implementation of rate source. Once we have a v1 source fully migrated to v2, we'll need to do more detailed compatibility testing.

Author: Jose Torres <jose@databricks.com>

Closes #20097 from jose-torres/v2-impl.
2018-01-08 13:24:08 -08:00
Xianjin YE 40b983c3b4 [SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber
## What changes were proposed in this pull request?
1.  Deprecate attemptId in StageInfo and add `def attemptNumber() = attemptId`
2. Replace usage of stageAttemptId with stageAttemptNumber

## How was this patch tested?
I manually checked the compiler warning info

Author: Xianjin YE <advancedxy@gmail.com>

Closes #20178 from advancedxy/SPARK-22952.
2018-01-08 23:49:07 +08:00
Wenchen Fan eb45b52e82 [SPARK-21865][SQL] simplify the distribution semantic of Spark SQL
## What changes were proposed in this pull request?

**The current shuffle planning logic**

1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`.
4. For each operator, check each of its children and add a shuffle node above a child if the child's partitioning cannot satisfy the required distribution.
5. For each operator, check if its children's output partitionings are compatible with each other, via the `Partitioning.compatibleWith`.
6. If the check in 5 failed, add a shuffle above each child.
7. Try to eliminate the shuffles added in 6, via `Partitioning.guarantees`.

This design has a major problem with the definition of "compatible".

`Partitioning.compatibleWith` is not well defined; ideally a `Partitioning` can't know whether it's compatible with another `Partitioning` without more information from the operator. For example, for `t1 join t2 on t1.a = t2.b`, `HashPartitioning(a, 10)` should be compatible with `HashPartitioning(b, 10)` in this case, but the partitioning itself doesn't know it.

As a result, currently `Partitioning.compatibleWith` always returns false except for literals, which makes it almost useless. This also means that if an operator has distribution requirements for multiple children, Spark always adds shuffle nodes to all the children (although some of them can be eliminated). However, there is no guarantee that the children's output partitionings are compatible with each other after adding these shuffles; we just assume that the operator will only specify `ClusteredDistribution` for multiple children.

I think it's very hard to guarantee children co-partitioning for all kinds of operators, and we cannot even give a clear definition of co-partitioning between distributions like `ClusteredDistribution(a,b)` and `ClusteredDistribution(c)`.

I think we should drop the "compatible" concept in the distribution model, and let the operator achieve the co-partition requirement by special distribution requirements.

**Proposed shuffle planning logic after this PR**
(The first 4 are same as before)
1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`.
4. For each operator, check each of its children and add a shuffle node above a child if the child's partitioning cannot satisfy the required distribution.
5. For each operator, check if its children's output partitionings have the same number of partitions.
6. If the check in 5 failed, pick the max number of partitions from the children's output partitionings, and add a shuffle to each child whose number of partitions doesn't equal the max.

The new distribution model is very simple, we only have one kind of relationship, which is `Partitioning.satisfy`. For multiple children, Spark only guarantees they have the same number of partitions, and it's the operator's responsibility to leverage this guarantee to achieve more complicated requirements. For example, non-broadcast joins can use the newly added `HashPartitionedDistribution` to achieve co-partition.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19080 from cloud-fan/exchange.
2018-01-08 19:41:41 +08:00
Josh Rosen 2c73d2a948 [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions
## What changes were proposed in this pull request?

The following SQL query should return zero rows, but in Spark it actually returns one row:

```
SELECT 1 from (
  SELECT 1 AS z,
  MIN(a.x)
  FROM (select 1 as x) a
  WHERE false
) b
where b.z != b.z
```

The problem stems from the `PushDownPredicate` rule: when this rule encounters a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes the original filter and adds a new filter onto Aggregate's child, e.g. `Agg(Filter(...))`. This is sometimes okay, but the case above is a counterexample: because there is no explicit `GROUP BY`, we are implicitly computing a global aggregate over the entire table so the original filter was not acting like a `HAVING` clause filtering the number of groups: if we push this filter then it fails to actually reduce the cardinality of the Aggregate output, leading to the wrong answer.

In 2016 I fixed a similar problem involving invalid pushdowns of data-independent filters (filters which reference no columns of the filtered relation). There was additional discussion after my fix was merged which pointed out that my patch was an incomplete fix (see #15289), but it looks like I must have either misunderstood the comment or forgotten to follow up on the additional points raised there.

This patch fixes the problem by choosing to never push down filters in cases where there are no grouping expressions. Since there are no grouping keys, the only columns are aggregate columns and we can't push filters defined over aggregate results, so this change won't cause us to miss out on any legitimate pushdown opportunities.

## How was this patch tested?

New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #20180 from JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions.
2018-01-08 16:04:03 +08:00
hyukjinkwon 8fdeb4b994 [SPARK-22979][PYTHON][SQL] Avoid per-record type dispatch in Python data conversion (EvaluatePython.fromJava)
## What changes were proposed in this pull request?

It seems we can avoid type dispatch for each value when converting Java objects (from Pyrolite) to Spark's internal data format, because we know the schema ahead of time.

I manually performed the benchmark as below:

```scala
  test("EvaluatePython.fromJava / EvaluatePython.makeFromJava") {
    val numRows = 1000 * 1000
    val numFields = 30

    val random = new Random(System.nanoTime())
    val types = Array(
      BooleanType, ByteType, FloatType, DoubleType, IntegerType, LongType, ShortType,
      DecimalType.ShortDecimal, DecimalType.IntDecimal, DecimalType.ByteDecimal,
      DecimalType.FloatDecimal, DecimalType.LongDecimal, new DecimalType(5, 2),
      new DecimalType(12, 2), new DecimalType(30, 10), CalendarIntervalType)
    val schema = RandomDataGenerator.randomSchema(random, numFields, types)
    val rows = mutable.ArrayBuffer.empty[Array[Any]]
    var i = 0
    while (i < numRows) {
      val row = RandomDataGenerator.randomRow(random, schema)
      rows += row.toSeq.toArray
      i += 1
    }

    val benchmark = new Benchmark("EvaluatePython.fromJava / EvaluatePython.makeFromJava", numRows)
    benchmark.addCase("Before - EvaluatePython.fromJava", 3) { _ =>
      var i = 0
      while (i < numRows) {
        EvaluatePython.fromJava(rows(i), schema)
        i += 1
      }
    }

    benchmark.addCase("After - EvaluatePython.makeFromJava", 3) { _ =>
      val fromJava = EvaluatePython.makeFromJava(schema)
      var i = 0
      while (i < numRows) {
        fromJava(rows(i))
        i += 1
      }
    }

    benchmark.run()
  }
```

```
EvaluatePython.fromJava / EvaluatePython.makeFromJava: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Before - EvaluatePython.fromJava              1265 / 1346          0.8        1264.8       1.0X
After - EvaluatePython.makeFromJava            571 /  649          1.8         570.8       2.2X
```

If the structure is nested, I think the advantage should be larger than this.

## How was this patch tested?

Existing tests should cover this. Also, I manually checked if the values from before / after are actually same via `assert` when performing the benchmarks.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20172 from HyukjinKwon/type-dispatch-python-eval.
2018-01-08 13:59:08 +08:00
fjh100456 7b78041423 [SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', parquet.compression needs to be considered.

## What changes were proposed in this pull request?
Since Hive 1.1, Hive allows users to set the Parquet compression codec via the table-level property `parquet.compression`. See the JIRA: https://issues.apache.org/jira/browse/HIVE-7858 . We do support `orc.compression` for ORC, so for external users it is more straightforward to support both. See the Stack Overflow question: https://stackoverflow.com/questions/36941122/spark-sql-ignores-parquet-compression-propertie-specified-in-tblproperties
On the Spark side, our table-level compression conf `compression` was added by #11464 in Spark 2.0.
We need to support both table-level confs. Users might also use the session-level conf `spark.sql.parquet.compression.codec`. The priority rule is as follows:
if another compression codec configuration is found through Hive or Parquet, the precedence is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, snappy, gzip, lzo.
The rule for Parquet is consistent with that of ORC after the change.
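
A hedged illustration of the intended precedence (table name and codec are illustrative; the exact DDL path that picks up `parquet.compression` depends on how the table is written):

```scala
// Illustrative: a Hive-style table carrying a table-level Parquet codec.
// With this change, writes to the table should honor `parquet.compression`
// when no `compression` option is given, before falling back to
// spark.sql.parquet.compression.codec.
spark.sql("""
  CREATE TABLE sales (id INT) STORED AS PARQUET
  TBLPROPERTIES ('parquet.compression' = 'GZIP')
""")
```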

Changes:
1. Acquire `compressionCodecClassName` also from `parquet.compression`; the precedence order is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`.

2. Change `spark.sql.parquet.compression.codec` to support "none". Actually, in `ParquetOptions` we do support "none" as equivalent to "uncompressed", but it was not allowed to be configured as "none".

3. Change `compressionCode` to `compressionCodecClassName`.

## How was this patch tested?
Add test.

Author: fjh100456 <fu.jinhua6@zte.com.cn>

Closes #20076 from fjh100456/ParquetOptionIssue.
2018-01-06 18:19:57 +08:00
Takeshi Yamamuro e8af7e8aec [SPARK-22937][SQL] SQL elt output binary for binary inputs
## What changes were proposed in this pull request?
This PR modifies `elt` to output binary for binary inputs.
`elt` in the current master always outputs data as a string. But in some databases (e.g., MySQL), if all inputs are binary, `elt` also outputs binary (so the current behavior might be a small surprise).
This PR is related to #19977.
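
A hedged SQL-level illustration of the new behavior (literals are arbitrary; checked via the result schema):

```scala
// Illustrative: with all-binary inputs, elt is expected to yield a binary
// column after this change, rather than coercing to string.
val df = spark.sql("SELECT elt(1, cast('spark' AS BINARY), cast('sql' AS BINARY)) AS v")
df.schema("v").dataType  // expected: BinaryType
```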

## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20135 from maropu/SPARK-22937.
2018-01-06 09:26:03 +08:00
Takeshi Yamamuro 52fc5c17d9 [SPARK-22825][SQL] Fix incorrect results of Casting Array to String
## What changes were proposed in this pull request?
This PR fixes an issue when casting arrays into strings:
```
scala> val df = spark.range(10).select('id.cast("integer")).agg(collect_list('id).as('ids))
scala> df.write.saveAsTable("t")
scala> sql("SELECT cast(ids as String) FROM t").show(false)
+------------------------------------------------------------------+
|ids                                                               |
+------------------------------------------------------------------+
|org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@8bc285df|
+------------------------------------------------------------------+
```

This PR changes the result to:
```
+------------------------------+
|ids                           |
+------------------------------+
|[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]|
+------------------------------+
```

## How was this patch tested?
Added tests in `CastSuite` and `SQLQuerySuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20024 from maropu/SPARK-22825.
2018-01-05 14:02:21 +08:00
Juliusz Sompolski df7fc3ef38 [SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
## What changes were proposed in this pull request?

A 32-bit Int was used for the row rank.
That overflowed in a DataFrame with more than 2B rows.

## How was this patch tested?

Added test, but ignored, as it takes 4 minutes.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #20152 from juliuszsompolski/SPARK-22957.
2018-01-05 10:16:34 +08:00
Wenchen Fan d5861aba9d [SPARK-22945][SQL] add java UDF APIs in the functions object
## What changes were proposed in this pull request?

Currently Scala users can use UDFs like:
```
val foo = udf((i: Int) => Math.random() + i).asNondeterministic
df.select(foo('a))
```
Python users can also do it with similar APIs. However, Java users can't, so we should add Java UDF APIs in the functions object.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20141 from cloud-fan/udf.
2018-01-04 19:17:22 +08:00
Felix Cheung df95a908ba [SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy
## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #20129 from felixcheung/rwater.
2018-01-03 21:43:14 -08:00
Wenchen Fan b297029130 [SPARK-20960][SQL] make ColumnVector public
## What changes were proposed in this pull request?

Move `ColumnVector` and related classes to `org.apache.spark.sql.vectorized`, and improve the documentation.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20116 from cloud-fan/column-vector.
2018-01-04 07:28:53 +08:00
Wenchen Fan a66fe36cee [SPARK-20236][SQL] dynamic partition overwrite
## What changes were proposed in this pull request?

When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source and Hive tables:

data source table: delete all partition directories that match the static partition values provided in the insert statement.

Hive table: only delete partition directories which have data written into them.

This PR adds a new config so that users can choose Hive's behavior.
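
A hedged sketch of how this could be used (the config key and value are assumptions based on the description, not quoted from the patch; table names are illustrative):

```scala
// Hypothetical: opt in to Hive-like behavior so that only partitions that
// actually receive data are overwritten.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql("INSERT OVERWRITE TABLE events PARTITION (dt) SELECT value, dt FROM staging")
```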

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18714 from cloud-fan/overwrite-partition.
2018-01-03 22:18:13 +08:00
gatorsmile 1a87a1609c [SPARK-22934][SQL] Make optional clauses order insensitive for CREATE TABLE SQL statement
## What changes were proposed in this pull request?
Currently, our CREATE TABLE syntax requires the EXACT order of clauses. It is pretty hard to remember the exact order. Thus, this PR makes the optional clauses order-insensitive for the `CREATE TABLE` SQL statement.

```
CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name1 col_type1 [COMMENT col_comment1], ...)]
    USING datasource
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
    [LOCATION path]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
    [AS select_statement]
```

The proposal is to make the following clauses order insensitive.
```
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
    [LOCATION path]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
```

The same idea is also applicable to Create Hive Table.
```
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name1[:] col_type1 [COMMENT col_comment1], ...)]
    [COMMENT table_comment]
    [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
    [AS select_statement]
```

The proposal is to make the following clauses order insensitive.
```
    [COMMENT table_comment]
    [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
```

## How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20133 from gatorsmile/createDataSourceTableDDL.
2018-01-03 22:09:30 +08:00
Xianjin YE a6fc300e91 [SPARK-22897][CORE] Expose stageAttemptId in TaskContext
## What changes were proposed in this pull request?
stageAttemptId was added to TaskContext, with the corresponding construction modifications.
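
A minimal usage sketch (the accessor name follows the stageAttemptNumber naming from the SPARK-22952 commit above and should be checked against the final API; `sc` is a spark-shell SparkContext):

```scala
import org.apache.spark.TaskContext

// Illustrative: read the stage attempt from inside a task, e.g. for logging.
sc.parallelize(1 to 4, 2).foreach { _ =>
  val ctx = TaskContext.get()
  println(s"stage=${ctx.stageId()} attempt=${ctx.stageAttemptNumber()}")
}
```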

## How was this patch tested?
Added a new test in TaskContextSuite, two cases are tested:
1. Normal case without failure
2. Exception case with resubmitted stages

Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)

Author: Xianjin YE <advancedxy@gmail.com>

Closes #20082 from advancedxy/SPARK-22897.
2018-01-02 23:30:38 +08:00
Bryan Cutler 1c9f95cb77 [SPARK-22530][PYTHON][SQL] Adding Arrow support for ArrayType
## What changes were proposed in this pull request?

This change adds `ArrayType` support for working with Arrow in pyspark when creating a DataFrame, calling `toPandas()`, and using vectorized `pandas_udf`.

## How was this patch tested?

Added new Python unit tests using Array data.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20114 from BryanCutler/arrow-ArrayType-support-SPARK-22530.
2018-01-02 07:13:27 +09:00
Sean Owen c284c4e1f6 [MINOR] Fix a bunch of typos 2018-01-02 07:10:19 +09:00
gatorsmile cfbe11e816 [SPARK-22895][SQL] Push down the deterministic predicates that are after the first non-deterministic
## What changes were proposed in this pull request?
Currently, we do not guarantee an evaluation order of conjuncts in either the Filter or Join operator. This is also true for mainstream RDBMS vendors like DB2 and MS SQL Server. Thus, we should also push down the deterministic predicates that come after the first non-deterministic one, if possible.

## How was this patch tested?
Updated the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20069 from gatorsmile/morePushDown.
2017-12-31 15:06:54 +08:00
Gabor Somogyi ee3af15fea [SPARK-22363][SQL][TEST] Add unit test for Window spilling
## What changes were proposed in this pull request?

There is already a test using window spilling, but the test coverage is not ideal.

In this PR the existing test was fixed and additional cases were added.

## How was this patch tested?

Automated: Pass the Jenkins.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20022 from gaborgsomogyi/SPARK-22363.
2017-12-31 14:47:23 +08:00
Takeshi Yamamuro f2b3525c17 [SPARK-22771][SQL] Concatenate binary inputs into a binary output
## What changes were proposed in this pull request?
This PR modifies `concat` to concatenate binary inputs into a single binary output.
`concat` in the current master always outputs data as a string. But in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.

## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19977 from maropu/SPARK-22771.
2017-12-30 14:09:56 +08:00
WeichenXu 2ea17afb63 [SPARK-22881][ML][TEST] ML regression package testsuite add StructuredStreaming test
## What changes were proposed in this pull request?

Add a StructuredStreaming test to the ML regression package test suite.

In order to make the test suite easier to modify, a new helper function was added in `MLTest`:
```
def testTransformerByGlobalCheckFunc[A : Encoder](
      dataframe: DataFrame,
      transformer: Transformer,
      firstResultCol: String,
      otherResultCols: String*)
      (globalCheckFunction: Seq[Row] => Unit): Unit
```

## How was this patch tested?

N/A

Author: WeichenXu <weichen.xu@databricks.com>
Author: Bago Amirbekian <bago@databricks.com>

Closes #19979 from WeichenXu123/ml_stream_test.
2017-12-29 20:06:56 -08:00
oraviv fcf66a3276 [SPARK-21657][SQL] optimize explode quadratic memory consumption
## What changes were proposed in this pull request?

The issue has been raised in two JIRA tickets: [SPARK-21657](https://issues.apache.org/jira/browse/SPARK-21657) and [SPARK-16998](https://issues.apache.org/jira/browse/SPARK-16998). Basically, what happens is that in collection generators like explode/inline we create many rows from each row. Currently each exploded row also contains the column on which it was created. This means, for example, that if we have a 10k-element array in one row, this array gets copied 10k times, once to each output row, which results in quadratic memory consumption. However, it is a common case that the original column gets projected out after the explode, so we can avoid duplicating it.
In this solution we propose to identify this situation in the optimizer and turn on a flag for omitting the original column in the generation process.

## How was this patch tested?

1. We added a benchmark test to MiscBenchmark that shows x16 improvement in runtimes.
2. We ran some of the other tests in MiscBenchmark and they show 15% improvements.
3. We ran this code on a specific case from our production data with rows containing arrays of size ~200k and it reduced the runtime from 6 hours to 3 mins.

Author: oraviv <oraviv@paypal.com>
Author: uzadude <ohad.raviv@gmail.com>
Author: uzadude <15645757+uzadude@users.noreply.github.com>

Closes #19683 from uzadude/optimize_explode.
2017-12-29 21:08:34 +08:00
Feng Liu cc30ef8009 [SPARK-22916][SQL] shouldn't bias towards build right if user does not specify
## What changes were proposed in this pull request?

When there are no broadcast hints, the current Spark strategies prefer building the right side, without considering the sizes of the two tables. This patch adds logic to consider the sizes of the two tables when choosing the build side. To make the logic clear, the build side is determined as follows:

1. If there are broadcast hints, the build side is determined by `broadcastSideByHints`;
2. If there are no broadcast hints, the build side is determined by `broadcastSideBySizes`;
3. If the broadcast is disabled by the config, it falls back to the next cases.

## How was this patch tested?


Author: Feng Liu <fengliu@databricks.com>

Closes #20099 from liufengdb/fix-spark-strategies.
2017-12-29 18:48:47 +08:00
Wang Gengliang d4f0b1d2c5 [SPARK-22834][SQL] Make insertion commands have real children to fix UI issues
## What changes were proposed in this pull request?

With #19474, the children of insertion commands are missing in the UI.
To fix it:
1. Create a new physical plan `DataWritingCommandExec` to execute `DataWritingCommand` with children, so that the other commands won't be affected.
2. On creation of a `DataWritingCommand`, a new field `allColumns` must be specified, which is the output of the analyzed plan.
3. In `FileFormatWriter`, the output schema will use `allColumns` instead of the output of the optimized plan.

Before code changes:
![2017-12-19 10 27 10](https://user-images.githubusercontent.com/1097932/34161850-d2fd0acc-e50c-11e7-898a-177154fe7d8e.png)

After code changes:
![2017-12-19 10 27 04](https://user-images.githubusercontent.com/1097932/34161865-de23de26-e50c-11e7-9131-0c32f7b7b749.png)

## How was this patch tested?
Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #20020 from gengliangwang/insert.
2017-12-29 15:28:33 +08:00
soonmok-kwon ffe6fd77a4 [SPARK-22818][SQL] csv escape of quote escape
## What changes were proposed in this pull request?

Escaping of the quote-escape character should be considered when using the uniVocity CSV encoding/decoding library.

Ref: https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters

One option is added for reading and writing CSV: `escapeQuoteEscaping`
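
A hedged usage sketch with the new option (the option name is taken from this description; path and characters are illustrative):

```scala
// Illustrative: configure which character escapes the quote-escape character
// for the underlying uniVocity parser.
val df = spark.read
  .option("escape", "\\")
  .option("escapeQuoteEscaping", "\\")
  .csv("/path/to/data.csv")
```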

## How was this patch tested?

Unit test added.

Author: soonmok-kwon <soonmok.kwon@navercorp.com>

Closes #20004 from ep1804/SPARK-22818.
2017-12-29 07:30:06 +08:00
Yuming Wang 613b71a123 [SPARK-22890][TEST] Basic tests for DateTimeOperations
## What changes were proposed in this pull request?

Test Coverage for `DateTimeOperations`, this is a Sub-tasks for [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).

## How was this patch tested?

N/A

Author: Yuming Wang <wgyumg@gmail.com>

Closes #20061 from wangyum/SPARK-22890.
2017-12-29 06:58:38 +08:00
Dongjoon Hyun 5536f3181c [MINOR][BUILD] Fix Java linter errors
## What changes were proposed in this pull request?

This PR cleans up a few Java linter errors for Apache Spark 2.3 release.

## How was this patch tested?

```bash
$ dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks passed.
```

We can see the result from [Travis CI](https://travis-ci.org/dongjoon-hyun/spark/builds/322470787), too.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20101 from dongjoon-hyun/fix-java-lint.
2017-12-28 09:43:50 -06:00
Zhenhua Wang 2877817420 [SPARK-22917][SQL] Should not try to generate histogram for empty/null columns
## What changes were proposed in this pull request?

For an empty/null column, the result of `ApproximatePercentile` is null. Then in `ApproxCountDistinctForIntervals`, a `MatchError` (for `endpoints`) will be thrown if we try to generate a histogram for that column. Besides, there is no need to generate a histogram for such a column. In this patch, we exclude such columns when generating histograms.

## How was this patch tested?

Enhanced test cases for empty/null columns.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #20102 from wzhfy/no_record_hgm_bug.
2017-12-28 21:49:37 +08:00
Shixiong Zhu 32ec269d08 [SPARK-22909][SS] Move Structured Streaming v2 APIs to streaming folder
## What changes were proposed in this pull request?

This PR moves Structured Streaming v2 APIs to streaming folder as following:
```
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming
├── ContinuousReadSupport.java
├── ContinuousWriteSupport.java
├── MicroBatchReadSupport.java
├── MicroBatchWriteSupport.java
├── reader
│   ├── ContinuousDataReader.java
│   ├── ContinuousReader.java
│   ├── MicroBatchReader.java
│   ├── Offset.java
│   └── PartitionOffset.java
└── writer
    └── ContinuousWriter.java
```

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #20093 from zsxwing/move.
2017-12-28 12:35:17 +08:00
Kazuaki Ishizaki 5683984520 [SPARK-18016][SQL][FOLLOW-UP] Code Generation: Constant Pool Limit - reduce entries for mutable state
## What changes were proposed in this pull request?

This PR addresses additional review comments in #19811

## How was this patch tested?

Existing test suites

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20036 from kiszk/SPARK-18066-followup.
2017-12-28 12:28:19 +08:00
Marco Gaido 774715d5c7 [SPARK-22904][SQL] Add tests for decimal operations and string casts
## What changes were proposed in this pull request?

Test coverage for arithmetic operations leading to:

 1. Precision loss
 2. Overflow

Moreover, tests for casting bad strings to other input types and for using bad strings as operands of some functions.

## How was this patch tested?

added tests

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20084 from mgaido91/SPARK-22904.
2017-12-27 23:53:10 +08:00
Yuming Wang 91d1b300d4 [SPARK-22894][SQL] DateTimeOperations should accept SQL like string type
## What changes were proposed in this pull request?

`DateTimeOperations` accepts [`StringType`](ae998ec2b5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala (L669)), but:

```
spark-sql> SELECT '2017-12-24' + interval 2 months 2 seconds;
Error in query: cannot resolve '(CAST('2017-12-24' AS DOUBLE) + interval 2 months 2 seconds)' due to data type mismatch: differing types in '(CAST('2017-12-24' AS DOUBLE) + interval 2 months 2 seconds)' (double and calendarinterval).; line 1 pos 7;
'Project [unresolvedalias((cast(2017-12-24 as double) + interval 2 months 2 seconds), None)]
+- OneRowRelation
spark-sql>
```

After this PR:
```
spark-sql> SELECT '2017-12-24' + interval 2 months 2 seconds;
2018-02-24 00:00:02
Time taken: 0.2 seconds, Fetched 1 row(s)

```

## How was this patch tested?

unit tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #20067 from wangyum/SPARK-22894.
2017-12-26 09:40:41 -08:00
Marco Gaido ff48b1b338 [SPARK-22901][PYTHON] Add deterministic flag to pyspark UDF
## What changes were proposed in this pull request?

In SPARK-20586 the flag `deterministic` was added to Scala UDFs, but it is not available for Python UDFs. This flag is useful for cases when the UDF's code can return different results with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.

This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.

## How was this patch tested?

Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col =  udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten =  udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello|   3|           13|
+-----+----+-------------+

```

Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19929 from mgaido91/SPARK-22629.
2017-12-26 06:39:40 -08:00
Takuya UESHIN eb386be1ed [SPARK-21552][SQL] Add DecimalType support to ArrowWriter.
## What changes were proposed in this pull request?

Decimal type is not yet supported in `ArrowWriter`.
This is adding the decimal type support.

## How was this patch tested?

Added a test to `ArrowConvertersSuite`.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18754 from ueshin/issues/SPARK-21552.
2017-12-26 21:37:25 +09:00
Yuming Wang 33ae2437ba [SPARK-22893][SQL] Unified the data type mismatch message
## What changes were proposed in this pull request?

We should use `dataType.simpleString` to unify the data type mismatch message:
Before:
```
spark-sql> select cast(1 as binary);
Error in query: cannot resolve 'CAST(1 AS BINARY)' due to data type mismatch: cannot cast IntegerType to BinaryType; line 1 pos 7;
```
After:
```
spark-sql> select cast(1 as binary);
Error in query: cannot resolve 'CAST(1 AS BINARY)' due to data type mismatch: cannot cast int to binary; line 1 pos 7;
```

## How was this patch tested?

Existing tests.

Author: Yuming Wang <wgyumg@gmail.com>

Closes #20064 from wangyum/SPARK-22893.
2017-12-25 01:14:09 -08:00
Jose Torres 8941a4abca [SPARK-22789] Map-only continuous processing execution
## What changes were proposed in this pull request?

Basic continuous execution, supporting map/flatMap/filter, with commits and advancement through RPC.

## How was this patch tested?

new unit-ish tests (exercising execution end to end)

Author: Jose Torres <jose@databricks.com>

Closes #19984 from jose-torres/continuous-impl.
2017-12-22 23:05:03 -08:00
Michael Armbrust 8df1da396f [SPARK-22862] Docs on lazy elimination of columns missing from an encoder
This behavior has confused some users, so let's clarify it.

Author: Michael Armbrust <michael@databricks.com>

Closes #20048 from marmbrus/datasetAsDocs.
2017-12-21 21:38:16 -08:00
Imran Rashid 7beb375bf4 [SPARK-22861][SQL] SQLAppStatusListener handles multi-job executions.
When one execution has multiple jobs, we need to append to the set of
stages, not replace them on every job.

Added unit test and ran existing tests on jenkins

Author: Imran Rashid <irashid@cloudera.com>

Closes #20047 from squito/SPARK-22861.
2017-12-21 15:37:55 -08:00
Tejas Patil fe65361b05 [SPARK-22042][FOLLOW-UP][SQL] ReorderJoinPredicates can break when child's partitioning is not decided
## What changes were proposed in this pull request?

This is a follow-up PR to https://github.com/apache/spark/pull/19257, where gatorsmile had left a couple of comments regarding code style.

## How was this patch tested?

This doesn't change any functionality; it relies on the build to verify that no checkstyle rules are violated.

Author: Tejas Patil <tejasp@fb.com>

Closes #20041 from tejasapatil/followup_19257.
2017-12-21 09:22:08 -08:00
Yuming Wang 4e107fdb74 [SPARK-22822][TEST] Basic tests for WindowFrameCoercion and DecimalPrecision
## What changes were proposed in this pull request?

Test Coverage for `WindowFrameCoercion` and `DecimalPrecision`, this is a Sub-tasks for [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).

## How was this patch tested?

N/A

Author: Yuming Wang <wgyumg@gmail.com>

Closes #20008 from wangyum/SPARK-22822.
2017-12-21 09:18:27 -08:00
Wenchen Fan d3a1d9527b [SPARK-22786][SQL] only use AppStatusPlugin in history server
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/19681 we introduced a new interface called `AppStatusPlugin`, to register listeners and set up the UI for both live and history UI.

However, I think it's overkill for the live UI. For example, we should not register `SQLListener` if users are not using SQL functions. Previously we registered the `SQLListener` and set up the SQL tab when a `SparkSession` was first created, which indicates users are going to use SQL functions. But in #19681, we register the SQL functions during `SparkContext` creation. The same thing should apply to streaming too.

I think we should keep the previous behavior, and only use this new interface for history server.

To reflect this change, I also rename the new interface to `SparkHistoryUIPlugin`

This PR also refines the tests for sql listener.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19981 from cloud-fan/listener.
2017-12-22 01:08:13 +08:00
Bryan Cutler 59d52631eb [SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
## What changes were proposed in this pull request?

Upgrade Spark to Arrow 0.8.0 for Java and Python.  Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.

The highlights that pertain to Spark for the update from Arrow version 0.4.1 to 0.8.0 include:

* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python

## How was this patch tested?

Existing tests

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
2017-12-21 20:43:56 +09:00
Xingbo Jiang 9c289a5cb4 [SPARK-22387][SQL] Propagate session configs to data source read/write options
## What changes were proposed in this pull request?

Introduce a new interface `SessionConfigSupport` for `DataSourceV2`, it can help to propagate session configs with the specified key-prefix to all data source operations in this session.

## How was this patch tested?

Add new test suite `DataSourceV2UtilsSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #19861 from jiangxb1987/datasource-configs.
2017-12-21 10:02:30 +08:00
Jose Torres 7798c9e6ef [SPARK-22824] Restore old offset for binary compatibility
## What changes were proposed in this pull request?

Some users depend on source compatibility with the org.apache.spark.sql.execution.streaming.Offset class. Although this is not a stable interface, we can keep it in place for now to simplify upgrades to 2.3.

Author: Jose Torres <jose@databricks.com>

Closes #20012 from joseph-torres/binary-compat.
2017-12-20 10:43:10 -08:00
Dongjoon Hyun 9962390af7 [SPARK-22781][SS] Support creating streaming dataset with ORC files
## What changes were proposed in this pull request?

Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support creating streaming dataset with ORC file format more easily like the other file formats. Also, this adds a test coverage for ORC data source and updates the document.

**BEFORE**

```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader
       spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
```

**AFTER**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@678b3746

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
|  a|
+---+
|  1|
+---+
```

## How was this patch tested?

Pass the newly added test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19975 from dongjoon-hyun/SPARK-22781.
2017-12-19 23:50:06 -08:00
Fernando Pereira 13268a58f8 [SPARK-22649][PYTHON][SQL] Adding localCheckpoint to Dataset API
## What changes were proposed in this pull request?

This change adds local checkpoint support to datasets and respective bind from Python Dataframe API.

If reliability requirements can be lowered to favor performance, as in cases of further quick transformations followed by a reliable save, localCheckpoint() fits very well.
Furthermore, at the moment reliable checkpoints still incur double computation (see #9428).
In general it makes the API more complete as well.

## How was this patch tested?

Python land quick use case:

```python
>>> from time import sleep
>>> from pyspark.sql import types as T
>>> from pyspark.sql import functions as F

>>> def f(x):
    sleep(1)
    return x*2
   ...:

>>> df1 = spark.range(30, numPartitions=6)
>>> df2 = df1.select(F.udf(f, T.LongType())("id"))

>>> %time _ = df2.collect()
CPU times: user 7.79 ms, sys: 5.84 ms, total: 13.6 ms
Wall time: 12.2 s

>>> %time df3 = df2.localCheckpoint()
CPU times: user 2.38 ms, sys: 2.3 ms, total: 4.68 ms
Wall time: 10.3 s

>>> %time _ = df3.collect()
CPU times: user 5.09 ms, sys: 410 µs, total: 5.5 ms
Wall time: 148 ms

>>> sc.setCheckpointDir(".")
>>> %time df3 = df2.checkpoint()
CPU times: user 4.04 ms, sys: 1.63 ms, total: 5.67 ms
Wall time: 20.3 s
```

Author: Fernando Pereira <fernando.pereira@epfl.ch>

Closes #19805 from ferdonline/feature_dataset_localCheckpoint.
2017-12-19 20:47:12 -08:00
Youngbin Kim 6e36d8d562 [SPARK-22829] Add new built-in function date_trunc()
## What changes were proposed in this pull request?

Adding date_trunc() as a built-in function.
`date_trunc` is common in other databases, but neither Spark nor Hive supports it. `date_trunc` is commonly used by data scientists and business intelligence applications such as Superset (https://github.com/apache/incubator-superset).
We do have `trunc` but this only works with 'MONTH' and 'YEAR' level on the DateType input.

date_trunc() in other databases:
AWS Redshift: http://docs.aws.amazon.com/redshift/latest/dg/r_DATE_TRUNC.html
PostgreSQL: https://www.postgresql.org/docs/9.1/static/functions-datetime.html
Presto: https://prestodb.io/docs/current/functions/datetime.html
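
A hedged example of the intended usage (format-first argument order assumed, following the databases listed above):

```scala
// Illustrative: truncate a timestamp to the start of the hour.
spark.sql("SELECT date_trunc('HOUR', cast('2017-12-19 20:22:33' AS TIMESTAMP))").show(false)
// expected: 2017-12-19 20:00:00
```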

## How was this patch tested?

Unit tests

Author: Youngbin Kim <ykim828@hotmail.com>

Closes #20015 from youngbink/date_trunc.
2017-12-19 20:22:33 -08:00
Sital Kedia 3a7494dfee [SPARK-22827][CORE] Avoid throwing OutOfMemoryError in case of exception in spill
## What changes were proposed in this pull request?
Currently, the task memory manager throws an OutOfMemoryError when an IO exception happens in spill() - https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L194. Similarly, there are many other places in the code where, if a task is not able to acquire memory due to an exception, we throw an OutOfMemoryError, which kills the entire executor and hence fails all the tasks running on that executor instead of just failing one single task.

## How was this patch tested?

Unit tests

Author: Sital Kedia <skedia@fb.com>

Closes #20014 from sitalkedia/skedia/upstream_SPARK-22827.
2017-12-20 12:21:00 +08:00
Yuming Wang 6129ffa11e [SPARK-22821][TEST] Basic tests for WidenSetOperationTypes, BooleanEquality, StackCoercion and Division
## What changes were proposed in this pull request?

Test Coverage for `WidenSetOperationTypes`, `BooleanEquality`, `StackCoercion`  and `Division`, this is a Sub-tasks for [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).

## How was this patch tested?
N/A

Author: Yuming Wang <wgyumg@gmail.com>

Closes #20006 from wangyum/SPARK-22821.
2017-12-19 11:56:22 -08:00
Kazuaki Ishizaki ee56fc3432 [SPARK-18016][SQL] Code Generation: Constant Pool Limit - reduce entries for mutable state
## What changes were proposed in this pull request?

This PR is a follow-on of #19518. This PR tries to reduce the number of constant pool entries used for accessing mutable state.
There are two directions:
1. Primitive type variables should be allocated at the outer class due to better performance. Otherwise, this PR allocates an array.
2. The length of allocated array is up to 32768 due to avoiding usage of constant pool entry at access (e.g. `mutableStateArray[32767]`).

Here are some discussions to determine these directions.
1. [[1]](https://github.com/apache/spark/pull/19518#issuecomment-346690464), [[2]](https://github.com/apache/spark/pull/19518#issuecomment-346690642), [[3]](https://github.com/apache/spark/pull/19518#issuecomment-346828180), [[4]](https://github.com/apache/spark/pull/19518#issuecomment-346831544), [[5]](https://github.com/apache/spark/pull/19518#issuecomment-346857340)
2. [[6]](https://github.com/apache/spark/pull/19518#issuecomment-346729172), [[7]](https://github.com/apache/spark/pull/19518#issuecomment-346798358), [[8]](https://github.com/apache/spark/pull/19518#issuecomment-346870408)

This PR modifies the `addMutableState` function in the `CodeGenerator` to check if the declared state can be easily initialized and compacted into an array. We identify three types of states that cannot be compacted:

- Primitive type state (ints, booleans, etc.) if the number of them does not exceed a threshold
- Multiple-dimensional array type
- `inline = true`

When `useFreshName = false`, the given name is used.

Much of the code was ported from #19518, where a lot of effort was put in. I think this PR should be credited to bdrillard.

With this PR, the following code is generated:
```
/* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private InternalRow mutableRow;
/* 009 */   private boolean isNull_0;
/* 010 */   private boolean isNull_1;
/* 011 */   private boolean isNull_2;
/* 012 */   private int value_2;
/* 013 */   private boolean isNull_3;
...
/* 10006 */   private int value_4999;
/* 10007 */   private boolean isNull_5000;
/* 10008 */   private int value_5000;
/* 10009 */   private InternalRow[] mutableStateArray = new InternalRow[2];
/* 10010 */   private boolean[] mutableStateArray1 = new boolean[7001];
/* 10011 */   private int[] mutableStateArray2 = new int[1001];
/* 10012 */   private UTF8String[] mutableStateArray3 = new UTF8String[6000];
/* 10013 */
...
/* 107956 */     private void init_176() {
/* 107957 */       isNull_4986 = true;
/* 107958 */       value_4986 = -1;
...
/* 108004 */     }
...
```

## How was this patch tested?

Added a new test case to `GeneratedProjectionSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19811 from kiszk/SPARK-18016.
2017-12-20 00:10:54 +08:00
gatorsmile 28315714dd [SPARK-22791][SQL][SS] Redact Output of Explain
## What changes were proposed in this pull request?

When calling explain on a query, the output can contain sensitive information. We should provide a way for admins/users to redact such information.

Before this PR, the plan of SS is like this
```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
      +- StateStoreRestore [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5]
         +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#18L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *MapElements <function1>, obj#5: java.lang.String
                        +- *DeserializeToObject value#30.toString, obj#4: java.lang.String
                           +- LocalTableScan [value#30]
```

After this PR, we can get the following output if users set `spark.redaction.string.regex` to `file:/[\\w_]+`
```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-e7da9b7d-3ec0-474d-8b8c-927f7d12ed72/state, runId = 8a9c3761-93d5-4896-ab82-14c06240dcea, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#32L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-e7da9b7d-3ec0-474d-8b8c-927f7d12ed72/state, runId = 8a9c3761-93d5-4896-ab82-14c06240dcea, opId = 0, ver = 0, numPartitions = 5]
         +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#32L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#32L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *MapElements <function1>, obj#5: java.lang.String
                        +- *DeserializeToObject value#27.toString, obj#4: java.lang.String
                           +- LocalTableScan [value#27]
```
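
A usage sketch (the table path is illustrative); whether a particular plan fragment is redacted depends on where Spark applies the regex, but the flow is just setting the conf and calling explain:
```scala
// Redact anything in explain output that matches "file:/...".
spark.conf.set("spark.redaction.string.regex", "file:/[\\w_]+")

// Write a small table and read it back; matching fragments in the scan's
// location string are shown as *********(redacted) in the explain output.
spark.range(10).write.mode("overwrite").parquet("/tmp/redaction_demo")
spark.read.parquet("/tmp/redaction_demo").explain()
```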
## How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19985 from gatorsmile/redactPlan.
2017-12-19 22:12:23 +08:00
CodingCat ab7346f20c [SPARK-22673][SQL] InMemoryRelation should utilize existing stats whenever possible
## What changes were proposed in this pull request?

The current implementation of InMemoryRelation always uses the most expensive execution plan when writing the cache.
With CBO enabled, we can actually get a more exact estimate of the underlying table size...

## How was this patch tested?

existing test

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>
Author: Nan Zhu <nanzhu@uber.com>

Closes #19864 from CodingCat/SPARK-22673.
2017-12-19 21:51:56 +08:00
Marcelo Vanzin 772e4648d9 [SPARK-20653][CORE] Add cleaning of old elements from the status store.
This change restores the functionality that keeps a limited number of
different types (jobs, stages, etc) depending on configuration, to avoid
the store growing indefinitely over time.

The feature is implemented by creating a new type (ElementTrackingStore)
that wraps a KVStore and allows triggers to be set up for when elements
of a certain type meet a certain threshold. Triggers don't need to
necessarily only delete elements, but the current API is set up in a way
that makes that use case easier.

The new store also has a trigger for the "close" call, which makes it
easier for listeners to register code for cleaning things up and flushing
partial state to the store.

The old configurations for cleaning up the stored elements from the core
and SQL UIs are now active again, and the old unit tests are re-enabled.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19751 from vanzin/SPARK-20653.
2017-12-18 14:08:48 -06:00
Yuming Wang 7f6d10a737 [SPARK-22816][TEST] Basic tests for PromoteStrings and InConversion
## What changes were proposed in this pull request?

Test coverage for `PromoteStrings` and `InConversion`; this is a sub-task of [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).

## How was this patch tested?

N/A

Author: Yuming Wang <wgyumg@gmail.com>

Closes #20001 from wangyum/SPARK-22816.
2017-12-17 09:15:10 -08:00
Yuming Wang 46776234a4 [SPARK-22762][TEST] Basic tests for IfCoercion and CaseWhenCoercion
## What changes were proposed in this pull request?

Basic tests for IfCoercion and CaseWhenCoercion

## How was this patch tested?

N/A

Author: Yuming Wang <wgyumg@gmail.com>

Closes #19949 from wangyum/SPARK-22762.
2017-12-15 09:58:31 -08:00
Takeshi Yamamuro 9fafa8209c [SPARK-22800][TEST][SQL] Add a SSB query suite
## What changes were proposed in this pull request?
Add a test suite to ensure all the [SSB (Star Schema Benchmark)](https://www.cs.umb.edu/~poneil/StarSchemaB.PDF) queries can be successfully analyzed, optimized and compiled without hitting the max iteration threshold.

## How was this patch tested?
Added `SSBQuerySuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19990 from maropu/SPARK-22800.
2017-12-15 09:56:22 -08:00
Yuanjian Li 3775dd31ee [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
## What changes were proposed in this pull request?

As the discussion in https://github.com/apache/spark/pull/16481 and https://github.com/apache/spark/pull/18975#discussion_r155454606
Currently the BaseRelation returned by `dataSource.writeAndRead` is only used in `CreateDataSourceTableAsSelect`, and `planForWriting` and `writeAndRead` share some common code paths.
In this patch I removed the `writeAndRead` function and added a `getRelation` function, which is only used in `CreateDataSourceTableAsSelectCommand` when saving data to a non-existing table.

## How was this patch tested?

Existing UT

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #19941 from xuanyuanking/SPARK-22753.
2017-12-14 23:11:13 -08:00
gatorsmile 3fea5c4f19 [SPARK-22787][TEST][SQL] Add a TPC-H query suite
## What changes were proposed in this pull request?
Add a test suite to ensure all the TPC-H queries can be successfully analyzed, optimized and compiled without hitting the max iteration threshold.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19982 from gatorsmile/testTPCH.
2017-12-14 22:56:57 -08:00
Jose Torres 59daf91b7c [SPARK-22733] Split StreamExecution into MicroBatchExecution and StreamExecution.
## What changes were proposed in this pull request?

StreamExecution is now an abstract base class, which MicroBatchExecution (the current StreamExecution) inherits. When continuous processing is implemented, we'll have a new ContinuousExecution implementation of StreamExecution.

A few fields are also renamed to make them less microbatch-specific.

## How was this patch tested?

refactoring only

Author: Jose Torres <jose@databricks.com>

Closes #19926 from joseph-torres/continuous-refactor.
2017-12-14 14:31:21 -08:00
Prashant Sharma 40de176c93 [SPARK-16496][SQL] Add wholetext as option for reading text in SQL.
## What changes were proposed in this pull request?

In many text analysis problems, it is often not desirable for rows to be split by "\n". A whole-text reader already exists in the RDD API, and this JIRA adds the same support to the Dataset API.
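
A usage sketch, assuming the option key is `wholetext` as in the title (the path is illustrative):
```scala
// Read each input file as a single row instead of one row per line.
val docs = spark.read
  .option("wholetext", true)
  .text("/tmp/docs")              // illustrative directory of text files

docs.show(truncate = false)       // one row per file
```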
## How was this patch tested?

Added relevant new tests for both scala and Java APIs

Author: Prashant Sharma <prashsh1@in.ibm.com>
Author: Prashant Sharma <prashant@apache.org>

Closes #14151 from ScrapCodes/SPARK-16496/wholetext.
2017-12-14 11:19:34 -08:00
Kazuaki Ishizaki 606ae491e4 [SPARK-22774][SQL][TEST] Add compilation check into TPCDSQuerySuite
## What changes were proposed in this pull request?

This PR adds a check to `TPCDSQuerySuite` that the Java code generated by Catalyst can be compiled correctly by `janino`. Before this PR, this suite only checked whether analysis could be performed correctly.

This check helps avoid unexpected performance degradation from falling back to interpreted execution when a Java compilation error occurs.

## How was this patch tested?

An existing test case was updated.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19971 from kiszk/SPARK-22774.
2017-12-15 02:14:08 +08:00
Wenchen Fan d095795439 [SPARK-22785][SQL] remove ColumnVector.anyNullsSet
## What changes were proposed in this pull request?
`ColumnVector.anyNullsSet` is not called anywhere except tests, and we can easily replace it with `ColumnVector.numNulls > 0`

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19980 from cloud-fan/minor.
2017-12-15 00:29:44 +08:00
Wenchen Fan 7d8e2ca7f8 [SPARK-22775][SQL] move dictionary related APIs from ColumnVector to WritableColumnVector
## What changes were proposed in this pull request?

These dictionary related APIs are special to `WritableColumnVector` and should not be in `ColumnVector`, which will be public soon.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19970 from cloud-fan/final.
2017-12-14 19:33:54 +08:00
Marcelo Vanzin c3dd2a26de [SPARK-22779][SQL] Resolve default values for fallback configs.
SQLConf allows some callers to define a custom default value for
configs, and that complicates a little bit the handling of fallback
config entries, since most of the default value resolution is
hidden by the config code.

This change peeks into the internals of these fallback configs
to figure out the correct default value, and also returns the
current human-readable default when showing the default value
(e.g. through "set -v").

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19974 from vanzin/SPARK-22779.
2017-12-13 22:46:20 -08:00
Jose Torres f8c7c1f21a [SPARK-22732] Add Structured Streaming APIs to DataSourceV2
## What changes were proposed in this pull request?

This PR provides DataSourceV2 API support for structured streaming, including new pieces needed to support continuous processing [SPARK-20928]. High level summary:

- DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.

- DataSourceV2Reader includes new interfaces to read a specific microbatch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.

- DataReader (the per-partition reader) has a new subinterface ContinuousDataReader only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.

- Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)

- DataSourceV2Writer has a new subinterface ContinuousWriter only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.

Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.

## How was this patch tested?

Toy implementations of the new interfaces with unit tests.

Author: Jose Torres <jose@databricks.com>

Closes #19925 from joseph-torres/continuous-api.
2017-12-13 22:31:39 -08:00
Wenchen Fan 2a29a60da3 Revert "[SPARK-22600][SQL] Fix 64kb limit for deeply nested expressions under wholestage codegen"
This reverts commit c7d0148615.
2017-12-14 11:22:23 +08:00
Wenchen Fan bc7e4a90c0 Revert "[SPARK-22600][SQL][FOLLOW-UP] Fix a compilation error in TPCDS q75/q77"
This reverts commit ef92999653.
2017-12-14 11:21:34 +08:00
Takeshi Yamamuro ef92999653 [SPARK-22600][SQL][FOLLOW-UP] Fix a compilation error in TPCDS q75/q77
## What changes were proposed in this pull request?
This PR fixes a compilation error in TPCDS `q75`/`q77` caused by #19813:
```
  java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 371, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 371, Column 16: Expression "bhj_matched" is not an rvalue
  at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
  at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
  at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
  at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
```

## How was this patch tested?
Manually checked `q75`/`q77` can be properly compiled

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19969 from maropu/SPARK-22600-FOLLOWUP.
2017-12-13 15:55:16 -08:00
Tejas Patil 682eb4f2ea [SPARK-22042][SQL] ReorderJoinPredicates can break when child's partitioning is not decided
## What changes were proposed in this pull request?

See jira description for the bug : https://issues.apache.org/jira/browse/SPARK-22042

Fix done in this PR is:  In `EnsureRequirements`, apply `ReorderJoinPredicates` over the input tree before doing its core logic. Since the tree is transformed bottom-up, we can assure that the children are resolved before doing `ReorderJoinPredicates`.

Theoretically this guarantees covering all such cases while keeping the code simple. My only small reservation is cosmetic: this PR may look odd given that we don't (to my knowledge) call rules from other rules. I could have moved all the logic for `ReorderJoinPredicates` into `EnsureRequirements`, but that would make it a bit crowded. I am happy to discuss if there are better options.

## How was this patch tested?

Added a new test case

Author: Tejas Patil <tejasp@fb.com>

Closes #19257 from tejasapatil/SPARK-22042_ReorderJoinPredicates.
2017-12-12 23:30:06 -08:00
WeichenXu 0e36ba6212 [SPARK-22644][ML][TEST] Make ML testsuite support StructuredStreaming test
## What changes were proposed in this pull request?

We need to add some helper code to make testing ML transformers & models easier with streaming data. These tests might help us catch any remaining issues and we could encourage future PRs to use these tests to prevent new Models & Transformers from having issues.

I add an `MLTest` trait which extends the `StreamTest` trait and overrides `createSparkSession`. An ML test suite then only needs to extend `MLTest` to use both the ML and streaming test util functions.

I only modified one test case in `LinearRegressionSuite` for a first-pass review.

Link to #19746

## How was this patch tested?

`MLTestSuite` added.

Author: WeichenXu <weichen.xu@databricks.com>

Closes #19843 from WeichenXu123/ml_stream_test_helper.
2017-12-12 21:28:24 -08:00
Liang-Chi Hsieh c7d0148615 [SPARK-22600][SQL] Fix 64kb limit for deeply nested expressions under wholestage codegen
## What changes were proposed in this pull request?

SPARK-22543 fixes the 64kb compile error for deeply nested expression for non-wholestage codegen. This PR extends it to support wholestage codegen.

This patch brings in some util methods to extract the necessary parameters for an expression when it is split into a function.

The util methods are put in object `ExpressionCodegen` under `codegen`. The main entry is `getExpressionInputParams` which returns all necessary parameters to evaluate the given expression in a split function.

These util methods can be used to split expressions too; that is a later TODO item.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19813 from viirya/reduce-expr-code-for-wholestage.
2017-12-13 10:40:05 +08:00
Marco Gaido 4117786a87 [SPARK-22716][SQL] Avoid the creation of mutable states in addReferenceObj
## What changes were proposed in this pull request?

We have two methods to reference an object: `addReferenceMinorObj` and `addReferenceObj`. The latter creates a new global variable, which means new entries in the constant pool.

The PR unifies the two methods into a single `addReferenceObj`, which returns the code to access the object in the `references` array and doesn't add new mutable state.

## How was this patch tested?

added UTs.

Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19916 from mgaido91/SPARK-22716.
2017-12-13 10:29:14 +08:00
Daniel van der Ende e6dc5f2807 [SPARK-22729][SQL] Add getTruncateQuery to JdbcDialect
In order to enable truncate for PostgreSQL databases in Spark JDBC, a change is needed to the query used for truncating a PostgreSQL table. By default, PostgreSQL will automatically truncate any descendant tables if a TRUNCATE query is executed. As this may result in (unwanted) side-effects, the query used for the truncate should be specified separately for PostgreSQL, specifying only to TRUNCATE a single table.

## What changes were proposed in this pull request?

Add a `getTruncateQuery` function to `JdbcDialect.scala` with a default query. This function is overridden for PostgreSQL to truncate only a single table; see the sketch below. `isCascadingTruncateTable` is also set to false, which allows truncates for PostgreSQL.
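
A minimal sketch of what such a dialect override might look like; the class names and method signature below are assumptions for illustration, not the exact code from this patch:
```scala
// Hypothetical stand-ins for the JDBC dialect classes; names and signature are illustrative.
abstract class SketchJdbcDialect {
  // Default behaviour: a plain TRUNCATE statement.
  def getTruncateQuery(table: String): String = s"TRUNCATE TABLE $table"
}

object SketchPostgresDialect extends SketchJdbcDialect {
  // PostgreSQL would otherwise cascade to descendant tables, so restrict to the named table only.
  override def getTruncateQuery(table: String): String = s"TRUNCATE TABLE ONLY $table"
}
```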

## How was this patch tested?

Existing tests all pass. Added test for `getTruncateQuery`

Author: Daniel van der Ende <daniel.vanderende@gmail.com>

Closes #19911 from danielvdende/SPARK-22717.
2017-12-12 10:41:37 -08:00
gatorsmile a4002651a3 [SPARK-20557][SQL] Only support TIMESTAMP WITH TIME ZONE for Oracle Dialect
## What changes were proposed in this pull request?
In the previous PRs, https://github.com/apache/spark/pull/17832 and https://github.com/apache/spark/pull/17835 , we convert `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` to `TIMESTAMP` for all the JDBC sources. However, this conversion could be risky since it does not respect our SQL configuration `spark.sql.session.timeZone`.

In addition, each vendor might have different semantics for these two types. For example, Postgres simply returns `TIMESTAMP` types for `TIMESTAMP WITH TIME ZONE`. For such supports, we should do it case by case. This PR reverts the general support of `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` for JDBC sources, except ORACLE Dialect.

When supporting the ORACLE's `TIMESTAMP WITH TIME ZONE`, we only support it when the JVM default timezone is the same as the user-specified configuration `spark.sql.session.timeZone` (whose default is the JVM default timezone). Now, we still treat `TIMESTAMP WITH TIME ZONE` as `TIMESTAMP` when fetching the values via the Oracle JDBC connector, whose client converts the timestamp values with time zone to the timestamp values using the local JVM default timezone (a test case is added to `OracleIntegrationSuite.scala` in this PR for showing the behavior). Thus, to avoid any future behavior change, we will not support it if JVM default timezone is different from `spark.sql.session.timeZone`

There is no regression because the previous two PRs were only merged to the unreleased master branch.

## How was this patch tested?
Added the test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19939 from gatorsmile/timezoneUpdate.
2017-12-11 16:33:06 -08:00
gatorsmile 3d82f6eb78 [SPARK-22726][TEST] Basic tests for Binary Comparison and ImplicitTypeCasts
## What changes were proposed in this pull request?
Before we deliver the Hive compatibility mode, we plan to write a set of test cases that can be easily run in both Spark and Hive sides. We can easily compare whether they are the same or not. When new typeCoercion rules are added, we also can easily track the changes. These test cases can also be backported to the previous Spark versions for determining the changes we made.

This PR is the first attempt for improving the test coverage for type coercion compatibility. We generate these test cases for our binary comparison and ImplicitTypeCasts based on the Apache Derby test cases in https://github.com/apache/derby/blob/10.14/java/testing/org/apache/derbyTesting/functionTests/tests/lang/implicitConversions.sql

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19918 from gatorsmile/typeCoercionTests.
2017-12-11 15:55:23 -08:00
Kazuaki Ishizaki c235b5f977 [SPARK-22746][SQL] Avoid the generation of useless mutable states by SortMergeJoin
## What changes were proposed in this pull request?

This PR reduces the number of global mutable variables in the generated code of `SortMergeJoin`.

Before this PR, global mutable variables were used to extend the lifetime of variables in the nested loop. The same can be achieved by declaring each variable at the outermost loop level where it is used.
In the following example, `smj_value8`, `smj_isNull6`, and `smj_value9` are declared as local variables at lines 145-147 in `With this PR`.

This PR also fixes a potential assertion error from #19865. Without this PR, a global mutable variable could potentially be passed as an argument in the generated code of a split function.

Without this PR
```
/* 010 */   int smj_value8;
/* 011 */   boolean smj_isNull6;
/* 012 */   int smj_value9;
..
/* 143 */   protected void processNext() throws java.io.IOException {
/* 144 */     while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 145 */       boolean smj_loaded = false;
/* 146 */       smj_isNull6 = smj_leftRow.isNullAt(1);
/* 147 */       smj_value9 = smj_isNull6 ? -1 : (smj_leftRow.getInt(1));
/* 148 */       scala.collection.Iterator<UnsafeRow> smj_iterator = smj_matches.generateIterator();
/* 149 */       while (smj_iterator.hasNext()) {
/* 150 */         InternalRow smj_rightRow1 = (InternalRow) smj_iterator.next();
/* 151 */         boolean smj_isNull8 = smj_rightRow1.isNullAt(1);
/* 152 */         int smj_value11 = smj_isNull8 ? -1 : (smj_rightRow1.getInt(1));
/* 153 */
/* 154 */         boolean smj_value12 = (smj_isNull6 && smj_isNull8) ||
/* 155 */         (!smj_isNull6 && !smj_isNull8 && smj_value9 == smj_value11);
/* 156 */         if (false || !smj_value12) continue;
/* 157 */         if (!smj_loaded) {
/* 158 */           smj_loaded = true;
/* 159 */           smj_value8 = smj_leftRow.getInt(0);
/* 160 */         }
/* 161 */         int smj_value10 = smj_rightRow1.getInt(0);
/* 162 */         smj_numOutputRows.add(1);
/* 163 */
/* 164 */         smj_rowWriter.zeroOutNullBytes();
/* 165 */
/* 166 */         smj_rowWriter.write(0, smj_value8);
/* 167 */
/* 168 */         if (smj_isNull6) {
/* 169 */           smj_rowWriter.setNullAt(1);
/* 170 */         } else {
/* 171 */           smj_rowWriter.write(1, smj_value9);
/* 172 */         }
/* 173 */
/* 174 */         smj_rowWriter.write(2, smj_value10);
/* 175 */
/* 176 */         if (smj_isNull8) {
/* 177 */           smj_rowWriter.setNullAt(3);
/* 178 */         } else {
/* 179 */           smj_rowWriter.write(3, smj_value11);
/* 180 */         }
/* 181 */         append(smj_result.copy());
/* 182 */
/* 183 */       }
/* 184 */       if (shouldStop()) return;
/* 185 */     }
/* 186 */   }
```

With this PR
```
/* 143 */   protected void processNext() throws java.io.IOException {
/* 144 */     while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 145 */       int smj_value8 = -1;
/* 146 */       boolean smj_isNull6 = false;
/* 147 */       int smj_value9 = -1;
/* 148 */       boolean smj_loaded = false;
/* 149 */       smj_isNull6 = smj_leftRow.isNullAt(1);
/* 150 */       smj_value9 = smj_isNull6 ? -1 : (smj_leftRow.getInt(1));
/* 151 */       scala.collection.Iterator<UnsafeRow> smj_iterator = smj_matches.generateIterator();
/* 152 */       while (smj_iterator.hasNext()) {
/* 153 */         InternalRow smj_rightRow1 = (InternalRow) smj_iterator.next();
/* 154 */         boolean smj_isNull8 = smj_rightRow1.isNullAt(1);
/* 155 */         int smj_value11 = smj_isNull8 ? -1 : (smj_rightRow1.getInt(1));
/* 156 */
/* 157 */         boolean smj_value12 = (smj_isNull6 && smj_isNull8) ||
/* 158 */         (!smj_isNull6 && !smj_isNull8 && smj_value9 == smj_value11);
/* 159 */         if (false || !smj_value12) continue;
/* 160 */         if (!smj_loaded) {
/* 161 */           smj_loaded = true;
/* 162 */           smj_value8 = smj_leftRow.getInt(0);
/* 163 */         }
/* 164 */         int smj_value10 = smj_rightRow1.getInt(0);
/* 165 */         smj_numOutputRows.add(1);
/* 166 */
/* 167 */         smj_rowWriter.zeroOutNullBytes();
/* 168 */
/* 169 */         smj_rowWriter.write(0, smj_value8);
/* 170 */
/* 171 */         if (smj_isNull6) {
/* 172 */           smj_rowWriter.setNullAt(1);
/* 173 */         } else {
/* 174 */           smj_rowWriter.write(1, smj_value9);
/* 175 */         }
/* 176 */
/* 177 */         smj_rowWriter.write(2, smj_value10);
/* 178 */
/* 179 */         if (smj_isNull8) {
/* 180 */           smj_rowWriter.setNullAt(3);
/* 181 */         } else {
/* 182 */           smj_rowWriter.write(3, smj_value11);
/* 183 */         }
/* 184 */         append(smj_result.copy());
/* 185 */
/* 186 */       }
/* 187 */       if (shouldStop()) return;
/* 188 */     }
/* 189 */   }
```

## How was this patch tested?

Existing test cases

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19937 from kiszk/SPARK-22746.
2017-12-11 13:15:45 -08:00
Dongjoon Hyun 251b2c03b4 [SPARK-22672][SQL][TEST][FOLLOWUP] Fix to use spark.conf
## What changes were proposed in this pull request?

During https://github.com/apache/spark/pull/19882, `conf` is mistakenly used to switch ORC implementation between `native` and `hive`. To affect `OrcTest` correctly, `spark.conf` should be used.

## How was this patch tested?

Pass the tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19931 from dongjoon-hyun/SPARK-22672-2.
2017-12-09 20:20:28 +09:00
Imran Rashid acf7ef3154 [SPARK-12297][SQL] Adjust timezone for int96 data from impala
## What changes were proposed in this pull request?

Int96 data written by impala vs data written by hive & spark is stored slightly differently -- they use a different offset for the timezone.  This adds an option "spark.sql.parquet.int96TimestampConversion" (false by default) to adjust timestamps if and only if the writer is impala (or more precisely, if the parquet file's "createdBy" metadata does not start with "parquet-mr").  This matches the existing behavior in hive from HIVE-9482.
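
A usage sketch of the option named above (the path is illustrative):
```scala
// Adjust int96 timestamps only for files written by Impala
// (i.e. whose "createdBy" metadata does not start with "parquet-mr").
spark.conf.set("spark.sql.parquet.int96TimestampConversion", "true")

val df = spark.read.parquet("/data/impala_written_table")   // illustrative path
df.show()
```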

## How was this patch tested?

Unit test added, existing tests run via jenkins.

Author: Imran Rashid <irashid@cloudera.com>
Author: Henry Robinson <henry@apache.org>

Closes #19769 from squito/SPARK-12297_skip_conversion.
2017-12-09 11:53:15 +09:00
Sunitha Kambhampati f88a67bf08 [SPARK-22452][SQL] Add getDouble to DataSourceV2Options
- Implemented getDouble method in DataSourceV2Options
- Add unit test

Author: Sunitha Kambhampati <skambha@us.ibm.com>

Closes #19921 from skambha/ds2.
2017-12-08 14:48:19 +08:00
Tathagata Das b11869bc3b [SPARK-22187][SS][REVERT] Revert change in state row format for mapGroupsWithState
## What changes were proposed in this pull request?

#19416 changed the format in which rows were encoded in the state store. However, this can break existing streaming queries with the old format in unpredictable ways (potentially crashing the JVM). Hence I am reverting this for now. This will be re-applied in the future after we start saving more metadata in checkpoints to signify which version of state row format the existing streaming query is running. Then we can decode old and new formats accordingly.

## How was this patch tested?
Existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #19924 from tdas/SPARK-22187-1.
2017-12-07 22:02:51 -08:00
Dongjoon Hyun 0ba8f4b211 [SPARK-21787][SQL] Support for pushing down filters for DateType in native OrcFileFormat
## What changes were proposed in this pull request?

This PR adds support for pushing down filters for DateType columns in the native ORC file format.
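
A sketch of the kind of read that benefits (path and column name are illustrative):
```scala
import org.apache.spark.sql.functions.{col, lit}

// With this change, a predicate on a DateType column can be pushed down to the ORC reader.
val orders = spark.read.orc("/data/orders_orc")                        // illustrative path
val recent = orders.filter(col("order_date") >= lit("2017-12-01").cast("date"))
recent.explain()   // PushedFilters should now include the date predicate
```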

## How was this patch tested?

Pass the Jenkins with newly add and updated test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18995 from dongjoon-hyun/SPARK-21787.
2017-12-08 09:52:16 +08:00
kellyzly f41c0a93fd [SPARK-22660][BUILD] Use position() and limit() to fix ambiguity issue in scala-2.12
…a-2.12 and JDK9

## What changes were proposed in this pull request?
Some compile errors appear after upgrading to scala-2.12:
```
spark_source/core/src/main/scala/org/apache/spark/executor/Executor.scala:455: ambiguous reference to overloaded definition, method limit in class ByteBuffer of type (x$1: Int)java.nio.ByteBuffer
method limit in class Buffer of type ()Int
match expected type ?
     val resultSize = serializedDirectResult.limit
error
```
The limit method was moved from ByteBuffer to the superclass Buffer and can no longer be called without (). The same applies to the position method; see the sketch below.
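
A minimal sketch of the fix described above, reusing the variable name from the error message:
```scala
import java.nio.ByteBuffer

// With Scala 2.12 / JDK 9, call limit() and position() with explicit parentheses so the
// no-argument Buffer overloads are selected unambiguously.
val serializedDirectResult: ByteBuffer = ByteBuffer.allocate(16)
val resultSize = serializedDirectResult.limit()       // previously written as .limit
val currentPos = serializedDirectResult.position()    // same pattern for position
```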

```
/home/zly/prj/oss/jdk9_HOS_SOURCE/spark_source/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala:427: ambiguous reference to overloaded definition, [error] both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit [error] and  method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit [error] match argument types (java.util.Map[String,String])
 [error]       props.putAll(outputSerdeProps.toMap.asJava)
 [error]             ^
 ```
This is because the key type is Object instead of String which is unsafe.

## How was this patch tested?

running tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: kellyzly <kellyzly@126.com>

Closes #19854 from kellyzly/SPARK-22660.
2017-12-07 10:04:04 -06:00
Dongjoon Hyun dd59a4be36 [SPARK-22712][SQL] Use buildReaderWithPartitionValues in native OrcFileFormat
## What changes were proposed in this pull request?

To support vectorization in native OrcFileFormat later, we need to use `buildReaderWithPartitionValues` instead of `buildReader` like ParquetFileFormat. This PR replaces `buildReader` with `buildReaderWithPartitionValues`.

## How was this patch tested?

Pass the Jenkins with the existing test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19907 from dongjoon-hyun/SPARK-ORC-BUILD-READER.
2017-12-07 21:08:15 +08:00
Sunitha Kambhampati 2be448260d [SPARK-22452][SQL] Add getInt, getLong, getBoolean to DataSourceV2Options
- Implemented methods getInt, getLong, getBoolean for DataSourceV2Options
- Added new unit tests to exercise these methods

Author: Sunitha Kambhampati <skambha@us.ibm.com>

Closes #19902 from skambha/spark22452.
2017-12-07 20:59:47 +08:00
Wenchen Fan e103adf45a [SPARK-22703][SQL] make ColumnarRow an immutable view
## What changes were proposed in this pull request?

Similar to https://github.com/apache/spark/pull/19842 , we should also make `ColumnarRow` an immutable view, and move forward to make `ColumnVector` public.

## How was this patch tested?

Existing tests.

The performance concern should be same as https://github.com/apache/spark/pull/19842 .

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19898 from cloud-fan/row-id.
2017-12-07 20:45:11 +08:00
Dongjoon Hyun c1e5688d1a [SPARK-22672][SQL][TEST] Refactor ORC Tests
## What changes were proposed in this pull request?

Since SPARK-20682, we have two `OrcFileFormat`s. This PR refactors ORC tests with three principles (with a few exceptions)
1. Move test suite into `sql/core`.
2. Create `HiveXXX` test suite in `sql/hive` by reusing `sql/core` test suite.
3. `OrcTest` will provide common helper functions and `val orcImp: String`.

**Test Suites**

*Native OrcFileFormat*
- org.apache.spark.sql.hive.orc
  - OrcFilterSuite
  - OrcPartitionDiscoverySuite
  - OrcQuerySuite
  - OrcSourceSuite
- o.a.s.sql.hive.orc
  - OrcHadoopFsRelationSuite

*Hive built-in OrcFileFormat*

- o.a.s.sql.hive.orc
  - HiveOrcFilterSuite
  - HiveOrcPartitionDiscoverySuite
  - HiveOrcQuerySuite
  - HiveOrcSourceSuite
  - HiveOrcHadoopFsRelationSuite

**Hierarchy**
```
OrcTest
    -> OrcSuite
        -> OrcSourceSuite
    -> OrcQueryTest
        -> OrcQuerySuite
    -> OrcPartitionDiscoveryTest
        -> OrcPartitionDiscoverySuite
    -> OrcFilterSuite

HadoopFsRelationTest
    -> OrcHadoopFsRelationSuite
        -> HiveOrcHadoopFsRelationSuite
```

Please note the followings.
- Unlike the other test suites, `OrcHadoopFsRelationSuite` doesn't inherit `OrcTest`. It is inside `sql/hive` like `ParquetHadoopFsRelationSuite` due to the dependencies and follows the existing convention to use `val dataSourceName: String`
- `OrcFilterSuite`s cannot reuse test cases due to the different function signatures using Hive 1.2.1 ORC classes and Apache ORC 1.4.1 classes.

## How was this patch tested?

Pass the Jenkins tests with reorganized test suites.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19882 from dongjoon-hyun/SPARK-22672.
2017-12-07 20:42:46 +08:00
Kazuaki Ishizaki 8ae004b460 [SPARK-22688][SQL] Upgrade Janino version to 3.0.8
## What changes were proposed in this pull request?

This PR upgrades Janino to version 3.0.8. [Janino 3.0.8](https://janino-compiler.github.io/janino/changelog.html) includes an important fix that reduces the number of constant pool entries by using the 'sipush' Java bytecode.

* SIPUSH bytecode is not used for short integer constant [#33](https://github.com/janino-compiler/janino/issues/33).

Please see detail in [this discussion thread](https://github.com/apache/spark/pull/19518#issuecomment-346674976).

## How was this patch tested?

Existing tests

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19890 from kiszk/SPARK-22688.
2017-12-06 16:15:25 -08:00
smurakozi 9948b860ac [SPARK-22516][SQL] Bump up Univocity version to 2.5.9
## What changes were proposed in this pull request?

There was a bug in the Univocity parser that caused the issue in SPARK-22516. This was fixed by upgrading the library from version 2.5.4 to 2.5.9:

**Executing**
```
spark.read.option("header","true").option("inferSchema", "true").option("multiLine", "true").option("comment", "g").csv("test_file_without_eof_char.csv").show()
```
**Before**
```
ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
com.univocity.parsers.common.TextParsingException: java.lang.IllegalArgumentException - Unable to skip 1 lines from line 2. End of input reached
...
Internal state when error was thrown: line=3, column=0, record=2, charIndex=31
	at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:339)
	at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:475)
	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:281)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```
**After**
```
+-------+-------+
|column1|column2|
+-------+-------+
|    abc|    def|
+-------+-------+
```

## How was this patch tested?
The already existing `CSVSuite.commented lines in CSV data` test was extended to parse the file also in multiline mode. The test input file was modified to also include a comment in the last line.

Author: smurakozi <smurakozi@gmail.com>

Closes #19906 from smurakozi/SPARK-22516.
2017-12-06 13:22:08 -08:00
Dongjoon Hyun fb6a922751 [SPARK-20728][SQL][FOLLOWUP] Use an actionable exception message
## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/19871 to improve an exception message.

## How was this patch tested?

Pass the Jenkins.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19903 from dongjoon-hyun/orc_exception.
2017-12-06 20:20:20 +09:00
Liang-Chi Hsieh 00d176d2fe [SPARK-20392][SQL] Set barrier to prevent re-entering a tree
## What changes were proposed in this pull request?

The SQL `Analyzer` goes through the whole query plan even when most of it has already been analyzed. This increases the time spent on query analysis, especially for long pipelines in ML.

This patch adds a logical node called `AnalysisBarrier` that wraps an analyzed logical plan to prevent it from analysis again. The barrier is applied to the analyzed logical plan in `Dataset`. It won't change the output of wrapped logical plan and just acts as a wrapper to hide it from analyzer. New operations on the dataset will be put on the barrier, so only the new nodes created will be analyzed.

This analysis barrier will be removed at the end of analysis stage.

## How was this patch tested?

Added tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19873 from viirya/SPARK-20392-reopen.
2017-12-05 21:43:41 -08:00
Dongjoon Hyun 82183f7b57 [SPARK-22686][SQL] DROP TABLE IF EXISTS should not show AnalysisException
## What changes were proposed in this pull request?

While fixing the view resolution issue in [SPARK-22488](https://github.com/apache/spark/pull/19713), a regression was introduced in the `2.2.1` and `master` branches, as shown below. This PR fixes it.

```scala
scala> spark.version
res2: String = 2.2.1

scala> sql("DROP TABLE IF EXISTS t").show
17/12/04 21:01:06 WARN DropTableCommand: org.apache.spark.sql.AnalysisException:
Table or view not found: t;
org.apache.spark.sql.AnalysisException: Table or view not found: t;
```

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19888 from dongjoon-hyun/SPARK-22686.
2017-12-06 10:52:29 +08:00
Dongjoon Hyun 326f1d6728 [SPARK-20728][SQL] Make OrcFileFormat configurable between sql/hive and sql/core
## What changes were proposed in this pull request?

This PR aims to provide a configuration to choose the default `OrcFileFormat` from legacy `sql/hive` module or new `sql/core` module.

For example, this configuration will affect the following operations.
```scala
spark.read.orc(...)
```
```sql
CREATE TABLE t
USING ORC
...
```

## How was this patch tested?

Pass the Jenkins with new test suites.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19871 from dongjoon-hyun/spark-sql-orc-enabled.
2017-12-05 20:46:35 +08:00
gatorsmile 53e5251bb3 [SPARK-22675][SQL] Refactoring PropagateTypes in TypeCoercion
## What changes were proposed in this pull request?
PropagateTypes is called twice in TypeCoercion. We do not need to call it twice. Instead, we should call it after each change to the types.

## How was this patch tested?
The existing tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19874 from gatorsmile/deduplicatePropagateTypes.
2017-12-05 20:43:02 +08:00
Wenchen Fan 295df746ec [SPARK-22677][SQL] cleanup whole stage codegen for hash aggregate
## What changes were proposed in this pull request?

The `HashAggregateExec` whole-stage codegen path is a little messy and hard to understand. This PR cleans it up a bit, especially the fast hash map part.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19869 from cloud-fan/hash-agg.
2017-12-05 12:38:26 +08:00
Dongjoon Hyun f23dddf105 [SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based on ORC 1.4.1
## What changes were proposed in this pull request?

Since [SPARK-2883](https://issues.apache.org/jira/browse/SPARK-2883), Apache Spark supports Apache ORC inside `sql/hive` module with Hive dependency. This PR aims to add a new ORC data source inside `sql/core` and to replace the old ORC data source eventually. This PR resolves the following three issues.

- [SPARK-20682](https://issues.apache.org/jira/browse/SPARK-20682): Add new ORCFileFormat based on Apache ORC 1.4.1
- [SPARK-15474](https://issues.apache.org/jira/browse/SPARK-15474): ORC data source fails to write and read back empty dataframe
- [SPARK-21791](https://issues.apache.org/jira/browse/SPARK-21791): ORC should support column names with dot

## How was this patch tested?

Pass the Jenkins with the existing all tests and new tests for SPARK-15474 and SPARK-21791.

Author: Dongjoon Hyun <dongjoon@apache.org>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #19651 from dongjoon-hyun/SPARK-20682.
2017-12-03 22:21:44 +08:00
Shixiong Zhu ee10ca7ec6 [SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus
## What changes were proposed in this pull request?

Use a separate Spark event queue for StreamingQueryListenerBus so that if there are many non-streaming events, streaming query listeners don't need to wait for other Spark listeners and can catch up.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19838 from zsxwing/SPARK-22638.
2017-12-01 13:02:03 -08:00
sujith71955 16adaf634b [SPARK-22601][SQL] Data load is getting displayed successful on providing non existing nonlocal file path
## What changes were proposed in this pull request?
When a user tries to load data from a non-existing HDFS file path, the system does not validate it and the load command succeeds.
This is misleading to the user. There is already validation for the scenario of a non-existing local file path. This PR adds validation for the scenario of a non-existing HDFS file path.
## How was this patch tested?
A unit test has been added to verify the issue; snapshots were also added after verification in a Spark YARN cluster.

Author: sujith71955 <sujithchacko.2010@gmail.com>

Closes #19823 from sujith71955/master_LoadComand_Issue.
2017-11-30 20:45:30 -08:00
Adrian Ionescu f5f8e84d9d [SPARK-22614] Dataset API: repartitionByRange(...)
## What changes were proposed in this pull request?

This PR introduces a way to explicitly range-partition a Dataset. So far, only round-robin and hash partitioning were possible via `df.repartition(...)`, but sometimes range partitioning might be desirable: e.g. when writing to disk, for better compression without the cost of global sort.

The current implementation piggybacks on the existing `RepartitionByExpression` `LogicalPlan` and simply adds the following logic: If its expressions are of type `SortOrder`, then it will do `RangePartitioning`; otherwise `HashPartitioning`. This was by far the least intrusive solution I could come up with.
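
A usage sketch of the new API (the column name and output path are illustrative):
```scala
import org.apache.spark.sql.functions.col

// Range-partition by a column before writing, e.g. for better compression
// without the cost of a global sort.
val df = spark.range(0, 1000000).withColumnRenamed("id", "key")
df.repartitionByRange(8, col("key"))
  .write.mode("overwrite").parquet("/tmp/range_partitioned")
```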

## How was this patch tested?
Unit test for `RepartitionByExpression` changes, a test to ensure we're not changing the behavior of existing `.repartition()` and a few end-to-end tests in `DataFrameSuite`.

Author: Adrian Ionescu <adrian@databricks.com>

Closes #19828 from adrian-ionescu/repartitionByRange.
2017-11-30 15:41:34 -08:00
Yuming Wang bcceab6495 [SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user clearly specified
## What changes were proposed in this pull request?

How to reproduce:
```scala
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")

val bl = sql("SELECT /*+ MAPJOIN(t1) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key").queryExecution.executedPlan

println(bl.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide)
```
The result is `BuildRight`, but it should be `BuildLeft`. This PR fixes this issue.
## How was this patch tested?

unit tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #19714 from wangyum/SPARK-22489.
2017-11-30 15:36:26 -08:00
Wenchen Fan 9c29c55763 [SPARK-22643][SQL] ColumnarArray should be an immutable view
## What changes were proposed in this pull request?

To make `ColumnVector` public, `ColumnarArray` need to be public too, and we should not have mutable public fields in a public class. This PR proposes to make `ColumnarArray` an immutable view of the data, and always create a new instance of `ColumnarArray` in `ColumnVector#getArray`

## How was this patch tested?

new benchmark in `ColumnarBatchBenchmark`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19842 from cloud-fan/column-vector.
2017-11-30 18:34:38 +08:00
Wenchen Fan 444a2bbb67 [SPARK-22652][SQL] remove set methods in ColumnarRow
## What changes were proposed in this pull request?

As a step to make `ColumnVector` public, the `ColumnarRow` returned by `ColumnVector#getStruct` should be immutable.

However, we do need the mutability of `ColumnarRow` for the fast vectorized hashmap in hash aggregate. To solve this, this PR introduces a `MutableColumnarRow` for that use case.

## How was this patch tested?

existing test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19847 from cloud-fan/mutable-row.
2017-11-30 18:28:58 +08:00
Wang Gengliang 57687280d4 [SPARK-22615][SQL] Handle more cases in PropagateEmptyRelation
## What changes were proposed in this pull request?

Currently, the optimizer rule `PropagateEmptyRelation` does not handle the following cases:
1.  empty relation as right child in left outer join
2. empty relation as left child in right outer join
3. empty relation as right child  in left semi join
4. empty relation as right child  in left anti join
5. only one empty relation in full outer join

Cases 1 / 2 / 5 can be treated as a **Cartesian product** and cause an exception. See the new test cases and the sketch below.
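
For illustration, a sketch of case 1 (an empty relation as the right child of a left outer join); whether a given plan is folded depends on how the empty relation appears in it, so this only shows the shape of the case:
```scala
import spark.implicits._

// A left outer join against an empty right child returns just the left rows
// with null-extended right columns, so the join can be eliminated.
val left  = Seq((1, "a"), (2, "b")).toDF("id", "v")
val empty = Seq.empty[(Int, String)].toDF("id", "w")

left.join(empty, Seq("id"), "left_outer").explain()
```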

## How was this patch tested?
Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #19825 from gengliangwang/SPARK-22615.
2017-11-29 09:17:39 -08:00
Wenchen Fan 20b239845b [SPARK-22605][SQL] SQL write job should also set Spark task output metrics
## What changes were proposed in this pull request?

For SQL write jobs, we only set metrics for the SQL listener and display them in the SQL plan UI. We should also set metrics for Spark task output metrics, which will be shown in spark job UI.

## How was this patch tested?

test it manually. For a simple write job
```
spark.range(1000).write.parquet("/tmp/p1")
```
now the spark job UI looks like
![ui](https://user-images.githubusercontent.com/3182036/33326478-05a25b7c-d490-11e7-96ef-806117774356.jpg)

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19833 from cloud-fan/ui.
2017-11-29 19:18:47 +08:00
Herman van Hovell 475a29f11e [SPARK-22637][SQL] Only refresh a logical plan once.
## What changes were proposed in this pull request?
`CatalogImpl.refreshTable` uses `foreach(..)` to refresh all tables in a view. This traverses all nodes in the subtree and calls `LogicalPlan.refresh()` on these nodes. However `LogicalPlan.refresh()` is also refreshing its children, as a result refreshing a large view can be quite expensive.

This PR just calls `LogicalPlan.refresh()` on the top node.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #19837 from hvanhovell/SPARK-22637.
2017-11-28 16:03:47 -08:00
Sunitha Kambhampati a10b328dbc [SPARK-22431][SQL] Ensure that the datatype in the schema for the table/view metadata is parseable by Spark before persisting it
## What changes were proposed in this pull request?
* JIRA:  [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431)  : Creating Permanent view with illegal type

**Description:**
- It is possible in Spark SQL to create a permanent view that uses a nested field with an illegal name.
- For example if we create the following view:
```create view x as select struct('a' as `$q`, 1 as b) q```
- A simple select fails with the following exception:

```
select * from x;

org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
  at org.apache.spark.sql.hive.client.HiveClientImpl$.fromHiveColumn(HiveClientImpl.scala:812)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
...
```
**Issue/Analysis**: Right now, we can create a view with a schema that cannot be read back by Spark from the Hive metastore.  For more details, please see the discussion about the analysis and proposed fix options in comment 1 and comment 2 in the [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431)

**Proposed changes**:
 - Fix the hive table/view codepath to check whether the schema datatype is parseable by Spark before persisting it in the metastore. This change is localized to HiveClientImpl, which does a check similar to the one in FromHiveColumn. This is fail-fast, and we avoid the scenario where we write something to the metastore that we are unable to read back.
- Added new unit tests
- Ran the sql related unit test suites ( hive/test, sql/test, catalyst/test) OK

With the fix:
```
create view x as select struct('a' as `$q`, 1 as b) q;
17/11/28 10:44:55 ERROR SparkSQLDriver: Failed in [create view x as select struct('a' as `$q`, 1 as b) q]
org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
	at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$getSparkSQLDataType(HiveClientImpl.scala:884)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
...
```
## How was this patch tested?
- New unit tests have been added.

hvanhovell, Please review and share your thoughts/comments.  Thank you so much.

Author: Sunitha Kambhampati <skambha@us.ibm.com>

Closes #19747 from skambha/spark22431.
2017-11-28 22:01:01 +01:00
Zhenhua Wang da35574297 [SPARK-22515][SQL] Estimation relation size based on numRows * rowSize
## What changes were proposed in this pull request?

Currently, relation size is computed as the sum of file sizes, which is error-prone because a storage format like Parquet may have a much smaller file size compared to the in-memory size. When we choose a broadcast join based on file size, there's a risk of OOM. But if the number of rows is available in the statistics, we can get a better estimate from `numRows * rowSize`, which helps alleviate this problem.
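
A conceptual sketch of the estimation preference described above (not the actual Spark code):
```scala
// Prefer a row-count-based estimate when row statistics are available,
// otherwise fall back to the (possibly misleading) sum of file sizes.
def estimateRelationSize(
    numRows: Option[BigInt],
    avgRowSizeInBytes: Long,
    totalFileSizeInBytes: BigInt): BigInt = numRows match {
  case Some(rows) => rows * BigInt(avgRowSizeInBytes)   // numRows * rowSize
  case None       => totalFileSizeInBytes               // sum of file sizes
}
```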

## How was this patch tested?

Added a new test case for data source table and hive table.

Author: Zhenhua Wang <wzh_zju@163.com>
Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #19743 from wzhfy/better_leaf_size.
2017-11-28 11:43:21 -08:00
Takuya UESHIN 64817c423c [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone
## What changes were proposed in this pull request?

When converting Pandas DataFrame/Series from/to Spark DataFrame using `toPandas()` or pandas udfs, timestamp values behave to respect Python system timezone instead of session timezone.

For example, let's say we use `"America/Los_Angeles"` as session timezone and have a timestamp value `"1970-01-01 00:00:01"` in the timezone. Btw, I'm in Japan so Python timezone would be `"Asia/Tokyo"`.

The timestamp value from current `toPandas()` will be the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects Python timezone.
As we discussed in #18664, we consider this behavior is a bug and the value should be `"1970-01-01 00:00:01"`.

## How was this patch tested?

Added tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19607 from ueshin/issues/SPARK-22395.
2017-11-28 16:45:22 +08:00
gaborgsomogyi 33d43bf1b6 [SPARK-22484][DOC] Document PySpark DataFrame csv writer behavior whe…
## What changes were proposed in this pull request?

In PySpark API Document, DataFrame.write.csv() says that setting the quote parameter to an empty string should turn off quoting. Instead, it uses the [null character](https://en.wikipedia.org/wiki/Null_character) as the quote.

This PR fixes the doc.

## How was this patch tested?

Manual.

```
cd python/docs
make html
open _build/html/pyspark.sql.html
```

Author: gaborgsomogyi <gabor.g.somogyi@gmail.com>

Closes #19814 from gaborgsomogyi/SPARK-22484.
2017-11-28 10:14:35 +09:00
Marco Gaido 087879a77a [SPARK-22520][SQL] Support code generation for large CaseWhen
## What changes were proposed in this pull request?

Code generation is disabled for CaseWhen when the number of branches is higher than `spark.sql.codegen.maxCaseBranches` (which defaults to 20). This was done to prevent the well known 64KB method limit exception.
This PR proposes to support code generation also in those cases (without causing exceptions of course). As a side effect, we could get rid of the `spark.sql.codegen.maxCaseBranches` configuration.
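
For illustration, a query whose CASE WHEN exceeds the old 20-branch default and therefore previously fell back to non-codegen evaluation (the column and values are made up):
```scala
// Build a CASE WHEN with 30 branches, more than the old maxCaseBranches default of 20.
val branches = (1 to 30).map(i => s"WHEN id = $i THEN ${i * 10}").mkString(" ")
val df = spark.range(100).selectExpr(s"CASE $branches ELSE -1 END AS bucket")

df.explain()   // with this change, the projection can still be code-generated
```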

## How was this patch tested?

existing UTs

Author: Marco Gaido <mgaido@hortonworks.com>
Author: Marco Gaido <marcogaido91@gmail.com>

Closes #19752 from mgaido91/SPARK-22520.
2017-11-28 07:46:18 +08:00
Zhenhua Wang 1ff4a77be4 [SPARK-22529][SQL] Relation stats should be consistent with other plans based on cbo config
## What changes were proposed in this pull request?

Currently, relation stats are the same whether CBO is enabled or not. Although a relation (`LogicalRelation` or `HiveTableRelation`) is a `LogicalPlan`, its behavior is inconsistent with other plans. This can cause confusion when a user runs EXPLAIN COST commands. Besides, when CBO is disabled, we apply the size-only estimation strategy, so there's no need to propagate other catalog statistics to the relation.

## How was this patch tested?

Enhanced existing tests case and added a test case.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #19757 from wzhfy/catalog_stats_conversion.
2017-11-28 01:13:44 +08:00
Wenchen Fan 5a02e3a2ac [SPARK-22602][SQL] remove ColumnVector#loadBytes
## What changes were proposed in this pull request?

`ColumnVector#loadBytes` is only used as an optimization for reading UTF8String in `WritableColumnVector`, this PR moves this optimization to `WritableColumnVector` and simplified it.

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19815 from cloud-fan/load-bytes.
2017-11-26 21:49:09 -08:00
Sean Owen fba63c1a7b [SPARK-22607][BUILD] Set large stack size consistently for tests to avoid StackOverflowError
## What changes were proposed in this pull request?

Set `-ea` and `-Xss4m` consistently for tests, to fix in particular:

```
OrderingSuite:
...
- GenerateOrdering with ShortType
*** RUN ABORTED ***
java.lang.StackOverflowError:
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:370)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
...
```
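For context, a hedged sketch of how such flags are typically applied in an sbt build definition; this is illustrative only, not the exact change made to Spark's build files:

```scala
// build.sbt fragment (sketch): javaOptions only take effect for forked test JVMs.
fork in Test := true
javaOptions in Test ++= Seq("-ea", "-Xss4m")
```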

## How was this patch tested?

Existing tests. Manually verified that it resolves the StackOverflowError described above.

Author: Sean Owen <sowen@cloudera.com>

Closes #19820 from srowen/SPARK-22607.
2017-11-26 07:42:44 -06:00
Wenchen Fan e3fd93f149 [SPARK-22604][SQL] remove the get address methods from ColumnVector
## What changes were proposed in this pull request?

`nullsNativeAddress` and `valuesNativeAddress` are only used in tests and benchmarks, so they do not need to be part of the top-level `ColumnVector` API.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19818 from cloud-fan/minor.
2017-11-24 22:43:47 -08:00
Wenchen Fan 70221903f5 [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport.consume
## What changes were proposed in this pull request?

`ctx.currentVars` holds the input variables for the current operator. Since these are already decided in `CodegenSupport`, we can set `ctx.currentVars` there instead of in each `doConsume` implementation.

This PR also adds more comments to help people understand the codegen framework.

After this PR, we now have a principle about setting `ctx.currentVars` and `ctx.INPUT_ROW`:
1. for non-whole-stage-codegen path, never set them. (permit some special cases like generating ordering)
2. for whole-stage-codegen `produce` path, mostly we don't need to set them, but blocking operators may need to set them for expressions that produce data from data source, sort buffer, aggregate buffer, etc.
3. for whole-stage-codegen `consume` path, mostly we don't need to set them because `currentVars` is automatically set to child input variables and `INPUT_ROW` is mostly not used. A few plans need to tweak them as they may have different inputs, or they use the input row.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19803 from cloud-fan/codegen.
2017-11-24 21:50:30 -08:00
Wenchen Fan 0605ad7614 [SPARK-22543][SQL] fix java 64kb compile error for deeply nested expressions
## What changes were proposed in this pull request?

A frequently reported Spark issue is the Java 64KB compile error. It occurs because Spark generates a very big method, which is usually caused by three reasons:

1. a deep expression tree, e.g. a very complex filter condition
2. many individual expressions, e.g. expressions can have many children, operators can have many expressions.
3. a deep query plan tree (with whole stage codegen)

This PR focuses on reason 1. There are already several patches (#15620, #18972, #18641) trying to fix this issue, and some of them are already merged. However, this is an endless job, as every non-leaf expression has this issue.

This PR proposes to fix this issue in `Expression.genCode`, to make sure the code for a single expression won't grow too big.

According to maropu's benchmark, no regression was found with TPCDS (thanks, maropu!): https://docs.google.com/spreadsheets/d/1K3_7lX05-ZgxDXi9X_GleNnDjcnJIfoSlSCDZcL4gdg/edit?usp=sharing
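A minimal sketch (assuming an active `spark` session) of the kind of deeply nested expression tree from reason 1 that used to hit the 64KB limit:

```scala
import spark.implicits._

val df = spark.range(1000).toDF("id")

// 500 OR-ed equality predicates collapse into one very deep expression tree,
// which previously produced a single oversized generated method.
val deepFilter = (1 to 500).map(i => $"id" === i * 2).reduce(_ || _)

df.filter(deepFilter).count()
```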

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@gmail.com>

Closes #19767 from cloud-fan/codegen.
2017-11-22 10:05:46 -08:00
Takeshi Yamamuro 2c0fe818a6 [SPARK-22445][SQL][FOLLOW-UP] Respect stream-side child's needCopyResult in BroadcastHashJoin
## What changes were proposed in this pull request?
I found that #19656 causes some bugs; for example, it changed the result set of `q6` in TPCDS (I keep tracking TPCDS results daily [here](https://github.com/maropu/spark-tpcds-datagen/tree/master/reports/tests)):
- w/o pr19658
```
+-----+---+
|state|cnt|
+-----+---+
|   MA| 10|
|   AK| 10|
|   AZ| 11|
|   ME| 13|
|   VT| 14|
|   NV| 15|
|   NH| 16|
|   UT| 17|
|   NJ| 21|
|   MD| 22|
|   WY| 25|
|   NM| 26|
|   OR| 31|
|   WA| 36|
|   ND| 38|
|   ID| 39|
|   SC| 45|
|   WV| 50|
|   FL| 51|
|   OK| 53|
|   MT| 53|
|   CO| 57|
|   AR| 58|
|   NY| 58|
|   PA| 62|
|   AL| 63|
|   LA| 63|
|   SD| 70|
|   WI| 80|
| null| 81|
|   MI| 82|
|   NC| 82|
|   MS| 83|
|   CA| 84|
|   MN| 85|
|   MO| 88|
|   IL| 95|
|   IA|102|
|   TN|102|
|   IN|103|
|   KY|104|
|   NE|113|
|   OH|114|
|   VA|130|
|   KS|139|
|   GA|168|
|   TX|216|
+-----+---+
```
- w/   pr19658
```
+-----+---+
|state|cnt|
+-----+---+
|   RI| 14|
|   AK| 16|
|   FL| 20|
|   NJ| 21|
|   NM| 21|
|   NV| 22|
|   MA| 22|
|   MD| 22|
|   UT| 22|
|   AZ| 25|
|   SC| 28|
|   AL| 36|
|   MT| 36|
|   WA| 39|
|   ND| 41|
|   MI| 44|
|   AR| 45|
|   OR| 47|
|   OK| 52|
|   PA| 53|
|   LA| 55|
|   CO| 55|
|   NY| 64|
|   WV| 66|
|   SD| 72|
|   MS| 73|
|   NC| 79|
|   IN| 82|
| null| 85|
|   ID| 88|
|   MN| 91|
|   WI| 95|
|   IL| 96|
|   MO| 97|
|   CA|109|
|   CA|109|
|   TN|114|
|   NE|115|
|   KY|128|
|   OH|131|
|   IA|156|
|   TX|160|
|   VA|182|
|   KS|211|
|   GA|230|
+-----+---+
```
This PR keeps the original logic of `CodegenContext.copyResult` in `BroadcastHashJoinExec`.

## How was this patch tested?
Existing tests

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19781 from maropu/SPARK-22445-bugfix.
2017-11-22 09:09:50 +01:00
Jia Li 881c5c8073 [SPARK-22548][SQL] Incorrect nested AND expression pushed down to JDBC data source
## What changes were proposed in this pull request?

Let’s say I have the nested AND expression shown below, and p2 cannot be pushed down:

(p1 AND p2) OR p3

In the current Spark code, during data source filter translation, (p1 AND p2) is returned as p1 only and p2 is simply lost. This issue occurs with the JDBC data source and is similar to [SPARK-12218](https://github.com/apache/spark/pull/10362) for Parquet. When an AND is nested below another expression, we should either push down both legs or nothing.

Note that:
- The current Spark code always splits conjunctive predicates before it determines whether a predicate can be pushed down or not
- If I have (p1 AND p2) AND p3, it will be split into p1, p2, p3, so there is no nested AND expression
- The current Spark code logic for OR is OK: it either pushes both legs or nothing

The same translation method is also called by Data Source V2.
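A hedged illustration (connection details, table, and column names are hypothetical) of a filter shaped like (p1 AND p2) OR p3 against a JDBC source; if p2 cannot be translated, the whole (p1 AND p2) leg must stay on the Spark side rather than being pushed down as p1 alone:

```scala
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")
  .option("dbtable", "orders")
  .load()

// Shaped like (p1 AND p2) OR p3.
jdbcDF.filter("(status = 'OPEN' AND amount > 100) OR priority = 1").show()
```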

## How was this patch tested?

Added new unit test cases to JDBCSuite

gatorsmile

Author: Jia Li <jiali@us.ibm.com>

Closes #19776 from jliwork/spark-22548.
2017-11-21 17:30:02 -08:00
Marco Gaido b96f61b6b2 [SPARK-22475][SQL] show histogram in DESC COLUMN command
## What changes were proposed in this pull request?

Added the histogram representation to the output of the `DESCRIBE EXTENDED table_name column_name` command.
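A minimal sketch of inspecting the new output (table and column names are hypothetical, and the histogram configuration key is an assumption; the command only displays histograms that have already been computed):

```scala
// Generate column statistics, including the histogram, first.
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")  // assumed config key
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS price")

// The histogram now appears in the column's extended description.
spark.sql("DESC EXTENDED sales price").show(truncate = false)
```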

## How was this patch tested?

Modified SQL unit tests and checked the output

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19774 from mgaido91/SPARK-22475.
2017-11-21 20:55:24 +01:00
hyukjinkwon 6d7ebf2f9f [SPARK-22165][SQL] Fixes type conflicts between double, long, decimals, dates and timestamps in partition column
## What changes were proposed in this pull request?

This PR proposes to add a rule that re-uses `TypeCoercion.findWiderCommonType` when resolving type conflicts in partition values.

Currently, this uses a numeric-precedence-like comparison; therefore, it can produce unexpected results for type conflicts between timestamps, dates and decimals. Please see:

```scala
private val upCastingOrder: Seq[DataType] =
  Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
...
literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
```

The codes below:

```scala
val df = Seq((1, "2015-01-01"), (2, "2016-01-01 00:00:00")).toDF("i", "ts")
df.write.format("parquet").partitionBy("ts").save("/tmp/foo")
spark.read.load("/tmp/foo").printSchema()

val df = Seq((1, "1"), (2, "1" * 30)).toDF("i", "decimal")
df.write.format("parquet").partitionBy("decimal").save("/tmp/bar")
spark.read.load("/tmp/bar").printSchema()
```

produces output as below:

**Before**

```
root
 |-- i: integer (nullable = true)
 |-- ts: date (nullable = true)

root
 |-- i: integer (nullable = true)
 |-- decimal: integer (nullable = true)
```

**After**

```
root
 |-- i: integer (nullable = true)
 |-- ts: timestamp (nullable = true)

root
 |-- i: integer (nullable = true)
 |-- decimal: decimal(30,0) (nullable = true)
```

### Type coercion table:

This PR proposes the type conflict resolution below:

**Before**

|InputA \ InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|------------------------|----------|----------|----------|----------|----------|----------|----------|----------|
|**`NullType`**|`StringType`|`IntegerType`|`LongType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`IntegerType`|`DoubleType`|`IntegerType`|`IntegerType`|`StringType`|
|**`LongType`**|`LongType`|`LongType`|`LongType`|`LongType`|`DoubleType`|`LongType`|`LongType`|`StringType`|
|**`DecimalType(38,0)`**|`StringType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|
|**`DoubleType`**|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`StringType`|
|**`DateType`**|`StringType`|`IntegerType`|`LongType`|`DateType`|`DoubleType`|`DateType`|`DateType`|`StringType`|
|**`TimestampType`**|`StringType`|`IntegerType`|`LongType`|`TimestampType`|`DoubleType`|`TimestampType`|`TimestampType`|`StringType`|
|**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|

**After**

|InputA \ InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|------------------------|----------|----------|----------|----------|----------|----------|----------|----------|
|**`NullType`**|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`LongType`**|`LongType`|`LongType`|`LongType`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`|
|**`DecimalType(38,0)`**|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`|
|**`DoubleType`**|`DoubleType`|`DoubleType`|`StringType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`DateType`**|`DateType`|`StringType`|`StringType`|`StringType`|`StringType`|`DateType`|`TimestampType`|`StringType`|
|**`TimestampType`**|`TimestampType`|`StringType`|`StringType`|`StringType`|`StringType`|`TimestampType`|`TimestampType`|`StringType`|
|**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|

This was produced by:

```scala
  test("Print out chart") {
    val supportedTypes: Seq[DataType] = Seq(
      NullType, IntegerType, LongType, DecimalType(38, 0), DoubleType,
      DateType, TimestampType, StringType)

    // Old type conflict resolution:
    val upCastingOrder: Seq[DataType] =
      Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
    def oldResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = {
      val topType = dataTypes.maxBy(upCastingOrder.indexOf(_))
      if (topType == NullType) StringType else topType
    }
    println(s"|InputA \\ InputB|${supportedTypes.map(dt => s"`${dt.toString}`").mkString("|")}|")
    println(s"|------------------------|${supportedTypes.map(_ => "----------").mkString("|")}|")
    supportedTypes.foreach { inputA =>
      val types = supportedTypes.map(inputB => oldResolveTypeConflicts(Seq(inputA, inputB)))
      println(s"|**`$inputA`**|${types.map(dt => s"`${dt.toString}`").mkString("|")}|")
    }

    // New type conflict resolution:
    def newResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = {
      dataTypes.fold[DataType](NullType)(findWiderTypeForPartitionColumn)
    }
    println(s"|InputA \\ InputB|${supportedTypes.map(dt => s"`${dt.toString}`").mkString("|")}|")
    println(s"|------------------------|${supportedTypes.map(_ => "----------").mkString("|")}|")
    supportedTypes.foreach { inputA =>
      val types = supportedTypes.map(inputB => newResolveTypeConflicts(Seq(inputA, inputB)))
      println(s"|**`$inputA`**|${types.map(dt => s"`${dt.toString}`").mkString("|")}|")
    }
  }
```

## How was this patch tested?

Unit tests added in `ParquetPartitionDiscoverySuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19389 from HyukjinKwon/partition-type-coercion.
2017-11-21 20:53:38 +01:00
gatorsmile 96e947ed6c [SPARK-22569][SQL] Clean usage of addMutableState and splitExpressions
## What changes were proposed in this pull request?
This PR cleans up the usage of `addMutableState` and `splitExpressions`:

1. replace hardcoded type strings with `ctx.JAVA_BOOLEAN` etc.
2. create a default value of the initCode for `ctx.addMutableState`
3. use named arguments when calling `splitExpressions`

## How was this patch tested?
The existing test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19790 from gatorsmile/codeClean.
2017-11-21 13:48:09 +01:00
Kazuaki Ishizaki 3c3eebc873 [SPARK-20101][SQL] Use OffHeapColumnVector when "spark.sql.columnVector.offheap.enable" is set to "true"
This PR enables the use of ``OffHeapColumnVector`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``. While ``ColumnVector`` has two implementations, ``OnHeapColumnVector`` and ``OffHeapColumnVector``, only ``OnHeapColumnVector`` has been used so far.

This PR implements the following (a usage sketch follows the list):
- Pass ``OffHeapColumnVector`` to ``ColumnarBatch.allocate()`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``
- Free all of off-heap memory regions by ``OffHeapColumnVector.close()``
- Ensure to call ``OffHeapColumnVector.close()``
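
A minimal usage sketch; the config name comes from this change, while the application name and data path are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("offheap-column-vector")
  .config("spark.sql.columnVector.offheap.enable", "true")
  .getOrCreate()

// With the flag on, vectorized readers allocate OffHeapColumnVector
// instead of OnHeapColumnVector.
spark.read.parquet("/path/to/data").count()
```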

Use existing tests

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17436 from kiszk/SPARK-20101.
2017-11-20 12:40:26 +01:00
Dongjoon Hyun b10837ab1a [SPARK-22557][TEST] Use ThreadSignaler explicitly
## What changes were proposed in this pull request?

ScalaTest 3.0 uses an implicit `Signaler`. This PR makes sure all Spark tests use `ThreadSignaler` explicitly, which preserves the ScalaTest 2.2.x default behavior of interrupting a thread on the JVM. This will reduce potential flakiness.
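A hedged sketch (the suite name is hypothetical) of declaring the signaler explicitly in a suite that uses ScalaTest's time limits:

```scala
import org.scalatest.FunSuite
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.SpanSugar._

class ExampleSuite extends FunSuite with TimeLimits {
  // Opt in to interrupting the thread on timeout, matching ScalaTest 2.2.x behavior.
  implicit val defaultSignaler: Signaler = ThreadSignaler

  test("finishes within the time limit") {
    failAfter(10.seconds) {
      Thread.sleep(100)
    }
  }
}
```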

## How was this patch tested?

This is a test-suite-only update. It should pass the Jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19784 from dongjoon-hyun/use_thread_signaler.
2017-11-20 13:32:01 +09:00
Shixiong Zhu bf0c0ae2dc [SPARK-22544][SS] FileStreamSource should use its own hadoop conf to call globPathIfNecessary
## What changes were proposed in this pull request?

Pass the FileSystem created using the correct Hadoop conf into `globPathIfNecessary` so that it can pick up the user's Hadoop configuration, such as credentials.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19771 from zsxwing/fix-file-stream-conf.
2017-11-17 15:35:24 -08:00
Li Jin 7d039e0c0a [SPARK-22409] Introduce function type argument in pandas_udf
## What changes were proposed in this pull request?

* Add a "function type" argument to pandas_udf.
* Add a new public enum class `PandasUDFType` in pyspark.sql.functions
* Refactor udf related code from pyspark.sql.functions to pyspark.sql.udf
* Merge "PythonUdfType" and "PythonEvalType" into a single enum class "PythonEvalType"

Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.SCALAR)
def plus_one(v):
    return v + 1
```

## Design doc
https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc/edit

## How was this patch tested?

Added PandasUDFTests

## TODO:
* [x] Implement proper enum type for `PandasUDFType`
* [x] Update documentation
* [x] Add more tests in PandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #19630 from icexelloss/spark-22409-pandas-udf-type.
2017-11-17 16:43:08 +01:00
Wenchen Fan b9dcbe5e1b [SPARK-22542][SQL] remove unused features in ColumnarBatch
## What changes were proposed in this pull request?

`ColumnarBatch` provides features to do fast filtering and projection in a columnar fashion; however, these features are never used by Spark, as Spark uses whole-stage codegen and processes the data row by row. This PR proposes to remove these unused features, as we won't switch to columnar execution in the near future. Even if we do, I think this part needs a proper redesign.

This is also a step toward making `ColumnVector` public, as we don't want to expose these features to users.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19766 from cloud-fan/vector.
2017-11-16 18:23:00 -08:00
osatici 2014e7a789 [SPARK-22479][SQL] Exclude credentials from SaveintoDataSourceCommand.simpleString
## What changes were proposed in this pull request?

Do not include JDBC properties that may contain credentials when logging a logical plan containing a `SaveIntoDataSourceCommand`.

## How was this patch tested?

building locally and trying to reproduce (per the steps in https://issues.apache.org/jira/browse/SPARK-22479):
```
== Parsed Logical Plan ==
SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider570127fa, Map(dbtable -> test20, driver -> org.postgresql.Driver, url -> *********(redacted), password -> *********(redacted)), ErrorIfExists
   +- Range (0, 100, step=1, splits=Some(8))

== Analyzed Logical Plan ==
SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider570127fa, Map(dbtable -> test20, driver -> org.postgresql.Driver, url -> *********(redacted), password -> *********(redacted)), ErrorIfExists
   +- Range (0, 100, step=1, splits=Some(8))

== Optimized Logical Plan ==
SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider570127fa, Map(dbtable -> test20, driver -> org.postgresql.Driver, url -> *********(redacted), password -> *********(redacted)), ErrorIfExists
   +- Range (0, 100, step=1, splits=Some(8))

== Physical Plan ==
Execute SaveIntoDataSourceCommand
   +- SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider570127fa, Map(dbtable -> test20, driver -> org.postgresql.Driver, url -> *********(redacted), password -> *********(redacted)), ErrorIfExists
         +- Range (0, 100, step=1, splits=Some(8))
```

Author: osatici <osatici@palantir.com>

Closes #19708 from onursatici/os/redact-jdbc-creds.
2017-11-15 14:08:51 -08:00
liutang123 bc0848b4c1 [SPARK-22469][SQL] Accuracy problem in comparison with string and numeric
## What changes were proposed in this pull request?
This fixes a problem caused by #15880:
`select '1.5' > 0.5; // Result is NULL in Spark but is true in Hive.`
When comparing a string with a numeric, cast both to double, as Hive does.

Author: liutang123 <liutang123@yeah.net>

Closes #19692 from liutang123/SPARK-22469.
2017-11-15 09:02:54 -08:00
Wenchen Fan dce1610ae3 [SPARK-22514][SQL] move ColumnVector.Array and ColumnarBatch.Row to individual files
## What changes were proposed in this pull request?

Logically, `Array` doesn't belong to `ColumnVector`, and `Row` doesn't belong to `ColumnarBatch`: `ColumnVector` needs to return `Array` from `getArray` and `Row` from `getStruct`, and `Array` and `Row` can return each other via the `getArray`/`getStruct` methods.

This is also a step to make `ColumnVector` public, it's cleaner to have `Array` and `Row` as top-level classes.

This PR is just code moving around, with 2 renaming: `Array` -> `VectorBasedArray`, `Row` -> `VectorBasedRow`.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19740 from cloud-fan/vector.
2017-11-15 14:42:37 +01:00
Marcelo Vanzin 0ffa7c488f [SPARK-20652][SQL] Store SQL UI data in the new app status store.
This change replaces the SQLListener with a new implementation that
saves the data to the same store used by the SparkContext's status
store. For that, the types used by the old SQLListener had to be
updated a bit so that they're more serialization-friendly.

The interface for getting data from the store was abstracted into
a new class, SQLAppStatusStore (following the convention used in
core).

Another change is the way that the SQL UI hooks up into the core
UI or the SHS. The old "SparkHistoryListenerFactory" was replaced
with a new "AppStatePlugin" that more explicitly differentiates
between the two use cases: processing events, and showing the UI.
Both live apps and the SHS use this new API (previously, it was
restricted to the SHS).

Note on the above: this causes a slight change of behavior for
live apps; the SQL tab will only show up after the first execution
is started.

The metrics gathering code was re-worked a bit so that the types
used are less memory hungry and more serialization-friendly. This
reduces memory usage when using in-memory stores, and reduces load
times when using disk stores.

Tested with existing and added unit tests. Note one unit test was
disabled because it depends on SPARK-20653, which isn't in yet.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19681 from vanzin/SPARK-20652.
2017-11-14 15:28:22 -06:00
Zhenhua Wang 11b60af737 [SPARK-17074][SQL] Generate equi-height histogram in column statistics
## What changes were proposed in this pull request?

Equi-height histograms are effective for cardinality estimation and are more accurate than basic column stats (min, max, ndv, etc.), especially for skewed distributions, so we need to support them.

In an equi-height histogram, all buckets (intervals) have the same height (frequency).
In this PR, we use a two-step method to generate an equi-height histogram (a conceptual sketch follows the steps below):
1. use `ApproximatePercentile` to get percentiles `p(0), p(1/n), p(2/n) ... p((n-1)/n), p(1)`;
2. construct range values of buckets, e.g. `[p(0), p(1/n)], [p(1/n), p(2/n)] ... [p((n-1)/n), p(1)]`, and use `ApproxCountDistinctForIntervals` to count ndv in each bucket. Each bucket is of the form: `(lowerBound, higherBound, ndv)`.
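
A conceptual sketch of the two steps above, expressed with SQL aggregates on a hypothetical `sales` table with a numeric `price` column (the internal implementation uses `ApproximatePercentile` and `ApproxCountDistinctForIntervals` directly):

```scala
val numBuckets = 4

// Step 1: equi-height bucket boundaries via approximate percentiles.
val percentages = (0 to numBuckets).map(i => i.toDouble / numBuckets).mkString(", ")
val bounds = spark.sql(
  s"SELECT percentile_approx(price, array($percentages)) FROM sales")
  .first().getSeq[Double](0)

// Step 2: approximate distinct count (ndv) per bucket.
bounds.zip(bounds.tail).foreach { case (lo, hi) =>
  val ndv = spark.sql(
    s"SELECT approx_count_distinct(price) FROM sales WHERE price >= $lo AND price <= $hi")
    .first().getLong(0)
  println(s"bucket [$lo, $hi]: ndv = $ndv")
}
```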

## How was this patch tested?

Added new test cases and modified some existing test cases.

Author: Zhenhua Wang <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #19479 from wzhfy/generate_histogram.
2017-11-14 16:41:43 +01:00
hyukjinkwon 673c670465 [SPARK-17310][SQL] Add an option to disable record-level filter in Parquet-side
## What changes were proposed in this pull request?

There is a concern that Spark-side codegen row-by-row filtering might be faster than Parquet's in general, due to type boxing and additional function calls that Spark's filtering tries to avoid.

So, this PR adds an option to enable/disable record-by-record filtering on the Parquet side.

It sets the default to `false` to take advantage of the improvement.

This was also discussed in https://github.com/apache/spark/pull/14671.
## How was this patch tested?

Manual benchmarks were performed. I generated a billion (1,000,000,000) records and tested equality comparisons concatenated with `OR`. These filter combinations ranged from 5 to 30 predicates.

It seems Spark-side filtering is indeed faster in this test case, and the gap increases as the filter tree becomes larger.

The details are as below:

**Code**

``` scala
test("Parquet-side filter vs Spark-side filter - record by record") {
  withTempPath { path =>
    val N = 1000 * 1000 * 1000
    val df = spark.range(N).toDF("a")
    df.write.parquet(path.getAbsolutePath)

    val benchmark = new Benchmark("Parquet-side vs Spark-side", N)
    Seq(5, 10, 20, 30).foreach { num =>
      val filterExpr = (0 to num).map(i => s"a = $i").mkString(" OR ")

      benchmark.addCase(s"Parquet-side filter - number of filters [$num]", 3) { _ =>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
          SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> true.toString) {

          // We should strip Spark-side filter to compare correctly.
          stripSparkFilter(
            spark.read.parquet(path.getAbsolutePath).filter(filterExpr)).count()
        }
      }

      benchmark.addCase(s"Spark-side filter - number of filters [$num]", 3) { _ =>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
          SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> false.toString) {

          spark.read.parquet(path.getAbsolutePath).filter(filterExpr).count()
        }
      }
    }

    benchmark.run()
  }
}
```

**Result**

```
Parquet-side vs Spark-side:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet-side filter - number of filters [5]      4268 / 4367        234.3           4.3       0.8X
Spark-side filter - number of filters [5]      3709 / 3741        269.6           3.7       0.9X
Parquet-side filter - number of filters [10]      5673 / 5727        176.3           5.7       0.6X
Spark-side filter - number of filters [10]      3588 / 3632        278.7           3.6       0.9X
Parquet-side filter - number of filters [20]      8024 / 8440        124.6           8.0       0.4X
Spark-side filter - number of filters [20]      3912 / 3946        255.6           3.9       0.8X
Parquet-side filter - number of filters [30]    11936 / 12041         83.8          11.9       0.3X
Spark-side filter - number of filters [30]      3929 / 3978        254.5           3.9       0.8X
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15049 from HyukjinKwon/SPARK-17310.
2017-11-14 12:34:21 +01:00
Bryan Cutler 209b9361ac [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFrame from Pandas
## What changes were proposed in this pull request?

This change uses Arrow to optimize the creation of a Spark DataFrame from a Pandas DataFrame. The input df is sliced according to the default parallelism. The optimization is enabled with the existing conf "spark.sql.execution.arrow.enabled" and is disabled by default.

## How was this patch tested?

Added new unit test to create DataFrame with and without the optimization enabled, then compare results.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19459 from BryanCutler/arrow-createDataFrame-from_pandas-SPARK-20791.
2017-11-13 13:16:01 +09:00
Kazuaki Ishizaki 9bf696dbec [SPARK-21720][SQL] Fix 64KB JVM bytecode limit problem with AND or OR
## What changes were proposed in this pull request?

This PR changes `AND`/`OR` code generation to place the condition and the expressions' generated code into separate methods when their size could be large. When such a method is generated, the `isNull` and `value` variables are declared as instance variables (e.g. `isNull1409` and `value1409`) so that their values can be passed to the callers of the generated method.

This PR resolves two cases:

* large code size of left expression
* large code size of right expression

## How was this patch tested?

Added a new test case into `CodeGenerationSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18972 from kiszk/SPARK-21720.
2017-11-12 22:44:47 +01:00
Wenchen Fan 21a7bfd5c3 [SPARK-10365][SQL] Support Parquet logical type TIMESTAMP_MICROS
## What changes were proposed in this pull request?

This PR makes Spark able to read Parquet TIMESTAMP_MICROS values, and adds a new config that allows Spark to write timestamp values to Parquet as the TIMESTAMP_MICROS type.
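A hedged sketch of writing timestamps as TIMESTAMP_MICROS, assuming an active `spark` session; the config key shown is an assumption (the commit message does not name it), and the output path is hypothetical:

```scala
import java.sql.Timestamp
import spark.implicits._

// Assumed config key (not named in the commit message).
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

Seq(Timestamp.valueOf("2017-11-11 22:40:26")).toDF("ts")
  .write.mode("overwrite").parquet("/tmp/ts_micros")

// Reading the file back; TIMESTAMP_MICROS values are now supported on read.
spark.read.parquet("/tmp/ts_micros").printSchema()
```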

## How was this patch tested?

new test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19702 from cloud-fan/parquet.
2017-11-11 22:40:26 +01:00