Commit graph

3685 commits

Author SHA1 Message Date
Herman van Hovell 0332063553 [SPARK-20410][SQL] Make sparkConf a def in SharedSQLContext
## What changes were proposed in this pull request?
It is kind of annoying that `SharedSQLContext.sparkConf` is a `val`, because test cases that override it cannot call `super` on it. This PR makes it a `def`.
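
A minimal sketch of the resulting pattern (the trait name and config key here are illustrative stand-ins, not the actual test code):

```scala
import org.apache.spark.SparkConf

trait SharedSQLContextLike {
  // A def (rather than a val) lets subclasses refine the base configuration.
  protected def sparkConf: SparkConf = new SparkConf()
}

class MyQuerySuite extends SharedSQLContextLike {
  // Overriding tests can now build on the parent's conf via super.
  override protected def sparkConf: SparkConf =
    super.sparkConf.set("spark.sql.shuffle.partitions", "1")
}
```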

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17705 from hvanhovell/SPARK-20410.
2017-04-20 22:37:04 +02:00
Dilip Biswal d95e4d9d6a [SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references.
## What changes were proposed in this pull request?
Address a follow up in [comment](https://github.com/apache/spark/pull/16954#discussion_r105718880)
Currently, subqueries with correlated predicates that contain an aggregate expression mixing outer references and local references generate a codegen error like the following:

```SQL
SELECT t1a
FROM   t1
GROUP  BY 1
HAVING EXISTS (SELECT 1
               FROM  t2
               WHERE t2a < min(t1a + t2a));
```
Exception snippet.
```
Cannot evaluate expression: min((input[0, int, false] + input[4, int, false]))
	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226)
	at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103)

```
After this PR, a better error message is issued.
```
org.apache.spark.sql.AnalysisException
Error in query: Found an aggregate expression in a correlated
predicate that has both outer and local references, which is not supported yet.
Aggregate expression: min((t1.`t1a` + t2.`t2a`)),
Outer references: t1.`t1a`,
Local references: t2.`t2a`.;
```
## How was this patch tested?
Added tests in SQLQueryTestSuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #17636 from dilipbiswal/subquery_followup1.
2017-04-20 22:35:48 +02:00
Bogdan Raducanu c5a31d160f [SPARK-20407][TESTS] ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test
## What changes were proposed in this pull request?

`SharedSQLContext.afterEach` now calls `DebugFilesystem.assertNoOpenStreams` inside `eventually`.
`SQLTestUtils.withTempDir` calls `waitForTasksToFinish` before deleting the directory.
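
A rough sketch of the retry pattern using ScalaTest's `Eventually` (the wrapper function and timeout value are illustrative assumptions):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Streams may still be closing on another thread when a test body returns, so
// retry the open-stream assertion for a bounded time instead of failing on the
// first check.
def assertNoOpenStreamsEventually(assertNoOpenStreams: () => Unit): Unit = {
  eventually(timeout(10.seconds)) {
    assertNoOpenStreams()
  }
}
```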

## How was this patch tested?
Added new test in ParquetQuerySuite based on the flaky test

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17701 from bogdanrdc/SPARK-20407.
2017-04-20 18:49:39 +02:00
Wenchen Fan b91873db09 [SPARK-20409][SQL] fail early if aggregate function in GROUP BY
## What changes were proposed in this pull request?

It's illegal to have an aggregate function in GROUP BY, and we should fail during the analysis phase if this happens.
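
For illustration (a hypothetical table and query, e.g. in spark-shell, not taken from the patch), this is the kind of statement that should now be rejected at analysis time:

```scala
// Assuming a SparkSession `spark` and a table t(key INT, value INT):
// an aggregate function is not allowed as a grouping expression, so this
// should raise an AnalysisException while the plan is analyzed, instead of
// failing later during execution.
spark.sql("SELECT key, sum(value) FROM t GROUP BY max(key)")
```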

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17704 from cloud-fan/minor.
2017-04-20 16:59:38 +02:00
Reynold Xin c6f62c5b81 [SPARK-20405][SQL] Dataset.withNewExecutionId should be private
## What changes were proposed in this pull request?
Dataset.withNewExecutionId is only used in Dataset itself and should be private.

## How was this patch tested?
N/A - this is a simple visibility change.

Author: Reynold Xin <rxin@databricks.com>

Closes #17699 from rxin/SPARK-20405.
2017-04-20 14:29:59 +02:00
Xiao Li 55bea56911 [SPARK-20156][SQL][FOLLOW-UP] Java String toLowerCase "Turkish locale bug" in Database and Table DDLs
### What changes were proposed in this pull request?
Database and table names conform to the Hive standard (`[a-zA-Z_0-9]+`), i.e. a name may only contain letters, numbers, and `_`.

When calling `toLowerCase` on the names, we should pass `Locale.ROOT` to `toLowerCase` to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem").
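
A quick illustration of the underlying problem (plain JDK behavior, not code from this patch):

```scala
import java.util.Locale

// In the Turkish locale, uppercase "I" lowercases to the dotless "ı", so
// identifier handling that relies on the default-locale toLowerCase can
// silently change names.
val turkish = new Locale("tr")
"TABLE_ID".toLowerCase(turkish)      // "table_ıd"  (dotless ı)
"TABLE_ID".toLowerCase(Locale.ROOT)  // "table_id"  (locale-independent)
```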

### How was this patch tested?
Added a test case

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17655 from gatorsmile/locale.
2017-04-20 11:13:48 +01:00
Eric Liang dd6d55d5de [SPARK-20398][SQL] range() operator should include cancellation reason when killed
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-19820 adds a reason field for why tasks were killed. However, for backwards compatibility it left the old TaskKilledException constructor which defaults to "unknown reason".
The range() operator should use the constructor that fills in the reason rather than dropping it on task kill.
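
A sketch of how an operator can propagate the kill reason (simplified; the helper and surrounding loop are illustrative, not the patch's exact code):

```scala
import org.apache.spark.{TaskContext, TaskKilledException}

// Called periodically from a long-running loop: if the task was killed,
// rethrow with the recorded reason, falling back to the old default.
def checkKilled(context: TaskContext): Unit = {
  if (context.isInterrupted()) {
    val reason = context.getKillReason().getOrElse("unknown reason")
    throw new TaskKilledException(reason)
  }
}
```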

## How was this patch tested?

Existing tests, and I tested this manually.

Author: Eric Liang <ekl@databricks.com>

Closes #17692 from ericl/fix-kill-reason-in-range.
2017-04-19 19:53:40 -07:00
Liang-Chi Hsieh 773754b6c1 [SPARK-20356][SQL] Pruned InMemoryTableScanExec should have correct output partitioning and ordering
## What changes were proposed in this pull request?

The output of `InMemoryTableScanExec` can be pruned, so it may no longer match the output of `InMemoryRelation` and its child plan. This causes wrong output partitioning and ordering.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17679 from viirya/SPARK-20356.
2017-04-19 16:01:28 +08:00
Koert Kuipers 608bf30f0b [SPARK-20359][SQL] Avoid unnecessary execution in EliminateOuterJoin optimization that can lead to NPE
Avoid unnecessary execution that can lead to an NPE in EliminateOuterJoin, and add a test in DataFrameSuite to confirm the NPE is no longer thrown.

## What changes were proposed in this pull request?
Change `leftHasNonNullPredicate` and `rightHasNonNullPredicate` to lazy vals so they are only evaluated when needed.
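
A minimal sketch of the idea (the val names match the description above, but the surrounding structure and the join-type mapping are simplified placeholders):

```scala
// Sketch only: the two checks may force predicate evaluation that can throw;
// as lazy vals they are computed at most once, and only for the join types
// that actually consult them.
class OuterJoinEliminationSketch(leftCheck: () => Boolean, rightCheck: () => Boolean) {
  lazy val leftHasNonNullPredicate: Boolean = leftCheck()
  lazy val rightHasNonNullPredicate: Boolean = rightCheck()

  // Simplified stand-in for the rule's join-type rewrite.
  def newJoinType(joinType: String): String = joinType match {
    case "left_outer" if rightHasNonNullPredicate => "inner"
    case "right_outer" if leftHasNonNullPredicate => "inner"
    case other => other
  }
}
```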

## How was this patch tested?

Added a test in DataFrameSuite that failed before this fix and now succeeds. Note that a test in the catalyst project would be better, but I am unsure how to do this.

Author: Koert Kuipers <koert@tresata.com>

Closes #17660 from koertkuipers/feat-catch-npe-in-eliminate-outer-join.
2017-04-19 15:52:47 +08:00
Xiao Li 01ff0350a8 [SPARK-20349][SQL] ListFunctions returns duplicate functions after using persistent functions
### What changes were proposed in this pull request?
The session catalog caches some persistent functions in the `FunctionRegistry`, so there can be duplicates. Our Catalog API `listFunctions` does not handle them.

It would be better if the `SessionCatalog` API could de-duplicate the records, instead of each API caller doing it. In `FunctionRegistry`, our functions are identified by the unquoted string. Thus, this PR tries to parse it using our parser interface and then de-duplicate the names.

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17646 from gatorsmile/showFunctions.
2017-04-17 09:50:20 -07:00
wangzhenhua fb036c4413 [SPARK-20318][SQL] Use Catalyst type for min/max in ColumnStat for ease of estimation
## What changes were proposed in this pull request?

Currently when estimating predicates like col > literal or col = literal, we will update min or max in column stats based on literal value. However, literal value is of Catalyst type (internal type), while min/max is of external type. Then for the next predicate, we again need to do type conversion to compare and update column stats. This is awkward and causes many unnecessary conversions in estimation.

To solve this, we use Catalyst type for min/max in `ColumnStat`. Note that the persistent format in metastore is still of external type, so there's no inconsistency for statistics in metastore.

This PR also fixes a bug for the boolean type in the `IN` condition.

## How was this patch tested?

The changes to `ColumnStat` are covered by existing tests.
For the bug fix, a new test for the boolean type in the `IN` condition is added.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17630 from wzhfy/refactorColumnStat.
2017-04-14 19:16:47 +08:00
Steve Loughran 7536e2849d [SPARK-20038][SQL] FileFormatWriter.ExecuteWriteTask.releaseResources() implementations to be re-entrant
## What changes were proposed in this pull request?

Have the `FileFormatWriter.ExecuteWriteTask.releaseResources()` implementations set `currentWriter = null` in a finally clause. This guarantees that if the first attempt to close `currentWriter` throws an exception, the second `releaseResources()` call made during the task cancel process will not trigger a second attempt to close the stream.
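
A minimal sketch of the re-entrant release pattern (types simplified; the real writer interface differs):

```scala
// Null out the writer reference even if close() throws, so a second
// releaseResources() call during task cancellation becomes a no-op instead of
// a second close() on a half-closed stream.
class WriteTaskSketch {
  private var currentWriter: java.io.Closeable = _

  def releaseResources(): Unit = {
    if (currentWriter != null) {
      try {
        currentWriter.close()
      } finally {
        currentWriter = null
      }
    }
  }
}
```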

## How was this patch tested?

Tricky. I've been fixing the underlying cause since I saw the problem [HADOOP-14204](https://issues.apache.org/jira/browse/HADOOP-14204), but SPARK-10109 shows I'm not the first to have seen this. I can't replicate it locally any more, as my code is no longer broken.

Code review, however, should be straightforward.

Author: Steve Loughran <stevel@hortonworks.com>

Closes #17364 from steveloughran/stevel/SPARK-20038-close.
2017-04-13 15:30:44 -05:00
Burak Yavuz 924c42477b [SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in Structured Streaming tests
## What changes were proposed in this pull request?

Some Structured Streaming tests show flakiness such as:
```
[info] - prune results by current_date, complete mode - 696 *** FAILED *** (10 seconds, 937 milliseconds)
[info]   Timed out while stopping and waiting for microbatchthread to terminate.: The code passed to failAfter did not complete within 10 seconds.
```

This happens when we wait for the stream to stop, but it doesn't. The reason it doesn't stop is that we interrupt the microBatchThread, but Hadoop's `Shell.runCommand` swallows the interrupt exception, and the exception is not propagated upstream to the microBatchThread. Then this thread continues to run, only to start blocking on the `streamManualClock`.

## How was this patch tested?

Thousand retries locally and [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75720/testReport) of the flaky tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17613 from brkyvz/flaky-stream-agg.
2017-04-12 11:24:59 -07:00
Xiao Li 504e62e2f4 [SPARK-20303][SQL] Rename createTempFunction to registerFunction
### What changes were proposed in this pull request?
The session catalog API `createTempFunction` is being used for Hive built-in functions, persistent functions, and temporary functions, so the name is confusing. This PR renames it to `registerFunction`. We can also move the construction of `FunctionBuilder` and `ExpressionInfo` into the new `registerFunction`, instead of duplicating the logic everywhere.

In the next PRs, the remaining Function-related APIs also need cleanups.

### How was this patch tested?
Existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17615 from gatorsmile/cleanupCreateTempFunction.
2017-04-12 09:01:26 -07:00
hyukjinkwon ceaf77ae43 [SPARK-18692][BUILD][DOCS] Test Java 8 unidoc build on Jenkins
## What changes were proposed in this pull request?

This PR proposes to run Spark unidoc to test the Javadoc 8 build, as Javadoc 8 is easily re-breakable.

There are several problems with it:

- It introduces a little extra time to run the tests. In my case, it took 1.5 mins more (`Elapsed :[94.8746569157]`). How it was tested is described in "How was this patch tested?".

- > One problem that I noticed was that Unidoc appeared to be processing test sources: if we can find a way to exclude those from being processed in the first place then that might significantly speed things up.

  (see  joshrosen's [comment](https://issues.apache.org/jira/browse/SPARK-18692?focusedCommentId=15947627&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15947627))

To complete this automated build, it also suggests fixing existing Javadoc breaks / ones introduced by test code as described above.

These fixes are similar to instances previously fixed. Please refer to https://github.com/apache/spark/pull/15999 and https://github.com/apache/spark/pull/16013

Note that this only fixes **errors**, not **warnings**. Please see my observation in https://github.com/apache/spark/pull/17389#issuecomment-288438704 for spurious errors caused by warnings.

## How was this patch tested?

Manually via `jekyll build` for building tests. Also, tested via running `./dev/run-tests`.

This was tested via manually adding `time.time()` as below:

```diff
     profiles_and_goals = build_profiles + sbt_goals

     print("[info] Building Spark unidoc (w/Hive 1.2.1) using SBT with these arguments: ",
           " ".join(profiles_and_goals))

+    import time
+    st = time.time()
     exec_sbt(profiles_and_goals)
+    print("Elapsed :[%s]" % str(time.time() - st))
```

produces

```
...
========================================================================
Building Unidoc API Documentation
========================================================================
...
[info] Main Java API documentation successful.
...
Elapsed :[94.8746569157]
...
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17477 from HyukjinKwon/SPARK-18692.
2017-04-12 12:38:48 +01:00
hyukjinkwon bca4259f12 [MINOR][DOCS] JSON APIs related documentation fixes
## What changes were proposed in this pull request?

This PR proposes corrections related to JSON APIs as below:

- Rendering links in Python documentation
- Replacing `RDD` with `Dataset` in the programming guide
- Adding missing description about JSON Lines consistently in `DataFrameReader.json` in Python API
- De-duplicating a little bit of `DataFrameReader.json` in the Scala/Java API

## How was this patch tested?

Manually built the documentation via `jekyll build`. Corresponding snapshots will be left on the code.

Note that currently there are Javadoc8 breaks in several places. These are proposed to be handled in https://github.com/apache/spark/pull/17477. So, this PR does not fix those.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17602 from HyukjinKwon/minor-json-documentation.
2017-04-12 09:16:39 +01:00
Dilip Biswal b14bfc3f8e [SPARK-19993][SQL] Caching logical plans containing subquery expressions does not work.
## What changes were proposed in this pull request?
The sameResult() method does not work when the logical plan contains subquery expressions.

**Before the fix**
```SQL
scala> val ds = spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)")
ds: org.apache.spark.sql.DataFrame = [c1: int]

scala> ds.cache
res13: ds.type = [c1: int]

scala> spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)").explain(true)
== Analyzed Logical Plan ==
c1: int
Project [c1#86]
+- Filter c1#86 IN (list#78 [c1#86])
   :  +- Project [c1#87]
   :     +- Filter (outer(c1#86) = c1#87)
   :        +- SubqueryAlias s2
   :           +- Relation[c1#87] parquet
   +- SubqueryAlias s1
      +- Relation[c1#86] parquet

== Optimized Logical Plan ==
Join LeftSemi, ((c1#86 = c1#87) && (c1#86 = c1#87))
:- Relation[c1#86] parquet
+- Relation[c1#87] parquet
```
**Plan after fix**
```SQL
== Analyzed Logical Plan ==
c1: int
Project [c1#22]
+- Filter c1#22 IN (list#14 [c1#22])
   :  +- Project [c1#23]
   :     +- Filter (outer(c1#22) = c1#23)
   :        +- SubqueryAlias s2
   :           +- Relation[c1#23] parquet
   +- SubqueryAlias s1
      +- Relation[c1#22] parquet

== Optimized Logical Plan ==
InMemoryRelation [c1#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *BroadcastHashJoin [c1#1, c1#1], [c1#2, c1#2], LeftSemi, BuildRight
      :- *FileScan parquet default.s1[c1#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
      +- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, true] as bigint), 32) | (cast(input[0, int, true] as bigint) & 4294967295))))
         +- *FileScan parquet default.s2[c1#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
```
## How was this patch tested?
New tests are added to CachedTableSuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #17330 from dilipbiswal/subquery_cache_final.
2017-04-12 12:18:01 +08:00
DB Tsai 8ad63ee158 [SPARK-20291][SQL] NaNvl(FloatType, NullType) should not be cast to NaNvl(DoubleType, DoubleType)
## What changes were proposed in this pull request?

`NaNvl(float value, null)` will be converted into `NaNvl(float value, Cast(null, DoubleType))` and finally `NaNvl(Cast(float value, DoubleType), Cast(null, DoubleType))`.

This causes a mismatch in the output type when the input type is float.

Adding an extra rule in TypeCoercion resolves this issue.

## How was this patch tested?

Unit tests.

Author: DB Tsai <dbt@netflix.com>

Closes #17606 from dbtsai/fixNaNvl.
2017-04-12 11:19:20 +08:00
Liang-Chi Hsieh cd91f96714 [SPARK-20175][SQL] Exists should not be evaluated in Join operator
## What changes were proposed in this pull request?

Similar to `ListQuery`, `Exists` should not be evaluated in `Join` operator too.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17491 from viirya/dont-push-exists-to-join.
2017-04-11 20:33:10 +08:00
Reynold Xin 379b0b0bbd [SPARK-20283][SQL] Add preOptimizationBatches
## What changes were proposed in this pull request?
We currently have postHocOptimizationBatches, but not preOptimizationBatches. This patch adds preOptimizationBatches so the optimizer debugging extensions are symmetric.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #17595 from rxin/SPARK-20283.
2017-04-10 14:14:09 -07:00
Shixiong Zhu a35b9d9712 [SPARK-20282][SS][TESTS] Write the commit log first to fix a race condition in tests
## What changes were proposed in this pull request?

This PR fixes the following failure:
```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException:
Assert on query failed:

== Progress ==
   AssertOnQuery(<condition>, )
   StopStream
   AddData to MemoryStream[value#30891]: 1,2
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock35cdc93a,Map())
   CheckAnswer: [6],[3]
   StopStream
=> AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClockcdb247d,Map())
   CheckAnswer: [6],[3]
   StopStream
   AddData to MemoryStream[value#30891]: 3
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock55394e4d,Map())
   CheckLastBatch: [2]
   StopStream
   AddData to MemoryStream[value#30891]: 0
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock749aa997,Map())
   ExpectFailure[org.apache.spark.SparkException, isFatalError: false]
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, incorrect start offset or end offset on exception)

== Stream ==
Output Mode: Append
Stream state: not started
Thread state: dead

== Sink ==
0: [6] [3]

== Plan ==

	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
	at org.scalatest.Assertions$class.fail(Assertions.scala:1328)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1555)
	at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:347)
	at org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:318)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:483)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:357)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:357)
	at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:356)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.testStream(StreamingQuerySuite.scala:41)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply$mcV$sp(StreamingQuerySuite.scala:166)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161)
	at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply$mcV$sp(SQLTestUtils.scala:268)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfterEach$$super$runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
	at org.scalatest.Suite$class.run(Suite.scala:1424)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$run(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.run(StreamingQuerySuite.scala:41)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

The failure occurs because `CheckAnswer` will run once `committedOffsets` is updated; then writing the commit log may be interrupted by the following `StopStream`.

This PR simply changes the order so that the commit log is written first.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17594 from zsxwing/SPARK-20282.
2017-04-10 14:09:32 -07:00
Bogdan Raducanu f6dd8e0e16 [SPARK-20280][CORE] FileStatusCache Weigher integer overflow
## What changes were proposed in this pull request?

`Weigher.weigh` needs to return an `Int`, but it is possible for an `Array[FileStatus]` to have a size greater than `Int.MaxValue`. To avoid this, the size is scaled down by a factor of 32. The `maximumWeight` of the cache is also scaled down by the same factor.
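
A rough sketch of the scaling idea using Guava's `Weigher` (the key/value types and the per-entry size estimate are illustrative assumptions, not the patch's actual code):

```scala
import com.google.common.cache.Weigher
import org.apache.hadoop.fs.{FileStatus, Path}

// Report weights in units of 32 so that large entries (and their sum) stay
// within the Int range Guava requires; the cache's maximumWeight must then be
// divided by the same factor.
object ScaledWeigherSketch {
  val weightScale = 32

  val weigher: Weigher[Path, Array[FileStatus]] =
    new Weigher[Path, Array[FileStatus]] {
      override def weigh(key: Path, value: Array[FileStatus]): Int = {
        val estimatedSize = value.length.toLong * 1024L  // assumed per-entry estimate
        math.min(estimatedSize / weightScale, Int.MaxValue.toLong).toInt
      }
    }
}
```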

## How was this patch tested?
New test in FileIndexSuite

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17591 from bogdanrdc/SPARK-20280.
2017-04-10 21:56:21 +02:00
Sean Owen a26e3ed5e4 [SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish locale bug" causes Spark problems
## What changes were proposed in this pull request?

Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem").

The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods.

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #17527 from srowen/SPARK-20156.
2017-04-10 20:11:56 +01:00
Wenchen Fan 3d7f201f2a [SPARK-20229][SQL] add semanticHash to QueryPlan
## What changes were proposed in this pull request?

Like `Expression`, `QueryPlan` should also have a `semanticHash` method, so that we can put plans into a hash map and look them up fast. This PR refactors `QueryPlan` to follow `Expression` and puts all the normalization logic in `QueryPlan.canonicalized`, so that it is very natural to implement `semanticHash`.

Follow-up: improve `CacheManager` to leverage this `semanticHash` and speed up plan lookup, instead of iterating over all cached plans.
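
A minimal sketch of the contract this enables (hypothetical types, not the actual `QueryPlan` or `CacheManager` API):

```scala
import scala.collection.mutable

// If two plans are semantically equal, their canonical forms (and thus their
// semantic hashes) agree, so cached entries can be bucketed by hash and only
// hash-equal candidates need the full sameResult comparison.
trait PlanLike {
  def canonicalized: PlanLike
  def sameResult(other: PlanLike): Boolean = canonicalized == other.canonicalized
  def semanticHash(): Int = canonicalized.hashCode()
}

class CacheSketch[P <: PlanLike, V] {
  private val buckets = mutable.HashMap.empty[Int, List[(P, V)]]

  def lookup(plan: P): Option[V] =
    buckets.getOrElse(plan.semanticHash(), Nil)
      .collectFirst { case (p, v) if p.sameResult(plan) => v }

  def put(plan: P, value: V): Unit = {
    val h = plan.semanticHash()
    buckets(h) = (plan, value) :: buckets.getOrElse(h, Nil)
  }
}
```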

## How was this patch tested?

Existing tests. Note that we don't need to test the `semanticHash` method; once the existing tests prove `sameResult` is correct, we are good.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17541 from cloud-fan/plan-semantic.
2017-04-10 13:36:08 +08:00
DB Tsai 1a0bc41659
[SPARK-20270][SQL] na.fill should not change the values in long or integer when the default value is in double
## What changes were proposed in this pull request?

This bug was partially addressed in SPARK-18555 (https://github.com/apache/spark/pull/15994), but the root cause isn't completely solved. This bug is pretty critical since it changes Long member ids in our application whenever a very big member id cannot be represented losslessly by a Double.

Here is an example of how this happens. With
```
      Seq[(java.lang.Long, java.lang.Double)]((null, 3.14), (9123146099426677101L, null),
        (9123146560113991650L, 1.6), (null, null)).toDF("a", "b").na.fill(0.2),
```
the logical plan will be
```
== Analyzed Logical Plan ==
a: bigint, b: double
Project [cast(coalesce(cast(a#232L as double), cast(0.2 as double)) as bigint) AS a#240L, cast(coalesce(nanvl(b#233, cast(null as double)), 0.2) as double) AS b#241]
+- Project [_1#229L AS a#232L, _2#230 AS b#233]
   +- LocalRelation [_1#229L, _2#230]
```

Note that even when the value is not null, Spark will cast the Long into a Double first and then, if it's not null, cast it back to Long, which results in losing precision.

The correct behavior is that the original value should not be changed when it's not null, but Spark changes it, which is wrong.

With the PR, the logical plan will be
```
== Analyzed Logical Plan ==
a: bigint, b: double
Project [coalesce(a#232L, cast(0.2 as bigint)) AS a#240L, coalesce(nanvl(b#233, cast(null as double)), cast(0.2 as double)) AS b#241]
+- Project [_1#229L AS a#232L, _2#230 AS b#233]
   +- LocalRelation [_1#229L, _2#230]
```
which behaves correctly without changing the original Long values and also avoids extra cost of unnecessary casting.

## How was this patch tested?

Unit test added.

+cc srowen rxin cloud-fan gatorsmile

Thanks.

Author: DB Tsai <dbt@netflix.com>

Closes #17577 from dbtsai/fixnafill.
2017-04-10 05:16:34 +00:00
Kazuaki Ishizaki 7a63f5e827 [SPARK-20253][SQL] Remove unnecessary nullchecks of a return value from Spark runtime routines in generated Java code
## What changes were proposed in this pull request?

This PR eliminates unnecessary null checks of return values from known Spark runtime routines. We know whether a given Spark runtime routine returns ``null`` or not (e.g. ``ArrayData.toDoubleArray()`` never returns ``null``). Thus, we can eliminate the null check for the return value of such a routine.

When we run the following example program, we currently get the Java code shown under "Without this PR". In this code, since we know ``ArrayData.toDoubleArray()`` never returns ``null``, we can eliminate the null checks at lines 90-92 and 97.

```scala
val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
ds.count
ds.map(e => e).show
```

Without this PR
```java
/* 050 */   protected void processNext() throws java.io.IOException {
/* 051 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 052 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 053 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 054 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 055 */
/* 056 */       ArrayData deserializetoobject_value1 = null;
/* 057 */
/* 058 */       if (!inputadapter_isNull) {
/* 059 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 060 */
/* 061 */         Double[] deserializetoobject_convertedArray = null;
/* 062 */         deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
/* 063 */
/* 064 */         int deserializetoobject_loopIndex = 0;
/* 065 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 066 */           MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
/* 067 */           MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 068 */
/* 069 */           if (MapObjects_loopIsNull2) {
/* 070 */             throw new RuntimeException(((java.lang.String) references[0]));
/* 071 */           }
/* 072 */           if (false) {
/* 073 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
/* 074 */           } else {
/* 075 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
/* 076 */           }
/* 077 */
/* 078 */           deserializetoobject_loopIndex += 1;
/* 079 */         }
/* 080 */
/* 081 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
/* 082 */       }
/* 083 */       boolean deserializetoobject_isNull = true;
/* 084 */       double[] deserializetoobject_value = null;
/* 085 */       if (!inputadapter_isNull) {
/* 086 */         deserializetoobject_isNull = false;
/* 087 */         if (!deserializetoobject_isNull) {
/* 088 */           Object deserializetoobject_funcResult = null;
/* 089 */           deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray();
/* 090 */           if (deserializetoobject_funcResult == null) {
/* 091 */             deserializetoobject_isNull = true;
/* 092 */           } else {
/* 093 */             deserializetoobject_value = (double[]) deserializetoobject_funcResult;
/* 094 */           }
/* 095 */
/* 096 */         }
/* 097 */         deserializetoobject_isNull = deserializetoobject_value == null;
/* 098 */       }
/* 099 */
/* 100 */       boolean mapelements_isNull = true;
/* 101 */       double[] mapelements_value = null;
/* 102 */       if (!false) {
/* 103 */         mapelements_resultIsNull = false;
/* 104 */
/* 105 */         if (!mapelements_resultIsNull) {
/* 106 */           mapelements_resultIsNull = deserializetoobject_isNull;
/* 107 */           mapelements_argValue = deserializetoobject_value;
/* 108 */         }
/* 109 */
/* 110 */         mapelements_isNull = mapelements_resultIsNull;
/* 111 */         if (!mapelements_isNull) {
/* 112 */           Object mapelements_funcResult = null;
/* 113 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
/* 114 */           if (mapelements_funcResult == null) {
/* 115 */             mapelements_isNull = true;
/* 116 */           } else {
/* 117 */             mapelements_value = (double[]) mapelements_funcResult;
/* 118 */           }
/* 119 */
/* 120 */         }
/* 121 */         mapelements_isNull = mapelements_value == null;
/* 122 */       }
/* 123 */
/* 124 */       serializefromobject_resultIsNull = false;
/* 125 */
/* 126 */       if (!serializefromobject_resultIsNull) {
/* 127 */         serializefromobject_resultIsNull = mapelements_isNull;
/* 128 */         serializefromobject_argValue = mapelements_value;
/* 129 */       }
/* 130 */
/* 131 */       boolean serializefromobject_isNull = serializefromobject_resultIsNull;
/* 132 */       final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
/* 133 */       serializefromobject_isNull = serializefromobject_value == null;
/* 134 */       serializefromobject_holder.reset();
/* 135 */
/* 136 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 137 */
/* 138 */       if (serializefromobject_isNull) {
/* 139 */         serializefromobject_rowWriter.setNullAt(0);
/* 140 */       } else {
/* 141 */         // Remember the current cursor so that we can calculate how many bytes are
/* 142 */         // written later.
/* 143 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 144 */
/* 145 */         if (serializefromobject_value instanceof UnsafeArrayData) {
/* 146 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 147 */           // grow the global buffer before writing data.
/* 148 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 149 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 150 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 151 */
/* 152 */         } else {
/* 153 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 154 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8);
/* 155 */
/* 156 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 157 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 158 */               serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
/* 159 */             } else {
/* 160 */               final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index);
/* 161 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 162 */             }
/* 163 */           }
/* 164 */         }
/* 165 */
/* 166 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 167 */       }
/* 168 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 169 */       append(serializefromobject_result);
/* 170 */       if (shouldStop()) return;
/* 171 */     }
/* 172 */   }
```

With this PR (removed most of lines 90-97 in the above code)
```java
/* 050 */   protected void processNext() throws java.io.IOException {
/* 051 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 052 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 053 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 054 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 055 */
/* 056 */       ArrayData deserializetoobject_value1 = null;
/* 057 */
/* 058 */       if (!inputadapter_isNull) {
/* 059 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 060 */
/* 061 */         Double[] deserializetoobject_convertedArray = null;
/* 062 */         deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
/* 063 */
/* 064 */         int deserializetoobject_loopIndex = 0;
/* 065 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 066 */           MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
/* 067 */           MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 068 */
/* 069 */           if (MapObjects_loopIsNull2) {
/* 070 */             throw new RuntimeException(((java.lang.String) references[0]));
/* 071 */           }
/* 072 */           if (false) {
/* 073 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
/* 074 */           } else {
/* 075 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
/* 076 */           }
/* 077 */
/* 078 */           deserializetoobject_loopIndex += 1;
/* 079 */         }
/* 080 */
/* 081 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
/* 082 */       }
/* 083 */       boolean deserializetoobject_isNull = true;
/* 084 */       double[] deserializetoobject_value = null;
/* 085 */       if (!inputadapter_isNull) {
/* 086 */         deserializetoobject_isNull = false;
/* 087 */         if (!deserializetoobject_isNull) {
/* 088 */           Object deserializetoobject_funcResult = null;
/* 089 */           deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray();
/* 090 */           deserializetoobject_value = (double[]) deserializetoobject_funcResult;
/* 091 */
/* 092 */         }
/* 093 */
/* 094 */       }
/* 095 */
/* 096 */       boolean mapelements_isNull = true;
/* 097 */       double[] mapelements_value = null;
/* 098 */       if (!false) {
/* 099 */         mapelements_resultIsNull = false;
/* 100 */
/* 101 */         if (!mapelements_resultIsNull) {
/* 102 */           mapelements_resultIsNull = deserializetoobject_isNull;
/* 103 */           mapelements_argValue = deserializetoobject_value;
/* 104 */         }
/* 105 */
/* 106 */         mapelements_isNull = mapelements_resultIsNull;
/* 107 */         if (!mapelements_isNull) {
/* 108 */           Object mapelements_funcResult = null;
/* 109 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
/* 110 */           if (mapelements_funcResult == null) {
/* 111 */             mapelements_isNull = true;
/* 112 */           } else {
/* 113 */             mapelements_value = (double[]) mapelements_funcResult;
/* 114 */           }
/* 115 */
/* 116 */         }
/* 117 */         mapelements_isNull = mapelements_value == null;
/* 118 */       }
/* 119 */
/* 120 */       serializefromobject_resultIsNull = false;
/* 121 */
/* 122 */       if (!serializefromobject_resultIsNull) {
/* 123 */         serializefromobject_resultIsNull = mapelements_isNull;
/* 124 */         serializefromobject_argValue = mapelements_value;
/* 125 */       }
/* 126 */
/* 127 */       boolean serializefromobject_isNull = serializefromobject_resultIsNull;
/* 128 */       final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
/* 129 */       serializefromobject_isNull = serializefromobject_value == null;
/* 130 */       serializefromobject_holder.reset();
/* 131 */
/* 132 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 133 */
/* 134 */       if (serializefromobject_isNull) {
/* 135 */         serializefromobject_rowWriter.setNullAt(0);
/* 136 */       } else {
/* 137 */         // Remember the current cursor so that we can calculate how many bytes are
/* 138 */         // written later.
/* 139 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 140 */
/* 141 */         if (serializefromobject_value instanceof UnsafeArrayData) {
/* 142 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 143 */           // grow the global buffer before writing data.
/* 144 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 145 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 146 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 147 */
/* 148 */         } else {
/* 149 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 150 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8);
/* 151 */
/* 152 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 153 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 154 */               serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
/* 155 */             } else {
/* 156 */               final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index);
/* 157 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 158 */             }
/* 159 */           }
/* 160 */         }
/* 161 */
/* 162 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 163 */       }
/* 164 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 165 */       append(serializefromobject_result);
/* 166 */       if (shouldStop()) return;
/* 167 */     }
/* 168 */   }
```

## How was this patch tested?

Added test suites to ``DatasetPrimitiveSuite``.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17569 from kiszk/SPARK-20253.
2017-04-10 10:47:17 +08:00
Adrian Ionescu 589f3edb82 [SPARK-20255] Move listLeafFiles() to InMemoryFileIndex
## What changes were proposed in this pull request?

Trying to get a grip on the `FileIndex` hierarchy, I was confused by the following inconsistency:

On the one hand, `PartitioningAwareFileIndex` defines `leafFiles` and `leafDirToChildrenFiles` as abstract, but on the other it fully implements `listLeafFiles` which does all the listing of files. However, the latter is only used by `InMemoryFileIndex`.

I'm hereby proposing to move this method (and all its dependencies) to the implementation class that actually uses it, and thus unclutter the `PartitioningAwareFileIndex` interface.

## How was this patch tested?

`./build/sbt sql/test`

Author: Adrian Ionescu <adrian@databricks.com>

Closes #17570 from adrian-ionescu/list-leaf-files.
2017-04-07 14:00:23 -07:00
Wenchen Fan ad3cc1312d [SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly
## What changes were proposed in this pull request?

Currently `LogicalRelation` has an `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should also take `output` as a parameter, to simplify the logic.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17552 from cloud-fan/minor.
2017-04-07 15:58:50 +08:00
Felix Cheung bccc330193 [SPARK-20196][PYTHON][SQL] update doc for catalog functions for all languages, add pyspark refreshByPath API
## What changes were proposed in this pull request?

Update the docs to remove "external" for `createTable`, and add `refreshByPath` in Python.

## How was this patch tested?

manual

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17512 from felixcheung/catalogdoc.
2017-04-06 09:09:43 -07:00
Tathagata Das 9543fc0e08 [SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState
## What changes were proposed in this pull request?

- Fixed a bug in the Java API not passing the timeout conf to the Scala API
- Updated markdown docs
- Updated scala docs
- Added scala and Java example

## How was this patch tested?
Manually ran examples.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17539 from tdas/SPARK-20224.
2017-04-05 16:03:04 -07:00
wangzhenhua a2d8d767d9 [SPARK-20223][SQL] Fix typo in tpcds q77.sql
## What changes were proposed in this pull request?

Fix typo in tpcds q77.sql

## How was this patch tested?

N/A

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17538 from wzhfy/typoQ77.
2017-04-05 10:21:43 -07:00
Tathagata Das dad499f324 [SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval
## What changes were proposed in this pull request?

For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 minutes before starting the next batch. This does not make sense. The processing-time-based trigger policy should be to process batches as fast as possible, but no faster than one per trigger interval. If batches are already taking longer than the trigger interval, there is no point in waiting an extra trigger interval.

In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock.
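
A tiny sketch of the intended scheduling rule (names and units are illustrative):

```scala
// Wait only for the remainder of the trigger interval; if the previous batch
// already overran the interval, start the next batch immediately.
def millisToWaitBeforeNextBatch(batchStartMs: Long, batchEndMs: Long, intervalMs: Long): Long = {
  val elapsed = batchEndMs - batchStartMs
  math.max(intervalMs - elapsed, 0L)
}

// e.g. interval = 10 min and the batch took 11 min => wait 0 ms, not 9 min.
```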

## How was this patch tested?
Added new unit tests to comprehensively test this behavior.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17525 from tdas/SPARK-20209.
2017-04-04 23:20:17 -07:00
Reynold Xin b6e71032d9 Small doc fix for ReuseSubquery. 2017-04-04 22:46:42 -07:00
Wenchen Fan 295747e597 [SPARK-19716][SQL] support by-name resolution for struct type elements in array
## What changes were proposed in this pull request?

Previously, when we constructed the deserializer expression for an array type, we would first cast the corresponding field to the expected array type and then apply `MapObjects`.

However, by doing that, we lose the opportunity to do by-name resolution for struct types inside the array type. In this PR, I introduce an `UnresolvedMapObjects` to hold the lambda function and the input array expression. Then, during analysis, after the input array expression is resolved, we get the actual array element type and apply by-name resolution. We then no longer need to add a `Cast` for the array type when constructing the deserializer expression, as the element type is determined later by the analyzer.
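
For illustration only (hypothetical case classes and query, assuming a local `SparkSession`), this is the kind of mapping the change enables:

```scala
import org.apache.spark.sql.SparkSession

// The struct inside the array arrives with fields in (b, a) order, while the
// case class declares (a, b); by-name resolution lets the mapping line up.
case class Inner(a: Int, b: String)
case class Outer(items: Seq[Inner])

object ByNameArraySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = spark.range(1).selectExpr("array(named_struct('b', 'x', 'a', 1)) AS items")
    df.as[Outer].show()  // expected to resolve Inner's fields by name, not by position

    spark.stop()
  }
}
```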

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17398 from cloud-fan/dataset.
2017-04-04 16:38:32 -07:00
Wenchen Fan 402bf2a50d [SPARK-20204][SQL] remove SimpleCatalystConf and CatalystConf type alias
## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/17285 .

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17521 from cloud-fan/conf.
2017-04-04 11:56:21 -07:00
Xiao Li 26e7bca229 [SPARK-20198][SQL] Remove the inconsistency in table/function name conventions in SparkSession.Catalog APIs
### What changes were proposed in this pull request?
Observed by felixcheung: in the `SparkSession`.`Catalog` APIs, we have different conventions/rules for table/function identifiers/names. Most APIs accept a qualified name (i.e., `databaseName`.`tableName` or `databaseName`.`functionName`). However, the following five APIs do not accept it.
- def listColumns(tableName: String): Dataset[Column]
- def getTable(tableName: String): Table
- def getFunction(functionName: String): Function
- def tableExists(tableName: String): Boolean
- def functionExists(functionName: String): Boolean

To make them consistent with the other Catalog APIs, this PR makes the changes, updates the function/API comments, and adds `params` to clarify the inputs we allow.

### How was this patch tested?
Added the test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17518 from gatorsmile/tableIdentifier.
2017-04-04 18:57:46 +08:00
Xiao Li 51d3c854c5 [SPARK-20067][SQL] Unify and Clean Up Desc Commands Using Catalog Interface
### What changes were proposed in this pull request?

This PR is to unify and clean up the outputs of `DESC EXTENDED/FORMATTED` and `SHOW TABLE EXTENDED` by moving the logic into the Catalog interface. The output formats are improved. We also add the missing attributes. It impacts DDL commands like `SHOW TABLE EXTENDED`, `DESC EXTENDED` and `DESC FORMATTED`.

In addition, by following what we did in the Dataset API `printSchema`, we can use `treeString` to show the schema in a more readable way.

Below is the current way:
```
Schema: STRUCT<`a`: STRING (nullable = true), `b`: INT (nullable = true), `c`: STRING (nullable = true), `d`: STRING (nullable = true)>
```
After the change, it should look like
```
Schema: root
 |-- a: string (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
```

### How was this patch tested?
`describe.sql` and `show-tables.sql`

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17394 from gatorsmile/descFollowUp.
2017-04-03 23:30:12 -07:00
Dilip Biswal 3bfb639cb7 [SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLIS
## What changes were proposed in this pull request?

**Description** from JIRA

The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS, but unfortunately parquet-mr hasn't supported it yet.
For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a 0 microsecond part onto the read values.
For the write path, we currently write timestamps as INT96, similar to Impala and Hive. One alternative is to have a separate SQL option that lets users write Spark SQL timestamp values as TIMESTAMP_MILLIS. Of course, in this way the microsecond part will be truncated.
## How was this patch tested?

Added new tests in ParquetQuerySuite and ParquetIOSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15332 from dilipbiswal/parquet-time-millis.
2017-04-04 09:53:05 +09:00
samelamin 58c9e6e77a [SPARK-20145] Fix range case insensitive bug in SQL
## What changes were proposed in this pull request?
Range in SQL should be case insensitive

## How was this patch tested?
unit test

Author: samelamin <hussam.elamin@gmail.com>
Author: samelamin <sam_elamin@discovery.com>

Closes #17487 from samelamin/SPARK-20145.
2017-04-03 17:16:31 -07:00
Adrian Ionescu 703c42c398 [SPARK-20194] Add support for partition pruning to in-memory catalog
## What changes were proposed in this pull request?
This patch implements `listPartitionsByFilter()` for `InMemoryCatalog` and thus resolves an outstanding TODO causing the `PruneFileSourcePartitions` optimizer rule not to apply when "spark.sql.catalogImplementation" is set to "in-memory" (which is the default).

The change is straightforward: it extracts the code for further filtering of the list of partitions returned by the metastore's `getPartitionsByFilter()` out from `HiveExternalCatalog` into `ExternalCatalogUtils` and calls this new function from `InMemoryCatalog` on the whole list of partitions.

Now that this method is implemented we can always pass the `CatalogTable` to the `DataSource` in `FindDataSourceTable`, so that the latter is resolved to a relation with a `CatalogFileIndex`, which is what the `PruneFileSourcePartitions` rule matches for.

## How was this patch tested?
Ran existing tests and added new test for `listPartitionsByFilter` in `ExternalCatalogSuite`, which is subclassed by both `InMemoryCatalogSuite` and `HiveExternalCatalogSuite`.

Author: Adrian Ionescu <adrian@databricks.com>

Closes #17510 from adrian-ionescu/InMemoryCatalog.
2017-04-03 08:48:49 -07:00
hyukjinkwon 4fa1a43af6 [SPARK-19641][SQL] JSON schema inference in DROPMALFORMED mode produces incorrect schema for non-array/object JSONs
## What changes were proposed in this pull request?

Currently, when we infer the types for valid JSON strings that are not objects or arrays, we produce empty schemas regardless of the parse mode, as below:

```scala
scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
```

```scala
scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
```

This PR proposes to handle parse modes in type inference.

After this PR,

```scala

scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
 |-- a: long (nullable = true)
```

```
scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
java.lang.RuntimeException: Failed to infer a common schema. Struct types are expected but string was found.
```

This PR is based on e233fd0334; NathanHowell and I talked about this in https://issues.apache.org/jira/browse/SPARK-19641

## How was this patch tested?

Unit tests in `JsonSuite` for both `DROPMALFORMED` and `FAILFAST` modes.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17492 from HyukjinKwon/SPARK-19641.
2017-04-03 17:44:39 +08:00
hyukjinkwon cff11fd20e [SPARK-20166][SQL] Use XXX for ISO 8601 timezone instead of ZZ (FastDateFormat specific) in CSV/JSON timeformat options
## What changes were proposed in this pull request?

This PR proposes to use `XXX` format instead of `ZZ`. `ZZ` seems a `FastDateFormat` specific.

`ZZ` supports "ISO 8601 extended format time zones" but it seems `FastDateFormat` specific option.
I misunderstood this is compatible format with `SimpleDateFormat` when this change is introduced.
Please see [SimpleDateFormat documentation]( https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html#iso8601timezone) and [FastDateFormat documentation](https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/time/FastDateFormat.html).

It seems we better replace `ZZ` to `XXX` because they look using the same strategy - [FastDateParser.java#L930](8767cd4f1a/src/main/java/org/apache/commons/lang3/time/FastDateParser.java (L930)), [FastDateParser.java#L932-L951 ](8767cd4f1a/src/main/java/org/apache/commons/lang3/time/FastDateParser.java (L932-L951)) and [FastDateParser.java#L596-L601](8767cd4f1a/src/main/java/org/apache/commons/lang3/time/FastDateParser.java (L596-L601)).

I also checked the codes and manually debugged it for sure. It seems both cases use the same pattern `( Z|(?:[+-]\\d{2}(?::)\\d{2}))`.

_Note that this is rather a documentation fix than a behaviour change, because `ZZ` appears to be an invalid date format in `SimpleDateFormat` as documented in `DataFrameReader` etc., and both `ZZ` and `XXX` appear to work identically with `FastDateFormat`._

Current documentation is as below:

```
   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
   * indicates a timestamp format. Custom date formats follow the formats at
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
```

## How was this patch tested?

Existing tests should cover this. Also, manually tested as below (BTW, I don't think these are worth being added as tests within Spark):

**Parse**

```scala
scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00")
res4: java.util.Date = Tue Mar 21 20:00:00 KST 2017

scala>  new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z")
res10: java.util.Date = Tue Mar 21 09:00:00 KST 2017

scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00")
java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000-11:00"
  at java.text.DateFormat.parse(DateFormat.java:366)
  ... 48 elided
scala>  new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z")
java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000Z"
  at java.text.DateFormat.parse(DateFormat.java:366)
  ... 48 elided
```

```scala
scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00")
res7: java.util.Date = Tue Mar 21 20:00:00 KST 2017

scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z")
res1: java.util.Date = Tue Mar 21 09:00:00 KST 2017

scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00")
res8: java.util.Date = Tue Mar 21 20:00:00 KST 2017

scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z")
res2: java.util.Date = Tue Mar 21 09:00:00 KST 2017
```

**Format**

```scala
scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00"))
res6: String = 2017-03-21T20:00:00.000+09:00
```

```scala
scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSZZ,ko_KR,Asia/Seoul]

scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00"))
res1: String = 2017-03-21T20:00:00.000+09:00

scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSXXX,ko_KR,Asia/Seoul]

scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00"))
res2: String = 2017-03-21T20:00:00.000+09:00
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17489 from HyukjinKwon/SPARK-20166.
2017-04-03 10:07:41 +01:00
Xiao Li 89d6822f72 [SPARK-19148][SQL][FOLLOW-UP] do not expose the external table concept in Catalog
### What changes were proposed in this pull request?
After renaming `Catalog.createExternalTable` to `createTable` in the PR: https://github.com/apache/spark/pull/16528, we also need to deprecate the corresponding functions in `SQLContext`.
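For reference, a hedged sketch of the migration this deprecation points users towards (the table name and path are illustrative):

```scala
// Deprecated after this change:
// sqlContext.createExternalTable("logs", "/data/logs", "parquet")

// Preferred Catalog API, which no longer exposes the "external table" concept:
spark.catalog.createTable("logs", "/data/logs", "parquet")
```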

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17502 from gatorsmile/deprecateCreateExternalTable.
2017-04-01 20:43:13 +08:00
Tathagata Das 567a50acfb [SPARK-20165][SS] Resolve state encoder's deserializer in driver in FlatMapGroupsWithStateExec
## What changes were proposed in this pull request?

- Encoder's deserializer must be resolved at the driver where the class is defined. Otherwise there are corner cases using nested classes where resolving at the executor can fail.

- Fixed flaky test related to processing time timeout. The flakiness is caused by a race condition between the test thread (that adds data to the memory source) and the streaming query thread. When testing the manual clock, the goal is to add data and increment the clock together atomically, such that a trigger sees new data AND the updated clock simultaneously (both or none). This fix adds additional synchronization when adding data; it makes sure that the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running) before adding data.

- Added `testQuietly` to some tests that generate a lot of error logs.

## How was this patch tested?
Multiple runs on existing unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17488 from tdas/SPARK-20165.
2017-03-31 10:58:43 -07:00
Kunal Khamar 254877c2f0 [SPARK-20164][SQL] AnalysisException not tolerant of null query plan.
## What changes were proposed in this pull request?

The query plan in an `AnalysisException` may be `null` when an `AnalysisException` object is serialized and then deserialized, since `plan` is marked `transient`, or when someone throws an `AnalysisException` with a null query plan (which should not happen).
`def getMessage` is not tolerant of this and throws a `NullPointerException`, leading to loss of information about the original exception.
The fix is to add a `null` check in `getMessage`.
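For illustration only, a minimal sketch of the null-tolerant pattern described (not the actual Spark source; the class and field types are simplified):

```scala
// `plan` is transient, so after Java deserialization the field reference itself
// can be null; Option(plan).flatten copes with both a null field and None.
class SafeAnalysisException(
    val message: String,
    @transient val plan: Option[String] = None) extends Exception {

  override def getMessage: String = {
    val planAnnotation = Option(plan).flatten.map(p => s";\n$p").getOrElse("")
    message + planAnnotation
  }
}
```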

## How was this patch tested?

- Unit test

Author: Kunal Khamar <kkhamar@outlook.com>

Closes #17486 from kunalkhamar/spark-20164.
2017-03-31 09:17:22 -07:00
Reynold Xin a8a765b3f3 [SPARK-20151][SQL] Account for partition pruning in scan metadataTime metrics
## What changes were proposed in this pull request?
After SPARK-20136, we report metadata timing metrics in the scan operator. However, that timing metric doesn't include one of the most important parts of metadata handling, which is partition pruning. This patch adds that time measurement to the scan metrics.

## How was this patch tested?
N/A - I tried adding a test in SQLMetricsSuite but it was extremely convoluted to the point that I'm not sure if this is worth it.

Author: Reynold Xin <rxin@databricks.com>

Closes #17476 from rxin/SPARK-20151.
2017-03-30 23:09:33 -07:00
Jacek Laskowski 0197262a35 [DOCS] Docs-only improvements
…adoc

## What changes were proposed in this pull request?

Use recommended values for row boundaries in Window's scaladoc, i.e. `Window.unboundedPreceding`, `Window.unboundedFollowing`, and `Window.currentRow` (that were introduced in 2.1.0).
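For context, the recommended constants in use (assuming a DataFrame `df` with columns `k`, `ts`, and `v`; this snippet is not part of the docs-only change):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Running sum per key, ordered by ts, using the named boundary constants
// instead of raw Long literals.
val w = Window.partitionBy("k").orderBy("ts")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val withRunningSum = df.withColumn("running_v", sum("v").over(w))
```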

## How was this patch tested?

Local build

Author: Jacek Laskowski <jacek@japila.pl>

Closes #17417 from jaceklaskowski/window-expression-scaladoc.
2017-03-30 16:07:27 +01:00
Eric Liang 79636054f6 [SPARK-20148][SQL] Extend the file commit API to allow subscribing to task commit messages
## What changes were proposed in this pull request?

The internal FileCommitProtocol interface returns all task commit messages in bulk to the implementation when a job finishes. However, it is sometimes useful to access those messages before the job completes, so that the driver gets incremental progress updates before the job finishes.

This adds an `onTaskCommit` listener to the internal api.
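A hedged sketch of how an implementation could subscribe to the new hook (the subclass and counter are illustrative, not part of this patch):

```scala
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

class ProgressTrackingCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  @transient private var tasksCommitted = 0

  // Invoked on the driver as each task commits, before the whole job finishes,
  // which enables incremental progress reporting.
  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
    tasksCommitted += 1
  }
}
```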

## How was this patch tested?

Unit tests.

cc rxin

Author: Eric Liang <ekl@databricks.com>

Closes #17475 from ericl/file-commit-api-ext.
2017-03-29 20:59:48 -07:00
Reynold Xin 60977889ea [SPARK-20136][SQL] Add num files and metadata operation timing to scan operator metrics
## What changes were proposed in this pull request?
This patch adds explicit metadata operation timing and number of files in data source metrics. Those would be useful to include for performance profiling.

Screenshot of a UI with this change (num files and metadata time are new metrics):

<img width="321" alt="screen shot 2017-03-29 at 12 29 28 am" src="https://cloud.githubusercontent.com/assets/323388/24443272/d4ea58c0-1416-11e7-8940-ecb69375554a.png">

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #17465 from rxin/SPARK-20136.
2017-03-29 19:06:51 -07:00
Takeshi Yamamuro c4008480b7 [SPARK-20009][SQL] Support DDL strings for defining schema in functions.from_json
## What changes were proposed in this pull request?
This PR added `StructType.fromDDL` to convert a DDL-format string into a `StructType` for defining schemas in `functions.from_json`.
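Illustrative usage, assuming a DataFrame `df` with a string column `json` (not taken from the patch itself):

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Define the schema with a DDL string instead of building StructFields by hand.
val schema = StructType.fromDDL("a INT, b STRING")
val parsed = df.select(from_json(col("json"), schema).as("parsed"))
```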

## How was this patch tested?
Added tests in `JsonFunctionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17406 from maropu/SPARK-20009.
2017-03-29 12:37:49 -07:00
Kunal Khamar 142f6d1492 [SPARK-20048][SQL] Cloning SessionState does not clone query execution listeners
## What changes were proposed in this pull request?

Bugfix from [SPARK-19540.](https://github.com/apache/spark/pull/16826)
Cloning SessionState does not clone query execution listeners, so cloned session is unable to listen to events on queries.

## How was this patch tested?

- Unit test

Author: Kunal Khamar <kkhamar@outlook.com>

Closes #17379 from kunalkhamar/clone-bugfix.
2017-03-29 12:35:19 -07:00
Reynold Xin 9712bd3954 [SPARK-20134][SQL] SQLMetrics.postDriverMetricUpdates to simplify driver side metric updates
## What changes were proposed in this pull request?
It is not super intuitive how to update SQLMetric on the driver side. This patch introduces a new SQLMetrics.postDriverMetricUpdates function to do that, and adds documentation to make it more obvious.

## How was this patch tested?
Updated a test case to use this method.

Author: Reynold Xin <rxin@databricks.com>

Closes #17464 from rxin/SPARK-20134.
2017-03-29 00:02:15 -07:00
Wenchen Fan d4fac410e0 [SPARK-20125][SQL] Dataset of type option of map does not work
## What changes were proposed in this pull request?

When we build the deserializer expression for map type, we will use `StaticInvoke` to call `ArrayBasedMapData.toScalaMap`, and declare the return type as `scala.collection.immutable.Map`. If the map is inside an Option, we will wrap this `StaticInvoke` with `WrapOption`, which requires the input to be `scala.collection.Map`. Ideally this should be fine, as `scala.collection.immutable.Map` extends `scala.collection.Map`, but our `ObjectType` is too strict about this; this PR fixes it.
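A minimal reproduction of the case this fixes (assuming a SparkSession `spark`; after this change it should work):

```scala
import spark.implicits._

// Dataset whose element type is an Option of a Map.
val ds = Seq(Some(Map("a" -> 1)), None).toDS()
ds.collect()
```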

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17454 from cloud-fan/map.
2017-03-28 11:47:43 -07:00
Herman van Hovell f82461fc11 [SPARK-20126][SQL] Remove HiveSessionState
## What changes were proposed in this pull request?
Commit ea361165e1 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of the `HiveSessionState` redundant. This PR removes the `HiveSessionState`.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17457 from hvanhovell/SPARK-20126.
2017-03-28 23:14:31 +08:00
Xiao Li a9abff281b [SPARK-20119][TEST-MAVEN] Fix the test case fail in DataSourceScanExecRedactionSuite
### What changes were proposed in this pull request?
Changed the pattern to match the first n characters in the location field so that the string truncation does not affect it.

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17448 from gatorsmile/fixTestCAse.
2017-03-28 09:37:28 +02:00
Herman van Hovell ea361165e1 [SPARK-20100][SQL] Refactor SessionState initialization
## What changes were proposed in this pull request?
The current SessionState initialization code path is quite complex. A part of the creation is done in the SessionState companion objects, a part is done inside the SessionState class, and a part is done by passing functions.

This PR refactors this code path, and consolidates SessionState initialization into a builder class. This SessionState will not do any initialization and just becomes a placeholder for the various Spark SQL internals. This also lays the groundwork for two future improvements:

1. This provides us with a start for removing the `HiveSessionState`. Removing the `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move metadata hive.
2. This makes it easier to customize the Spark Session. Currently you will need to create a custom version of the builder. I have added hooks to facilitate this. A future step will be to create a semi-stable API on top of this.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17433 from hvanhovell/SPARK-20100.
2017-03-28 10:07:24 +08:00
Tathagata Das 8a6f33f048 [SPARK-19876][SS] Follow up: Refactored BatchCommitLog to simplify logic
## What changes were proposed in this pull request?

The existing logic seemingly writes null to the BatchCommitLog, even though it does additional checks to write '{}' (valid JSON) to the log. This PR simplifies the logic by disallowing `log.add(batchId, metadata)` and instead using `log.add(batchId)`. With no metadata to specify, there is no confusion related to null.

## How was this patch tested?
Existing tests pass.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17444 from tdas/SPARK-19876-1.
2017-03-27 19:04:16 -07:00
Kazuaki Ishizaki 93bb0b911b [SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding sqlContext.read.parquet()
## What changes were proposed in this pull request?

This PR improves performance of operations with `sqlContext.read.parquet()` by changing Java code generated by Catalyst. This PR is inspired by [the blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html) and [this stackoverflow entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark).

This PR changes generated code in the following two points.
1. Replace a while-loop using long instance variables with a for-loop using int local variables
2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated).

These changes facilitate optimizations in a JIT compiler by feeding it the simplified Java code. The performance of `sqlContext.read.parquet().count` is improved by 1.09x.

Benchmark program:
```scala
val dir = "/dev/shm/parquet"
val N = 1000 * 1000 * 40
val iters = 20
val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds)
sparkSession.range(N).write.mode("overwrite").parquet(dir)

benchmark.addCase("count") { i: Int =>
  var n = 0
  var len = 0L
  while (n < iters) {
    len += sparkSession.read.parquet(dir).count
    n += 1
  }
}
benchmark.run
```

Performance result without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o this PR                                   1152 / 1211        694.7           1.4       1.0X
```

Performance result with this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
with this PR                                  1053 / 1121        760.0           1.3       1.0X
```

Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed.

Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       while (scan_batchIdx < numRows) {
/* 057 */         int scan_rowIdx = scan_batchIdx++;
/* 058 */         // do aggregate
/* 059 */         // common sub-expressions
/* 060 */
/* 061 */         // evaluate aggregate function
/* 062 */         boolean agg_isNull1 = false;
/* 063 */
/* 064 */         long agg_value1 = -1L;
/* 065 */         agg_value1 = agg_bufValue + 1L;
/* 066 */         // update aggregation buffer
/* 067 */         agg_bufIsNull = false;
/* 068 */         agg_bufValue = agg_value1;
/* 069 */         if (shouldStop()) return;
/* 070 */       }
/* 071 */       scan_batch = null;
/* 072 */       scan_nextBatch();
/* 073 */     }
/* 074 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 075 */     scan_scanTime1 = 0;
/* 076 */
/* 077 */   }
/* 078 */
/* 079 */   private void scan_nextBatch() throws java.io.IOException {
/* 080 */     long getBatchStart = System.nanoTime();
/* 081 */     if (scan_input.hasNext()) {
/* 082 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 083 */       scan_numOutputRows.add(scan_batch.numRows());
/* 084 */       scan_batchIdx = 0;
/* 085 */
/* 086 */     }
/* 087 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 088 */   }
/* 089 */
/* 090 */   protected void processNext() throws java.io.IOException {
/* 091 */     while (!agg_initAgg) {
/* 092 */       agg_initAgg = true;
/* 093 */       long agg_beforeAgg = System.nanoTime();
/* 094 */       agg_doAggregateWithoutKey();
/* 095 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 096 */
/* 097 */       // output the result
/* 098 */
/* 099 */       agg_numOutputRows.add(1);
/* 100 */       agg_rowWriter.zeroOutNullBytes();
/* 101 */
/* 102 */       if (agg_bufIsNull) {
/* 103 */         agg_rowWriter.setNullAt(0);
/* 104 */       } else {
/* 105 */         agg_rowWriter.write(0, agg_bufValue);
/* 106 */       }
/* 107 */       append(agg_result);
/* 108 */     }
/* 109 */   }
/* 110 */ }
```

Generated code with this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       int scan_localEnd = numRows - scan_batchIdx;
/* 057 */       for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; scan_localIdx++) {
/* 058 */         int scan_rowIdx = scan_batchIdx + scan_localIdx;
/* 059 */         // do aggregate
/* 060 */         // common sub-expressions
/* 061 */
/* 062 */         // evaluate aggregate function
/* 063 */         boolean agg_isNull1 = false;
/* 064 */
/* 065 */         long agg_value1 = -1L;
/* 066 */         agg_value1 = agg_bufValue + 1L;
/* 067 */         // update aggregation buffer
/* 068 */         agg_bufIsNull = false;
/* 069 */         agg_bufValue = agg_value1;
/* 070 */         // shouldStop check is eliminated
/* 071 */       }
/* 072 */       scan_batchIdx = numRows;
/* 073 */       scan_batch = null;
/* 074 */       scan_nextBatch();
/* 075 */     }
/* 079 */   }
/* 080 */
/* 081 */   private void scan_nextBatch() throws java.io.IOException {
/* 082 */     long getBatchStart = System.nanoTime();
/* 083 */     if (scan_input.hasNext()) {
/* 084 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 085 */       scan_numOutputRows.add(scan_batch.numRows());
/* 086 */       scan_batchIdx = 0;
/* 087 */
/* 088 */     }
/* 089 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 090 */   }
/* 091 */
/* 092 */   protected void processNext() throws java.io.IOException {
/* 093 */     while (!agg_initAgg) {
/* 094 */       agg_initAgg = true;
/* 095 */       long agg_beforeAgg = System.nanoTime();
/* 096 */       agg_doAggregateWithoutKey();
/* 097 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 098 */
/* 099 */       // output the result
/* 100 */
/* 101 */       agg_numOutputRows.add(1);
/* 102 */       agg_rowWriter.zeroOutNullBytes();
/* 103 */
/* 104 */       if (agg_bufIsNull) {
/* 105 */         agg_rowWriter.setNullAt(0);
/* 106 */       } else {
/* 107 */         agg_rowWriter.write(0, agg_bufValue);
/* 108 */       }
/* 109 */       append(agg_result);
/* 110 */     }
/* 111 */   }
/* 112 */ }
```

## How was this patch tested?

Tested existing test suites

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17378 from kiszk/SPARK-20046.
2017-03-26 09:20:22 +02:00
Wenchen Fan 0b903caef3 [SPARK-19949][SQL][FOLLOW-UP] move FailureSafeParser from catalyst to sql core
## What changes were proposed in this pull request?

The `FailureSafeParser` is only used in sql core, it doesn't make sense to put it in catalyst module.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17408 from cloud-fan/minor.
2017-03-25 11:46:54 -07:00
Xiao Li a2ce0a2e30 [HOTFIX][SQL] Fix the failed test cases in GeneratorFunctionSuite
### What changes were proposed in this pull request?
Multiple tests failed. Revert the changes on `supportCodegen` of `GenerateExec`. For example,

- https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75194/testReport/

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17425 from gatorsmile/turnOnCodeGenGenerateExec.
2017-03-24 23:27:42 -07:00
Roxanne Moslehi f88f56b835 [DOCS] Clarify round mode for format_number & round functions
## What changes were proposed in this pull request?

Updated the description for the `format_number` description to indicate that it uses `HALF_EVEN` rounding. Updated the description for the `round` description to indicate that it uses `HALF_UP` rounding.

## How was this patch tested?

Just changing the two function comments so no testing involved.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Roxanne Moslehi <rmoslehi@palantir.com>
Author: roxannemoslehi <rmoslehi@berkeley.edu>

Closes #17399 from roxannemoslehi/patch-1.
2017-03-25 00:10:30 +01:00
Reynold Xin b5c5bd98ea Disable generate codegen since it fails my workload. 2017-03-24 23:57:29 +01:00
Herman van Hovell 91fa80fe8a [SPARK-20070][SQL] Redact DataSourceScanExec treeString
## What changes were proposed in this pull request?
The explain output of `DataSourceScanExec` can contain sensitive information (like Amazon keys). Such information should not end up in logs, or be exposed to non-privileged users.

This PR addresses this by adding a redaction facility for the `DataSourceScanExec.treeString`. A user can enable this by setting a regex in the `spark.redaction.string.regex` configuration.
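Illustrative usage (the regex and path are examples only, not defaults shipped with this patch):

```scala
// Any substring of the scan's explain output matching the regex is redacted.
spark.conf.set("spark.redaction.string.regex", "(?i)secret|password|accesskey")
spark.read.parquet("s3a://bucket/path").explain()
```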

## How was this patch tested?
Added a unit test to check the output of DataSourceScanExec.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17397 from hvanhovell/SPARK-20070.
2017-03-24 15:52:48 -07:00
Eric Liang 8e558041aa [SPARK-19820][CORE] Add interface to kill tasks w/ a reason
This commit adds a killTaskAttempt method to SparkContext, to allow users to
kill tasks so that they can be re-scheduled elsewhere.

This also refactors the task kill path to allow specifying a reason for the task kill. The reason is propagated opaquely through events, and will show up in the UI automatically as `(N killed: $reason)` and `TaskKilled: $reason`. Without this change, there is no way to provide the user feedback through the UI.

Currently used reasons are "stage cancelled", "another attempt succeeded", and "killed via SparkContext.killTask". The user can also specify a custom reason through `SparkContext.killTask`.
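Illustrative call, assuming a SparkContext `sc` and a known task ID (both are placeholders):

```scala
// Kill a specific task attempt and record why; the reason is shown in the UI.
sc.killTaskAttempt(taskId = 12345L, interruptThread = false,
  reason = "killed manually while investigating a straggler")
```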

cc rxin

In the stage overview UI the reasons are summarized:
![1](https://cloud.githubusercontent.com/assets/14922/23929209/a83b2862-08e1-11e7-8b3e-ae1967bbe2e5.png)

Within the stage UI you can see individual task kill reasons:
![2](https://cloud.githubusercontent.com/assets/14922/23929200/9a798692-08e1-11e7-8697-72b27ad8a287.png)

Existing tests, tried killing some stages in the UI and verified the messages are as expected.

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekl@google.com>

Closes #17166 from ericl/kill-reason.
2017-03-23 23:30:44 -07:00
Kazuaki Ishizaki bb823ca4b4 [SPARK-19959][SQL] Fix to throw NullPointerException in df[java.lang.Long].collect
## What changes were proposed in this pull request?

This PR fixes a `NullPointerException` in the code generated by Catalyst. When we run the following code, we get the `NullPointerException` below. This is because there is no null check for `inputadapter_value`, while `java.lang.Long inputadapter_value` at Line 30 may be `null`.

This happens when the DataFrame's type is a nullable primitive wrapper type such as `java.lang.Long` and whole-stage codegen is used. While the physical plan keeps `nullable=true` in `input[0, java.lang.Long, true].longValue`, `BoundReference.doGenCode` ignores `nullable=true`. Thus, null-check code will not be generated and a `NullPointerException` will occur.

This PR checks the nullability and correctly generates nullcheck if needed.
```scala
sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF.collect
```

```java
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:37)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:393)
...
```

Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */
/* 013 */   public GeneratedIterator(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input = inputs[0];
/* 021 */     serializefromobject_result = new UnsafeRow(1);
/* 022 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 023 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       java.lang.Long inputadapter_value = (java.lang.Long)inputadapter_row.get(0, null);
/* 031 */
/* 032 */       boolean serializefromobject_isNull = true;
/* 033 */       long serializefromobject_value = -1L;
/* 034 */       if (!false) {
/* 035 */         serializefromobject_isNull = false;
/* 036 */         if (!serializefromobject_isNull) {
/* 037 */           serializefromobject_value = inputadapter_value.longValue();
/* 038 */         }
/* 039 */
/* 040 */       }
/* 041 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 042 */
/* 043 */       if (serializefromobject_isNull) {
/* 044 */         serializefromobject_rowWriter.setNullAt(0);
/* 045 */       } else {
/* 046 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 047 */       }
/* 048 */       append(serializefromobject_result);
/* 049 */       if (shouldStop()) return;
/* 050 */     }
/* 051 */   }
/* 052 */ }
```

Generated code with this PR

```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */
/* 013 */   public GeneratedIterator(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input = inputs[0];
/* 021 */     serializefromobject_result = new UnsafeRow(1);
/* 022 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 023 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 031 */       java.lang.Long inputadapter_value = inputadapter_isNull ? null : ((java.lang.Long)inputadapter_row.get(0, null));
/* 032 */
/* 033 */       boolean serializefromobject_isNull = true;
/* 034 */       long serializefromobject_value = -1L;
/* 035 */       if (!inputadapter_isNull) {
/* 036 */         serializefromobject_isNull = false;
/* 037 */         if (!serializefromobject_isNull) {
/* 038 */           serializefromobject_value = inputadapter_value.longValue();
/* 039 */         }
/* 040 */
/* 041 */       }
/* 042 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 043 */
/* 044 */       if (serializefromobject_isNull) {
/* 045 */         serializefromobject_rowWriter.setNullAt(0);
/* 046 */       } else {
/* 047 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 048 */       }
/* 049 */       append(serializefromobject_result);
/* 050 */       if (shouldStop()) return;
/* 051 */     }
/* 052 */   }
/* 053 */ }
```

## How was this patch tested?

Added new test suites in `DataFrameSuites`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17302 from kiszk/SPARK-19959.
2017-03-24 12:57:56 +08:00
Burak Yavuz 93581fbc18 Fix compilation of the Scala 2.10 master branch
## What changes were proposed in this pull request?

Fixes break caused by: 746a558de2

## How was this patch tested?

Compiled with `build/sbt -Dscala2.10 sql/compile` locally

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17403 from brkyvz/onceTrigger2.10.
2017-03-23 17:57:31 -07:00
sureshthalamati c791180705 [SPARK-10849][SQL] Adds option to the JDBC data source write for user to specify database column type for the create table
## What changes were proposed in this pull request?
Currently the JDBC data source creates tables in the target database using the default type mapping and the JDBC dialect mechanism. If users want to specify a different database data type for only some of the columns, there is no option available. In scenarios where the default mapping does not work, users are forced to create tables in the target database before writing. This workaround is probably not acceptable from a usability point of view. This PR provides a user-defined type mapping for specific columns.

The solution is to allow users to specify the database column data type for the created table as a JDBC datasource option (`createTableColumnTypes`) on write. Data type information can be specified in the same format as the table schema DDL (e.g. `name CHAR(64), comments VARCHAR(1024)`).

Not every type supported by the target database can be specified; the data types also have to be valid Spark SQL data types. For example, a user cannot specify the target database's CLOB data type. This will be supported in a follow-up PR.

Example:
```Scala
df.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc(url, "TEST.DBCOLTYPETEST", properties)
```
## How was this patch tested?
Added new test cases to the JDBCWriteSuite

Author: sureshthalamati <suresh.thalamati@gmail.com>

Closes #16209 from sureshthalamati/jdbc_custom_dbtype_option_json-spark-10849.
2017-03-23 17:39:33 -07:00
Tyson Condie 746a558de2 [SPARK-19876][SS][WIP] OneTime Trigger Executor
## What changes were proposed in this pull request?

An additional trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers.

In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log will be used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself to determine what batch to process next after restart; using the offset log to determine this would process the previously logged batch, always, thus not permitting a OneTime trigger feature.
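A hedged sketch of how the one-time trigger is intended to be used, assuming the public `Trigger.Once()` factory, an existing streaming Dataset `inputStream`, and illustrative paths:

```scala
import org.apache.spark.sql.streaming.Trigger

// Process exactly one batch of available data, then stop.
val query = inputStream.writeStream
  .format("parquet")
  .option("path", "/tmp/out")
  .option("checkpointLocation", "/tmp/checkpoint")
  .trigger(Trigger.Once())
  .start()
query.awaitTermination()
```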

## How was this patch tested?

A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that will tell us if that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly.

In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case when the commit log was not able to successfully log the completion of a batch before restart, which would mean that we should fall back to what's in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

marmbrus tdas zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17219 from tcondie/stream-commit.
2017-03-23 14:32:05 -07:00
hyukjinkwon aefe798905 [MINOR][BUILD] Fix javadoc8 break
## What changes were proposed in this pull request?

Several javadoc8 breaks have been introduced. This PR proposes to fix those instances so that we can build Scala/Java API docs.

```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:6: error: reference not found
[error]  * <code>flatMapGroupsWithState</code> operations on {link KeyValueGroupedDataset}.
[error]                                                             ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:10: error: reference not found
[error]  * Both, <code>mapGroupsWithState</code> and <code>flatMapGroupsWithState</code> in {link KeyValueGroupedDataset}
[error]                                                                                            ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:51: error: reference not found
[error]  *    {link GroupStateTimeout.ProcessingTimeTimeout}) or event time (i.e.
[error]              ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:52: error: reference not found
[error]  *    {link GroupStateTimeout.EventTimeTimeout}).
[error]              ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:158: error: reference not found
[error]  *           Spark SQL types (see {link Encoder} for more details).
[error]                                          ^
[error] .../spark/mllib/target/java/org/apache/spark/ml/fpm/FPGrowthParams.java:26: error: bad use of '>'
[error]    * Number of partitions (>=1) used by parallel FP-growth. By default the param is not set, and
[error]                            ^
[error] .../spark/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java:30: error: reference not found
[error]  * {link org.apache.spark.sql.KeyValueGroupedDataset#flatMapGroupsWithState(
[error]           ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyValueGroupedDataset.java:211: error: reference not found
[error]    * See {link GroupState} for more details.
[error]                 ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyValueGroupedDataset.java:232: error: reference not found
[error]    * See {link GroupState} for more details.
[error]                 ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyValueGroupedDataset.java:254: error: reference not found
[error]    * See {link GroupState} for more details.
[error]                 ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyValueGroupedDataset.java:277: error: reference not found
[error]    * See {link GroupState} for more details.
[error]                 ^
[error] .../spark/core/target/java/org/apache/spark/TaskContextImpl.java:10: error: reference not found
[error]  * {link TaskMetrics} &amp; {link MetricsSystem} objects are not thread safe.
[error]           ^
[error] .../spark/core/target/java/org/apache/spark/TaskContextImpl.java:10: error: reference not found
[error]  * {link TaskMetrics} &amp; {link MetricsSystem} objects are not thread safe.
[error]                                     ^
[info] 13 errors
```

```
jekyll 3.3.1 | Error:  Unidoc generation failed
```

## How was this patch tested?

Manually via `jekyll build`

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17389 from HyukjinKwon/minor-javadoc8-fix.
2017-03-23 08:41:30 +00:00
hyukjinkwon 07c12c09a7 [SPARK-18579][SQL] Use ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options in CSV writing
## What changes were proposed in this pull request?

This PR proposes to support _not_ trimming the white spaces when writing out. These options are `false` by default in the CSV reading path, but `true` by default for CSV writing in the univocity parser.

Neither the `ignoreLeadingWhiteSpace` nor the `ignoreTrailingWhiteSpace` option is used for writing, and therefore we always trim the white spaces.

It seems we should provide an easy way to keep these white spaces.

With the data below:

```scala
val df = spark.read.csv(Seq("a , b  , c").toDS)
df.show()
```

```
+---+----+---+
|_c0| _c1|_c2|
+---+----+---+
| a | b  |  c|
+---+----+---+
```

**Before**

```scala
df.write.csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+-----+
|value|
+-----+
|a,b,c|
+-----+
```

It seems this can't be worked around via `quoteAll` too.

```scala
df.write.option("quoteAll", true).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```
```
+-----------+
|      value|
+-----------+
|"a","b","c"|
+-----------+
```

**After**

```scala
df.write.option("ignoreLeadingWhiteSpace", false).option("ignoreTrailingWhiteSpace", false).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+----------+
|     value|
+----------+
|a , b  , c|
+----------+
```

Note that this case is possible in R

```r
> system("cat text.csv")
f1,f2,f3
a , b  , c
> df <- read.csv(file="text.csv")
> df
  f1   f2 f3
1 a   b    c
> write.csv(df, file="text1.csv", quote=F, row.names=F)
> system("cat text1.csv")
f1,f2,f3
a , b  , c
```

## How was this patch tested?

Unit tests in `CSVSuite` and manual tests for Python.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17310 from HyukjinKwon/SPARK-18579.
2017-03-23 00:25:01 -07:00
Sameer Agarwal 12cd00706c [BUILD][MINOR] Fix 2.10 build
## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/17385 breaks the 2.10 sbt/maven builds by hitting an empty-string interpolation bug (https://issues.scala-lang.org/browse/SI-7919).

https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-sbt-scala-2.10/4072/
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-scala-2.10/3987/

## How was this patch tested?

Compiles

Author: Sameer Agarwal <sameerag@cs.berkeley.edu>

Closes #17391 from sameeragarwal/build-fix.
2017-03-22 15:58:42 -07:00
Tathagata Das 82b598b963 [SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithState
## What changes were proposed in this pull request?

Since the state is tied to a "group" in the "mapGroupsWithState" operations, it is better to call the state "GroupState" instead of tying it to a key. This would make it more general when extending this operation to RelationalGroupedDataset and the Python APIs.

## How was this patch tested?
Existing unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17385 from tdas/SPARK-20057.
2017-03-22 12:30:36 -07:00
hyukjinkwon 80fd070389 [SPARK-20018][SQL] Pivot with timestamp and count should not print internal representation
## What changes were proposed in this pull request?

Currently, when we pivot on a timestamp column and count, the internal representation is printed as the column name, as below:

```scala
Seq(new java.sql.Timestamp(1)).toDF("a").groupBy("a").pivot("a").count().show()
```

```
+--------------------+----+
|                   a|1000|
+--------------------+----+
|1969-12-31 16:00:...|   1|
+--------------------+----+
```

This PR proposes to use external Scala value instead of the internal representation in the column names as below:

```
+--------------------+-----------------------+
|                   a|1969-12-31 16:00:00.001|
+--------------------+-----------------------+
|1969-12-31 16:00:...|                      1|
+--------------------+-----------------------+
```

## How was this patch tested?

Unit test in `DataFramePivotSuite` and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17348 from HyukjinKwon/SPARK-20018.
2017-03-22 09:58:46 -07:00
hyukjinkwon 465818389a [SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related comments
## What changes were proposed in this pull request?

This PR proposes to make the `mode` options in both CSV and JSON use case objects, and fixes some comments related to the previous fix.

Also, this PR modifies some tests related to parse modes.

## How was this patch tested?

Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17377 from HyukjinKwon/SPARK-19949.
2017-03-22 09:52:37 -07:00
Prashant Sharma 0caade6340 [SPARK-20027][DOCS] Compilation fix in java docs.
## What changes were proposed in this pull request?

During build/sbt publish-local, build breaks due to javadocs errors. This patch fixes those errors.

## How was this patch tested?

Tested by running the sbt build.

Author: Prashant Sharma <prashsh1@in.ibm.com>

Closes #17358 from ScrapCodes/docs-fix.
2017-03-22 13:52:03 +00:00
Xiao Li 7343a09401 [SPARK-20023][SQL] Output table comment for DESC FORMATTED
### What changes were proposed in this pull request?
Currently, `DESC FORMATTED` does not output the table comment, unlike `DESC EXTENDED`. This PR fixes it (see the example after the list below).

Also correct the following displayed names in `DESC FORMATTED`, for being consistent with `DESC EXTENDED`
- `"Create Time:"` -> `"Created:"`
- `"Last Access Time:"` -> `"Last Access:"`

### How was this patch tested?
Added test cases in `describe.sql`

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17381 from gatorsmile/descFormattedTableComment.
2017-03-22 19:08:28 +08:00
Tathagata Das c1e87e384d [SPARK-20030][SS] Event-time-based timeout for MapGroupsWithState
## What changes were proposed in this pull request?

Adds an event-time-based timeout. The user sets the timeout timestamp directly using `KeyedState.setTimeoutTimestamp`. A key times out when the watermark crosses its timeout timestamp.
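A hedged sketch of the feature (the state class was later renamed to GroupState, which is used below; the event and state case classes, column names, 30-second timeout, and the streaming Dataset `eventStream` are illustrative assumptions, together with a SparkSession `spark`):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._

case class Event(key: String, eventTime: Timestamp)
case class SessionInfo(numEvents: Long)

def update(key: String, events: Iterator[Event],
    state: GroupState[SessionInfo]): String = {
  if (state.hasTimedOut) {
    // The watermark has crossed the timeout timestamp set for this key.
    state.remove()
    s"$key: session closed"
  } else {
    val evs = events.toSeq
    val old = state.getOption.map(_.numEvents).getOrElse(0L)
    state.update(SessionInfo(old + evs.size))
    // Time the key out once the watermark passes 30 seconds after the latest event.
    state.setTimeoutTimestamp(evs.map(_.eventTime.getTime).max + 30 * 1000)
    s"$key: ${evs.size} new events"
  }
}

// eventStream: Dataset[Event] from a streaming source (assumed); a watermark is
// required for event-time timeouts.
val sessionUpdates = eventStream
  .withWatermark("eventTime", "10 minutes")
  .groupByKey(_.key)
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(update)
```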

## How was this patch tested?
Unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17361 from tdas/SPARK-20030.
2017-03-21 21:27:08 -07:00
Kunal Khamar 2d73fcced0 [SPARK-20051][SS] Fix StreamSuite flaky test - recover from v2.1 checkpoint
## What changes were proposed in this pull request?

There is a race condition between calling stop on a streaming query and deleting directories in `withTempDir` that causes the test to fail; the fix is to delete the directories lazily using a delete-on-shutdown JVM hook.

## How was this patch tested?

- Unit test
  - repeated 300 runs with no failure

Author: Kunal Khamar <kkhamar@outlook.com>

Closes #17382 from kunalkhamar/partition-bugfix.
2017-03-21 18:56:14 -07:00
hyukjinkwon 9281a3d504 [SPARK-19919][SQL] Defer throwing the exception for empty paths in CSV datasource into DataSource
## What changes were proposed in this pull request?

This PR proposes to defer throwing the exception within `DataSource`.

Currently, if other datasources fail to infer the schema, they return `None`, and this is then validated in `DataSource` as below:

```
scala> spark.read.json("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It must be specified manually.;
```

```
scala> spark.read.orc("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.;
```

```
scala> spark.read.parquet("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
```

However, CSV checks this within the datasource implementation and throws a different exception message, as below:

```
scala> spark.read.csv("emptydir")
java.lang.IllegalArgumentException: requirement failed: Cannot infer schema from an empty set of files
```

We could remove this duplicated check and validate this in one place in the same way with the same message.

## How was this patch tested?

Unit test in `CSVSuite` and manual test.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17256 from HyukjinKwon/SPARK-19919.
2017-03-22 08:41:46 +08:00
Will Manning a04dcde8cb clarify array_contains function description
## What changes were proposed in this pull request?

The description in the comment for array_contains is vague/incomplete (i.e., doesn't mention that it returns `null` if the array is `null`); this PR fixes that.

## How was this patch tested?

No testing, since it merely changes a comment.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Will Manning <lwwmanning@gmail.com>

Closes #17380 from lwwmanning/patch-1.
2017-03-22 00:40:48 +01:00
Xin Wu 4c0ff5f585 [SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables
## What changes were proposed in this pull request?
Support `ALTER TABLE ADD COLUMNS (...)` syntax for Hive serde and some datasource tables (see the example after the list below).
In this PR, we consider a few aspects:

1. View is not supported for `ALTER ADD COLUMNS`

2. Since tables created in Spark SQL with Hive DDL syntax populate table properties with schema information, we need to make sure the schema is consistent before and after the ALTER operation for future use.

3. For embedded-schema formats, such as `parquet`, we need to make sure that predicates on the newly-added columns can be evaluated or pushed down properly. In case a data file does not contain the newly-added columns, such predicates should behave as if the column values are NULL.

4. For datasource table, this feature does not support the following:
4.1 TEXT format, since only one default column `value` is inferred for text format data.
4.2 ORC format, since the SparkSQL native ORC reader does not support differences between the user-specified schema and the schema inferred from ORC files.
4.3 Third party datasource types that implement RelationProvider, including the built-in JDBC format, since different implementations by the vendors may have different ways of dealing with schema.
4.4 Other datasource types, such as `parquet`, `json`, `csv`, `hive` are supported.

5. Column names being added cannot duplicate any existing data column or partition column names. Case sensitivity is taken into consideration according to the SQL configuration.

6. This feature also supports In-Memory catalog, while Hive support is turned off.
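Basic usage of the new syntax (table and column names are illustrative):

```scala
spark.sql("CREATE TABLE t (a INT) USING parquet")
spark.sql("ALTER TABLE t ADD COLUMNS (b STRING, c DOUBLE)")
// Rows written before the ALTER return NULL for the new columns.
spark.sql("SELECT a, b, c FROM t").printSchema()
```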
## How was this patch tested?
Add new test cases

Author: Xin Wu <xinwu@us.ibm.com>

Closes #16626 from xwu0226/alter_add_columns.
2017-03-21 08:49:54 -07:00
Wenchen Fan 68d65fae71 [SPARK-19949][SQL] unify bad record handling in CSV and JSON
## What changes were proposed in this pull request?

Currently JSON and CSV have exactly the same logic for handling bad records; this PR abstracts it and puts it at an upper level to reduce code duplication.

The overall idea is, we make the JSON and CSV parser to throw a BadRecordException, then the upper level, FailureSafeParser, handles bad records according to the parse mode.

Behavior changes:
1. with PERMISSIVE mode, if the number of tokens doesn't match the schema, previously CSV parser will treat it as a legal record and parse as many tokens as possible. After this PR, we treat it as an illegal record, and put the raw record string in a special column, but we still parse as many tokens as possible.
2. all logging is removed as they are not very useful in practice.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Wenchen Fan <cloud0fan@gmail.com>

Closes #17315 from cloud-fan/bad-record2.
2017-03-20 21:43:14 -07:00
Takeshi Yamamuro 0ec1db5475 [SPARK-19980][SQL] Add NULL checks in Bean serializer
## What changes were proposed in this pull request?
A Bean serializer in `ExpressionEncoder` could change values when a Bean has NULL fields. A concrete example is as follows;
```
scala> :paste
class Outer extends Serializable {
  private var cls: Inner = _
  def setCls(c: Inner): Unit = cls = c
  def getCls(): Inner = cls
}

class Inner extends Serializable {
  private var str: String = _
  def setStr(s: String): Unit = str = s
  def getStr(): String = str
}

scala> Seq("""{"cls":null}""", """{"cls": {"str":null}}""").toDF().write.text("data")
scala> val encoder = Encoders.bean(classOf[Outer])
scala> val schema = encoder.schema
scala> val df = spark.read.schema(schema).json("data").as[Outer](encoder)
scala> df.show
+------+
|   cls|
+------+
|[null]|
|  null|
+------+

scala> df.map(x => x)(encoder).show()
+------+
|   cls|
+------+
|[null]|
|[null]|     // <-- Value changed
+------+
```

This is because the Bean serializer does not have the NULL-check expressions that the serializer of Scala's product types has. Actually, this value change does not happen in Scala's product types;

```
scala> :paste
case class Outer(cls: Inner)
case class Inner(str: String)

scala> val encoder = Encoders.product[Outer]
scala> val schema = encoder.schema
scala> val df = spark.read.schema(schema).json("data").as[Outer](encoder)
scala> df.show
+------+
|   cls|
+------+
|[null]|
|  null|
+------+

scala> df.map(x => x)(encoder).show()
+------+
|   cls|
+------+
|[null]|
|  null|
+------+
```

This PR added the NULL-check expressions to the Bean serializer, in line with the serializer for Scala's product types.

## How was this patch tested?
Added tests in `JavaDatasetSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17347 from maropu/SPARK-19980.
2017-03-21 11:17:34 +08:00
wangzhenhua e9c91badce [SPARK-20010][SQL] Sort information is lost after sort merge join
## What changes were proposed in this pull request?

After a sort merge join for an inner join, we currently only keep the left key ordering. However, after an inner join the right key has the same values and order as the left key. So if we need another smj on the right key, we unnecessarily add a sort, which incurs additional cost.

As a more complicated example, consider A join B on A.key = B.key join C on B.key = C.key join D on A.key = D.key. We unnecessarily add a sort on B.key when joining {A, B} with C, and add a sort on A.key when joining {A, B, C} with D, as the sketch below illustrates.
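A minimal sketch of that join chain (the DataFrames `a`, `b`, `c`, `d` with a `key` column are hypothetical, and sort merge join is assumed to be selected):

```scala
// Before this change, extra Sort operators on b("key") and a("key") could appear in the
// physical plan even though the earlier sort merge joins already emit data in that order.
val joined = a.join(b, a("key") === b("key"))
  .join(c, b("key") === c("key"))
  .join(d, a("key") === d("key"))
joined.explain()  // inspect the plan for redundant Sort nodes
```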

To fix this, we need to propagate all sorted information (equivalent expressions) from bottom up through `outputOrdering` and `SortOrder`.

## How was this patch tested?

Test cases are added.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17339 from wzhfy/sortEnhance.
2017-03-21 10:43:17 +08:00
Zheng RuiFeng 10691d36de [SPARK-19573][SQL] Make NaN/null handling consistent in approxQuantile
## What changes were proposed in this pull request?
Update `StatFunctions.multipleApproxQuantiles` to handle NaN/null.
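A minimal usage sketch (the data is hypothetical; the exact NaN/null semantics are those defined by this PR, not asserted here):

```scala
// approxQuantile on a column containing both null and NaN values.
import spark.implicits._

val df = Seq(Some(1.0), Some(2.0), Some(Double.NaN), None, Some(4.0)).toDF("x")
// 25%/50%/75% quantiles with 1% relative error; null/NaN entries should no longer
// corrupt the quantile summaries after this change.
val quantiles = df.stat.approxQuantile("x", Array(0.25, 0.5, 0.75), 0.01)
```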

## How was this patch tested?
existing tests and added tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #16971 from zhengruifeng/quantiles_nan.
2017-03-20 18:25:59 -07:00
windpiger 7ce30e00b2 [SPARK-19990][SQL][TEST-MAVEN] create a temp file for file in test.jar's resource when run mvn test accross different modules
## What changes were proposed in this pull request?

After we have merged the `HiveDDLSuite` and `DDLSuite` in [SPARK-19235](https://issues.apache.org/jira/browse/SPARK-19235), we have two subclasses of `DDLSuite`, that is `HiveCatalogedDDLSuite` and `InMemoryCatalogDDLSuite`.

While `DDLSuite` is in the `sql/core` module and `HiveCatalogedDDLSuite` is in the `sql/hive` module, running `HiveCatalogedDDLSuite` with mvn test also runs the tests in its parent class `DDLSuite`; this causes some test cases to fail because they get and use test file paths from the `sql/core` module's resources.

This is because the test file path obtained starts with 'jar:', like "jar:file:/home/jenkins/workspace/spark-master-test-maven-hadoop-2.6/sql/core/target/spark-sql_2.11-2.2.0-SNAPSHOT-tests.jar!/test-data/cars.csv", which fails when passed to new Path() in datasource.scala.

This PR fixes this by copying the file from the resource to a temp dir.
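A minimal sketch of the workaround (the resource name and helper below are hypothetical, not the exact code added by the PR):

```scala
// Copy a classpath resource to a plain temp file so tests see a normal filesystem path
// instead of a "jar:file:...!/..." URL that new Path() cannot handle.
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

def copyResourceToTemp(resource: String): File = {
  val in = Thread.currentThread().getContextClassLoader.getResourceAsStream(resource)
  val tempFile = File.createTempFile("test-resource-", "-" + new File(resource).getName)
  tempFile.deleteOnExit()
  Files.copy(in, tempFile.toPath, StandardCopyOption.REPLACE_EXISTING)
  in.close()
  tempFile
}

val carsPath = copyResourceToTemp("test-data/cars.csv").getAbsolutePath
```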

## How was this patch tested?
N/A

Author: windpiger <songjun@outlook.com>

Closes #17338 from windpiger/fixtestfailemvn.
2017-03-20 21:36:00 +08:00
wangzhenhua 965a5abcff [SPARK-19994][SQL] Wrong outputOrdering for right/full outer smj
## What changes were proposed in this pull request?

For right outer join, values of the left key will be filled with nulls if it can't match the value of the right key, so `nullOrdering` of the left key can't be guaranteed. We should output right key order instead of left key order.

For full outer join, neither left key nor right key guarantees `nullOrdering`. We should not output any ordering.
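A minimal sketch of why the left key cannot provide the output ordering here (the data is hypothetical):

```scala
// Unmatched right-side rows get NULL for the left key, so only the right key's order is
// reliable for a right outer join, and neither key's order for a full outer join.
import spark.implicits._

val left = Seq((1, "a"), (2, "b")).toDF("k", "lv")
val right = Seq((2, "x"), (3, "y")).toDF("k", "rv")
val joined = left.join(right, left("k") === right("k"), "right_outer")
joined.show()  // the row coming only from right key 3 has NULL in left("k")
```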

In tests, besides adding three test cases for left/right/full outer sort merge join, this patch also reorganizes code in `PlannerSuite` by putting together tests for `Sort`, and also extracts common logic in Sort tests into a method.

## How was this patch tested?

Corresponding test cases are added.

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #17331 from wzhfy/wrongOrdering.
2017-03-20 14:37:23 +08:00
hyukjinkwon 0cdcf91145 [SPARK-19849][SQL] Support ArrayType in to_json to produce JSON array
## What changes were proposed in this pull request?

This PR proposes to support an array of struct type in `to_json` as below:

```scala
import org.apache.spark.sql.functions._

val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
df.select(to_json($"a").as("json")).show()
```

```
+----------+
|      json|
+----------+
|[{"_1":1}]|
+----------+
```

Currently, it throws an exception as below (a newline manually inserted for readability):

```
org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' due to data type
mismatch: structtojson requires that the expression is a struct expression.;;
```

This allows the roundtrip with `from_json` as below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json").select(from_json($"json", schema).as("array"))
df.show()

// Read back.
df.select(to_json($"array").as("json")).show()
```

```
+----------+
|     array|
+----------+
|[[1], [2]]|
+----------+

+-----------------+
|             json|
+-----------------+
|[{"a":1},{"a":2}]|
+-----------------+
```

Also, this PR proposes to rename `StructToJson` to `StructsToJson` and `JsonToStruct` to `JsonToStructs`.

## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, doctest for Python and test in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17192 from HyukjinKwon/SPARK-19849.
2017-03-19 22:33:01 -07:00
Tathagata Das 990af630d0 [SPARK-19067][SS] Processing-time-based timeout in MapGroupsWithState
## What changes were proposed in this pull request?

When a key does not get any new data in `mapGroupsWithState`, the mapping function is never called on it. So we need a timeout feature that calls the function again in such cases, so that the user can decide whether to continue waiting or clean up (remove state, save stuff externally, etc.).
Timeouts can be either based on processing time or event time. This JIRA is for processing time, but defines the high level API design for both. The usage would look like this.
```
def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = {
  ...
  state.setTimeoutDuration(10000)
  ...
}

dataset					// type is Dataset[T]
  .groupByKey[K](keyingFunc)   // generates KeyValueGroupedDataset[K, T]
  .mapGroupsWithState[S, U](
     func = stateFunction,
     timeout = KeyedStateTimeout.withProcessingTime)	// returns Dataset[U]
```

Note the following design aspects.

- The timeout type is provided as a parameter in mapGroupsWithState, global to all the keys. This is so that the planner knows this at planning time and can accordingly optimize the execution based on whether to save extra info in state or not (e.g. timeout durations or timestamps).

- The exact timeout duration is provided inside the function call so that it can be customized on a per key basis.

- When the timeout occurs for a key, the function is called with no values, and KeyedState.isTimingOut() set to true.

- The timeout is reset for a key every time the function is called on that key, that is, when the key has new data or the key has timed out. So the user has to set the timeout duration every time the function is called, otherwise no timeout will be set.

Guarantees provided on timeout of key, when timeout duration is D ms:
- Timeout will never be called before real clock time has advanced by D ms
- Timeout will be called eventually when there is a trigger with any data in it (i.e. after D ms). So there is no strict upper bound on when the timeout would occur. For example, if there is no data in the stream (for any key) for a while, then the timeout will not be hit.

Implementation details:
- Added new param to `mapGroupsWithState` for timeout
- Added new method to `StateStore` to filter data based on timeout timestamp
- Changed the internal map type of `HDFSBackedStateStore` from Java's `HashMap` to `ConcurrentHashMap` as the latter allows weakly-consistent fail-safe iterators on the map data. See comments in code for more details.
- Refactored logic of `MapGroupsWithStateExec` to
  - Save timeout info to state store for each key that has data.
  - Then, filter states that should be timed out based on the current batch processing timestamp.
- Moved KeyedState for `o.a.s.sql` to `o.a.s.sql.streaming`. I remember that this was a feedback in the MapGroupsWithState PR that I had forgotten to address.

## How was this patch tested?
New unit tests in
- MapGroupsWithStateSuite for timeouts.
- StateStoreSuite for new APIs in StateStore.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17179 from tdas/mapgroupwithstate-timeout.
2017-03-19 14:07:49 -07:00
Takeshi Yamamuro ccba622e35 [SPARK-19896][SQL] Throw an exception if case classes have circular references in toDS
## What changes were proposed in this pull request?
If case classes have circular references, as below, toDS throws a StackOverflowError;
```
scala> :paste
case class classA(i: Int, cls: classB)
case class classB(cls: classA)

scala> Seq(classA(0, null)).toDS()
java.lang.StackOverflowError
  at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1494)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(JavaMirrors.scala:66)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.gilSynchronizedIfNotThreadsafe(JavaMirrors.scala:66)
  at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.info(JavaMirrors.scala:66)
  at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
  at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
  at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
  at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
  at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
```
This PR added code to throw UnsupportedOperationException in that case, as follows;
```
scala> :paste
case class A(cls: B)
case class B(cls: A)

scala> Seq(A(null)).toDS()
java.lang.UnsupportedOperationException: cannot have circular references in class, but got the circular reference of class B
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:627)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:644)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:632)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
```

## How was this patch tested?
Added tests in `DatasetSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17318 from maropu/SPARK-19896.
2017-03-18 14:40:16 +08:00
Jacek Laskowski 6326d406b9 [SQL][MINOR] Fix scaladoc for UDFRegistration
## What changes were proposed in this pull request?

Fix scaladoc for UDFRegistration

## How was this patch tested?

local build

Author: Jacek Laskowski <jacek@japila.pl>

Closes #17337 from jaceklaskowski/udfregistration-scaladoc.
2017-03-17 21:55:10 -07:00
Kunal Khamar 3783539d7a [SPARK-19873][SS] Record num shuffle partitions in offset log and enforce in next batch.
## What changes were proposed in this pull request?

If the user changes the shuffle partition number between batches, Streaming aggregation will fail.

Here are some possible cases:

- Change "spark.sql.shuffle.partitions"
- Use "repartition" and change the partition number in codes
- RangePartitioner doesn't generate deterministic partitions. Right now it's safe as we disallow sort before aggregation. Not sure if we will add some operators using RangePartitioner in future.

## How was this patch tested?

- Unit tests
- Manual tests
  - forward compatibility tested by using the new `OffsetSeqMetadata` json with Spark v2.1.0

Author: Kunal Khamar <kkhamar@outlook.com>

Closes #17216 from kunalkhamar/num-partitions.
2017-03-17 16:16:22 -07:00
Takeshi Yamamuro 7de66bae58 [SPARK-19967][SQL] Add from_json in FunctionRegistry
## What changes were proposed in this pull request?
This PR added entries in `FunctionRegistry` so that `from_json` is supported in SQL.
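A minimal sketch of calling `from_json` from SQL (the schema-string syntax shown here is an assumption and may differ between Spark versions):

```scala
// Now that from_json is registered in FunctionRegistry, it can be used directly in SQL.
spark.sql("""SELECT from_json('{"a": 1}', 'a INT') AS parsed""").show()
```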

## How was this patch tested?
Added tests in `JsonFunctionsSuite` and `SQLQueryTestSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17320 from maropu/SPARK-19967.
2017-03-17 14:51:59 -07:00
Andrew Ray 13538cf3dd [SPARK-19882][SQL] Pivot with null as a distinct pivot value throws NPE
## What changes were proposed in this pull request?

Allows null values of the pivot column to be included in the pivot values list without throwing NPE
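A minimal sketch of the fixed behaviour (the data is hypothetical):

```scala
// A null pivot value appears among the distinct values of the pivot column;
// this previously triggered a NullPointerException.
import spark.implicits._

val df = Seq((1, Some("a")), (2, Some("b")), (3, None)).toDF("id", "cat")
df.groupBy("id").pivot("cat").count().show()                       // null inferred as a pivot value
df.groupBy("id").pivot("cat", Seq("a", "b", null)).count().show()  // or listed explicitly
```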

Note this PR was made as an alternative to #17224 but preserves the two phase aggregate operation that is needed for good performance.

## How was this patch tested?

Additional unit test

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #17226 from aray/pivot-null.
2017-03-17 16:43:42 +08:00
Reynold Xin 8537c00e0a [SPARK-19987][SQL] Pass all filters into FileIndex
## What changes were proposed in this pull request?
This is a teeny-tiny refactoring to also pass data filters to the FileIndex, so FileIndex can have a more global view of predicates.

## How was this patch tested?
Change should be covered by existing test cases.

Author: Reynold Xin <rxin@databricks.com>

Closes #17322 from rxin/SPARK-19987.
2017-03-16 18:31:57 -07:00
Liwei Lin 2ea214dd05 [SPARK-19721][SS] Good error message for version mismatch in log files
## Problem

There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message.

## What changes were proposed in this pull request?

This patch made two major changes:
1. added a `parseVersion(...)` method (a sketch of such a method appears after this list) and, based on it, fixed how the following places did version checking (no other place needed to do this checking):
```
HDFSMetadataLog
  - CompactibleFileStreamLog  ------------> fixed with this patch
    - FileStreamSourceLog  ---------------> inherited the fix of `CompactibleFileStreamLog`
    - FileStreamSinkLog  -----------------> inherited the fix of `CompactibleFileStreamLog`
  - OffsetSeqLog  ------------------------> fixed with this patch
  - anonymous subclass in KafkaSource  ---> fixed with this patch
```

2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"`
    - note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"`
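A minimal sketch of what such a `parseVersion` helper could look like (this is an illustration, not necessarily the exact method added by the PR):

```scala
// Parse a version identifier like "v1"; reject anything malformed or newer than supported.
def parseVersion(text: String, maxSupportedVersion: Int): Int = {
  if (text.nonEmpty && text.head == 'v') {
    val version = try text.drop(1).toInt catch { case _: NumberFormatException => -1 }
    if (version > 0) {
      if (version > maxSupportedVersion) {
        throw new IllegalStateException(
          s"UnsupportedLogVersion: maximum supported log version is v$maxSupportedVersion, " +
            s"but encountered v$version. The log file was produced by a newer version of Spark " +
            "and cannot be read by this version. Please upgrade.")
      }
      return version
    }
  }
  throw new IllegalStateException(s"Log file was malformed: failed to read correct log version from $text.")
}
```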

## Exception message with this patch
```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?

unit tests

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17070 from lw-lin/better-msg.
2017-03-16 13:05:36 -07:00
Takeshi Yamamuro 21f333c635 [SPARK-19751][SQL] Throw an exception if bean class has one's own class in fields
## What changes were proposed in this pull request?
The current master throws `StackOverflowError` in `createDataFrame`/`createDataset` if a bean has its own class in its fields;
```
public class SelfClassInFieldBean implements Serializable {
  private SelfClassInFieldBean child;
  ...
}
```
This pr added code to throw `UnsupportedOperationException` in that case as soon as possible.

## How was this patch tested?
Added tests in `JavaDataFrameSuite` and `JavaDatasetSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17188 from maropu/SPARK-19751.
2017-03-16 08:50:01 +08:00
windpiger fc9314671c [SPARK-19961][SQL][MINOR] unify an error msg when dropping a database for HiveExternalCatalog and InMemoryCatalog
## What changes were proposed in this pull request?

Unify the exception error message for DROP DATABASE when the database still has some tables, for both HiveExternalCatalog and InMemoryCatalog.
## How was this patch tested?
N/A

Author: windpiger <songjun@outlook.com>

Closes #17305 from windpiger/unifyErromsg.
2017-03-16 08:44:57 +08:00
Juliusz Sompolski 339b237dc1 [SPARK-19948] Document that saveAsTable uses catalog as source of truth for table existence.
It is quirky behaviour that saveAsTable to, e.g., a JDBC source with a SaveMode other than Overwrite will nevertheless overwrite the table in the external source if that table was not a catalog table.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #17289 from juliuszsompolski/saveAsTableDoc.
2017-03-16 08:20:47 +08:00
Liang-Chi Hsieh 7d734a6583 [SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning and ordering when aliasing output attributes
## What changes were proposed in this pull request?

Now `InMemoryTableScanExec` simply takes the `outputPartitioning` and `outputOrdering` from the associated `InMemoryRelation`'s `child.outputPartitioning` and `outputOrdering`.

However, `InMemoryTableScanExec` can alias the output attributes. In this case, its `outputPartitioning` and `outputOrdering` are not correct and its parent operators can't correctly determine its data distribution.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17175 from viirya/ensure-no-unnecessary-shuffle.
2017-03-16 08:18:36 +08:00