Commit graph

2639 commits

petermaxlee d3af6731fa [SPARK-16274][SQL] Implement xpath_boolean
## What changes were proposed in this pull request?
This patch implements the xpath_boolean expression for Spark SQL, an xpath function that returns true or false. The implementation is modelled after Hive's xpath_boolean, except for how the expression handles null inputs: Hive throws a NullPointerException at runtime if either of the inputs is null, whereas this implementation returns null instead.
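
A hedged usage sketch of the new expression (the XML literals are arbitrary examples):

```scala
// Illustrative calls to xpath_boolean through the SQL interface.
spark.sql("SELECT xpath_boolean('<a><b>1</b></a>', 'a/b')").show()   // true: a/b exists
spark.sql("SELECT xpath_boolean('<a><b>1</b></a>', 'a/c')").show()   // false: a/c does not exist
spark.sql("SELECT xpath_boolean(NULL, 'a/b')").show()                // null input yields null, not an NPE
```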

## How was this patch tested?
Created two new test suites: one for unit tests covering the expression, and the other for end-to-end tests in SQL.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #13964 from petermaxlee/SPARK-16274.
2016-06-30 09:27:48 +08:00
Dongjoon Hyun 831a04f5d1 [SPARK-16267][TEST] Replace deprecated CREATE TEMPORARY TABLE ... USING from testsuites.
## What changes were proposed in this pull request?

After SPARK-15674, `DDLStrategy` prints out the following deprecation messages in the testsuites.

```
12:10:53.284 WARN org.apache.spark.sql.execution.SparkStrategies$DDLStrategy:
CREATE TEMPORARY TABLE normal_orc_source USING... is deprecated,
please use CREATE TEMPORARY VIEW viewName USING... instead
```

Total: 40
- JDBCWriteSuite: 14
- DDLSuite: 6
- TableScanSuite: 6
- ParquetSourceSuite: 5
- OrcSourceSuite: 2
- SQLQuerySuite: 2
- HiveCommandSuite: 2
- JsonSuite: 1
- PrunedScanSuite: 1
- FilteredScanSuite: 1

This PR replaces `CREATE TEMPORARY TABLE` with `CREATE TEMPORARY VIEW` in order to remove the deprecation messages in the above testsuites except `DDLSuite`, `SQLQuerySuite`, `HiveCommandSuite`.
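
A sketch of the replacement applied in the test suites (the data source and path are illustrative; `normal_orc_source` comes from the warning above):

```scala
// Deprecated form (triggers the warning shown above):
spark.sql("CREATE TEMPORARY TABLE normal_orc_source USING orc OPTIONS (path '/tmp/orc_data')")
// Preferred form used by the updated test suites:
spark.sql("CREATE TEMPORARY VIEW normal_orc_source USING orc OPTIONS (path '/tmp/orc_data')")
```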

The Jenkins results show only 10 remaining messages.

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61422/consoleFull

## How was this patch tested?

This is a testsuite-only change.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13956 from dongjoon-hyun/SPARK-16267.
2016-06-29 17:29:17 -07:00
Wenchen Fan d063898beb [SPARK-16134][SQL] optimizer rules for typed filter
## What changes were proposed in this pull request?

This PR adds 3 optimizer rules for typed filter:

1. push typed filter down through `SerializeFromObject` and eliminate the deserialization in filter condition.
2. pull typed filter up through `SerializeFromObject` and eliminate the deserialization in filter condition.
3. combine adjacent typed filters and share the deserialized object among all the condition expressions.

This PR also adds a `TypedFilter` logical plan to separate it from the normal filter, so that the concept is clearer and it's easier to write optimizer rules.
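
An illustrative Dataset query these rules target (a sketch; the case class and predicates are made up):

```scala
// Two adjacent typed filters: the optimizer can now combine them and share one
// deserialized object across both predicates instead of deserializing twice.
import spark.implicits._
case class Person(name: String, age: Int)
val ds = Seq(Person("a", 1), Person("b", 5), Person("c", 9)).toDS()
ds.filter(_.age > 2).filter(_.age < 8).explain(true)   // inspect the optimized plan
```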

## How was this patch tested?

`TypedFilterOptimizationSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13846 from cloud-fan/filter.
2016-06-30 08:15:08 +08:00
Dongjoon Hyun 9b1b3ae771 [SPARK-16006][SQL] Attempting to write empty DataFrame with no fields throws non-intuitive exception
## What changes were proposed in this pull request?

This PR allows `emptyDataFrame.write` to succeed, since the user didn't specify any partition columns.

**Before**
```scala
scala> spark.emptyDataFrame.write.parquet("/tmp/t1")
org.apache.spark.sql.AnalysisException: Cannot use all columns for partition columns;
scala> spark.emptyDataFrame.write.csv("/tmp/t1")
org.apache.spark.sql.AnalysisException: Cannot use all columns for partition columns;
```

After this PR, no exception occurs and the created directory contains only one file, `_SUCCESS`, as expected.

## How was this patch tested?

Pass the Jenkins tests including updated test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13730 from dongjoon-hyun/SPARK-16006.
2016-06-29 15:00:41 -07:00
hyukjinkwon cb1b9d34f3 [SPARK-14480][SQL] Remove meaningless StringIteratorReader for CSV data source.
## What changes were proposed in this pull request?

This PR removes the meaningless `StringIteratorReader` from the CSV data source.

In `CSVParser.scala`, there is a `Reader` wrapping an `Iterator`, but this causes two problems.

Firstly, it was actually not faster than processing line by line with an `Iterator`, due to the additional logic needed to wrap the `Iterator` in a `Reader`.
Secondly, this added complexity because it needs additional logic to allow every line to be read byte by byte. So, it was pretty difficult to figure out parsing issues (e.g. SPARK-14103).

A benchmark was performed manually and the results were below:

- Original codes with Reader wrapping Iterator

|End-to-end (ns)  |   Parse Time (ns) |
|-----------------------|------------------------|
|14116265034      |2008277960        |

- New codes with Iterator

|End-to-end (ns)  |   Parse Time (ns) |
|-----------------------|------------------------|
|13451699644      | 1549050564       |

For details on the environment, dataset, and methods, please refer to the JIRA ticket.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13808 from HyukjinKwon/SPARK-14480-small.
2016-06-29 11:42:51 -07:00
gatorsmile 7ee9e39cb4 [SPARK-16157][SQL] Add New Methods for comments in StructField and StructType
#### What changes were proposed in this pull request?
Based on the previous discussion with cloud-fan hvanhovell in another related PR https://github.com/apache/spark/pull/13764#discussion_r67994276, it looks reasonable to add convenience methods for users to add `comment` when defining `StructField`.

Currently, the column-related `comment` attribute is stored in `Metadata` of `StructField`. For example, users can add the `comment` attribute using the following way:
```Scala
StructType(
  StructField(
    "cl1",
    IntegerType,
    nullable = false,
    new MetadataBuilder().putString("comment", "test").build()) :: Nil)
```
This PR is to add more user-friendly methods for the `comment` attribute when defining a `StructField`. After the changes, users are provided three different ways to do it:
```Scala
val struct = (new StructType)
  .add("a", "int", true, "test1")

val struct = (new StructType)
  .add("c", StringType, true, "test3")

val struct = (new StructType)
  .add(StructField("d", StringType).withComment("test4"))
```

#### How was this patch tested?
Added test cases:
- `DataTypeSuite` is for testing three types of API changes,
- `DataFrameReaderWriterSuite` is for parquet, json and csv formats - using in-memory catalog
- `OrcQuerySuite.scala` is for orc format using Hive-metastore

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13860 from gatorsmile/newMethodForComment.
2016-06-29 19:36:21 +08:00
Cheng Lian d1e8108854 [SPARK-16291][SQL] CheckAnalysis should capture nested aggregate functions that reference no input attributes
## What changes were proposed in this pull request?

`MAX(COUNT(*))` is invalid since an aggregate expression can't be nested within another aggregate expression. This case should be captured at the analysis phase, but somehow sneaks through to runtime.

The reason is that when checking aggregate expressions in `CheckAnalysis`, a checking branch treats all expressions that reference no input attributes as valid ones. However, `MAX(COUNT(*))` is translated into `MAX(COUNT(1))` at the analysis phase and also references no input attribute.

This PR fixes this issue by removing the aforementioned branch.
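
For illustration (the table and column names are assumed), the kind of query that should now be rejected at analysis time:

```scala
// Nested aggregates are invalid; with this fix the analyzer raises an AnalysisException
// instead of letting the query slip through to runtime.
spark.sql("SELECT MAX(COUNT(*)) FROM t GROUP BY a")
```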

## How was this patch tested?

New test case added in `AnalysisErrorSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #13968 from liancheng/spark-16291-nested-agg-functions.
2016-06-29 19:08:36 +08:00
Holden Karau 757dc2c09d [TRIVIAL][DOCS][STREAMING][SQL] The return type mentioned in the Javadoc is incorrect for toJavaRDD, …
## What changes were proposed in this pull request?

Change the return type mentioned in the Javadoc for `toJavaRDD` / `javaRDD` to match the actual return type and be consistent with the Scala RDD return type.

## How was this patch tested?

Docs only change.

Author: Holden Karau <holden@us.ibm.com>

Closes #13954 from holdenk/trivial-streaming-tojavardd-doc-fix.
2016-06-29 01:52:20 -07:00
Burak Yavuz 5545b79109 [MINOR][DOCS][STRUCTURED STREAMING] Minor doc fixes around DataFrameWriter and DataStreamWriter
## What changes were proposed in this pull request?

Fixes a couple of old references to `DataFrameWriter.startStream`, pointing them to `DataStreamWriter.start` instead.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #13952 from brkyvz/minor-doc-fix.
2016-06-28 17:02:16 -07:00
Wenchen Fan 8a977b0654 [SPARK-16100][SQL] fix bug when use Map as the buffer type of Aggregator
## What changes were proposed in this pull request?

The root cause is in `MapObjects`. Its parameter `loopVar` is not declared as a child, but it can sometimes be the same as `lambdaFunction` (e.g. the function that takes `loopVar` and produces `lambdaFunction` may be `identity`), which is a child. This causes trouble when calling `withNewChildren`: it may mistakenly treat `loopVar` as a child and cause `IndexOutOfBoundsException: 0` later.

This PR fixes this bug by simply pulling out the parameters from `LambdaVariable` and passing them to `MapObjects` directly.

## How was this patch tested?

new test in `DatasetAggregatorSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13835 from cloud-fan/map-objects.
2016-06-29 06:39:28 +08:00
gatorsmile 25520e9762 [SPARK-16236][SQL] Add Path Option back to Load API in DataFrameReader
#### What changes were proposed in this pull request?
koertkuipers identified that PR https://github.com/apache/spark/pull/13727/ changed the behavior of the `load` API: after the change, the `load` API does not add the value of `path` into the `options`. Thank you!

This PR is to add the option `path` back to `load()` API in `DataFrameReader`, if and only if users specify one and only one `path` in the `load` API. For example, users can see the `path` option after the following API call,
```Scala
spark.read
  .format("parquet")
  .load("/test")
```

#### How was this patch tested?
Added test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13933 from gatorsmile/optionPath.
2016-06-28 15:32:45 -07:00
Wenchen Fan 1f2776df6e [SPARK-16181][SQL] outer join with isNull filter may return wrong result
## What changes were proposed in this pull request?

The root cause is that the output attributes of an outer join are derived from its children, while they are actually different attributes (an outer join can return null).

We have already added some special logic to handle it, e.g. `PushPredicateThroughJoin` won't push down predicates through outer join side, `FixNullability`.

This PR adds one more special logic in `FoldablePropagation`.
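
A sketch of the class of query affected (column names are made up; the exact regression test lives in `DataFrameSuite`):

```scala
// A left outer join where the right side contributes a constant (foldable) column.
// After the join that column can be null for unmatched rows, so folding it away
// inside the IS NULL filter would wrongly drop those rows.
import org.apache.spark.sql.functions.lit
import spark.implicits._
val left  = Seq(1, 2).toDF("id")
val right = Seq(1).toDF("id").withColumn("tag", lit("matched"))
left.join(right, Seq("id"), "left_outer").filter($"tag".isNull).show()
// expected: one row, id = 2, whose tag is null
```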

## How was this patch tested?

new test in `DataFrameSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13884 from cloud-fan/bug.
2016-06-28 10:26:01 -07:00
Prashant Sharma f6b497fcdd [SPARK-16128][SQL] Allow setting length of characters to be truncated to, in Dataset.show function.
## What changes were proposed in this pull request?

Allowing truncation to a specific number of characters is convenient at times, especially while operating from the REPL. Sometimes those last few characters make all the difference, and showing everything brings in a whole lot of noise.
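
A hedged usage sketch of the new overload (the data and the truncation length are arbitrary):

```scala
// Show up to 20 rows, truncating each cell to at most 5 characters instead of the fixed 20.
import spark.implicits._
val df = Seq("a fairly long string value").toDF("s")
df.show(20, 5)
```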

## How was this patch tested?
Existing tests. + 1 new test in DataFrameSuite.

For SparkR and pyspark, existing tests and manual testing.

Author: Prashant Sharma <prashsh1@in.ibm.com>
Author: Prashant Sharma <prashant@apache.org>

Closes #13839 from ScrapCodes/add_truncateTo_DF.show.
2016-06-28 17:11:06 +05:30
gatorsmile 4cbf611c1d [SPARK-16202][SQL][DOC] Correct The Description of CreatableRelationProvider's createRelation
#### What changes were proposed in this pull request?
The API description of `createRelation` in `CreatableRelationProvider` is misleading. The current description only expects users to return the relation.

```Scala
trait CreatableRelationProvider {
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}
```

However, the major goal of this API should also include saving the `DataFrame`.

Since this API is critical for Data Source API developers, this PR is to correct the description.

#### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13903 from gatorsmile/readUnderscoreFiles.
2016-06-27 23:12:17 -07:00
Dongjoon Hyun a0da854fb3 [SPARK-16221][SQL] Redirect Parquet JUL logger via SLF4J for WRITE operations
## What changes were proposed in this pull request?

[SPARK-8118](https://github.com/apache/spark/pull/8196) implements redirecting the Parquet JUL logger via SLF4J, but it is currently applied only when READ operations occur. If users perform only WRITE operations, many Parquet log messages still appear.

This PR makes the redirection work on WRITE operations, too.

**Before**
```scala
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Jun 26, 2016 9:04:38 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: Compression: SNAPPY
............ about 70 lines Parquet Log .............
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
............ about 70 lines Parquet Log .............
```

**After**
```scala
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
```

This PR also fixes some typos.

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13918 from dongjoon-hyun/SPARK-16221.
2016-06-28 13:01:18 +08:00
Herman van Hovell 02a029df43 [SPARK-16220][SQL] Add scope to show functions
## What changes were proposed in this pull request?
Spark currently shows all functions when a `SHOW FUNCTIONS` command is issued. This PR refines the `SHOW FUNCTIONS` command by allowing users to select all functions, user-defined functions, or system functions. The following syntax can be used:

**ALL** (default)
```SHOW FUNCTIONS```
```SHOW ALL FUNCTIONS```

**SYSTEM**
```SHOW SYSTEM FUNCTIONS```

**USER**
```SHOW USER FUNCTIONS```
## How was this patch tested?
Updated tests and added tests to the DDLSuite

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #13929 from hvanhovell/SPARK-16220.
2016-06-27 16:57:34 -07:00
Bill Chambers c48c8ebc0a [SPARK-16220][SQL] Revert Change to Bring Back SHOW FUNCTIONS Functionality
## What changes were proposed in this pull request?

- Fix tests regarding show functions functionality
- Revert `catalog.ListFunctions` and `SHOW FUNCTIONS` to return to `Spark 1.X` functionality.

Cherry picked changes from this PR: https://github.com/apache/spark/pull/13413/files

## How was this patch tested?

Unit tests.

Author: Bill Chambers <bill@databricks.com>
Author: Bill Chambers <wchambers@ischool.berkeley.edu>

Closes #13916 from anabranch/master.
2016-06-27 11:50:34 -07:00
Dongjoon Hyun 11f420b4bb [SPARK-10591][SQL][TEST] Add a testcase to ensure if checkAnswer handles map correctly
## What changes were proposed in this pull request?

This PR adds a testcase to ensure that `checkAnswer` handles the Map type correctly.

## How was this patch tested?

Pass the jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13913 from dongjoon-hyun/SPARK-10591.
2016-06-27 19:04:50 +08:00
Felix Cheung 30b182bcc0 [SPARK-16184][SPARKR] conf API for SparkSession
## What changes were proposed in this pull request?

Add `conf` method to get Runtime Config from SparkSession

## How was this patch tested?

unit tests, manual tests

This is how it works in sparkR shell:
```
 SparkSession available as 'spark'.
> conf()
$hive.metastore.warehouse.dir
[1] "file:/opt/spark-2.0.0-bin-hadoop2.6/R/spark-warehouse"

$spark.app.id
[1] "local-1466749575523"

$spark.app.name
[1] "SparkR"

$spark.driver.host
[1] "10.0.2.1"

$spark.driver.port
[1] "45629"

$spark.executorEnv.LD_LIBRARY_PATH
[1] "$LD_LIBRARY_PATH:/usr/lib/R/lib:/usr/lib/x86_64-linux-gnu:/usr/lib/jvm/default-java/jre/lib/amd64/server"

$spark.executor.id
[1] "driver"

$spark.home
[1] "/opt/spark-2.0.0-bin-hadoop2.6"

$spark.master
[1] "local[*]"

$spark.sql.catalogImplementation
[1] "hive"

$spark.submit.deployMode
[1] "client"

> conf("spark.master")
$spark.master
[1] "local[*]"

```

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #13885 from felixcheung/rconf.
2016-06-26 13:10:43 -07:00
Sital Kedia bf665a9586 [SPARK-15958] Make initial buffer size for the Sorter configurable
## What changes were proposed in this pull request?

Currently the initial buffer size in the sorter is hard-coded and too small for large workloads. As a result, the sorter spends significant time expanding the buffer and copying the data. It would be useful to have it configurable.

## How was this patch tested?

Tested by running a job on the cluster.

Author: Sital Kedia <skedia@fb.com>

Closes #13699 from sitalkedia/config_sort_buffer_upstream.
2016-06-25 09:13:39 +01:00
Dongjoon Hyun a7d29499dc [SPARK-16186] [SQL] Support partition batch pruning with IN predicate in InMemoryTableScanExec
## What changes were proposed in this pull request?

One of the most frequent usage patterns for Spark SQL is using **cached tables**. This PR improves `InMemoryTableScanExec` to handle the `IN` predicate efficiently by pruning partition batches. Of course, the performance improvement varies with the queries and the datasets, but for the following simple query, the query duration in the Spark UI goes from 9 seconds to 50~90 ms, over 100 times faster.

**Before**
```scala
$ bin/spark-shell --driver-memory 6G
scala> val df = spark.range(2000000000)
scala> df.createOrReplaceTempView("t")
scala> spark.catalog.cacheTable("t")
scala> sql("select id from t where id = 1").collect()    // About 2 mins
scala> sql("select id from t where id = 1").collect()    // less than 90ms
scala> sql("select id from t where id in (1,2,3)").collect()  // 9 seconds
```

**After**
```scala
scala> sql("select id from t where id in (1,2,3)").collect() // less than 90ms
```

This PR has an impact on over 35 queries of TPC-DS if the tables are cached.
Note that this optimization is applied to `IN`. To apply it to an `IN` predicate with more than 10 items, the `spark.sql.optimizer.inSetConversionThreshold` option should be increased.

## How was this patch tested?

Pass the Jenkins tests (including new testcases).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13887 from dongjoon-hyun/SPARK-16186.
2016-06-24 22:34:31 -07:00
Takeshi YAMAMURO d2e44d7db8 [SPARK-16192][SQL] Add type checks in CollectSet
## What changes were proposed in this pull request?
`CollectSet` cannot handle map-typed data because the underlying map data does not implement `equals`.
So, this PR adds type checks in `CheckAnalysis`.
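
A sketch of the now-rejected pattern (the column names are illustrative):

```scala
// collect_set over a map-typed column should now fail analysis rather than
// silently rely on undefined equality for map values.
import org.apache.spark.sql.functions.{collect_set, lit, map}
import spark.implicits._
val df = Seq(1, 2).toDF("a").select(map(lit("k"), $"a").as("m"))
df.select(collect_set($"m")).show()   // expected: AnalysisException
```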

## How was this patch tested?
Added tests to check failures when we found map-typed data in `CollectSet`.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #13892 from maropu/SPARK-16192.
2016-06-24 21:07:03 -07:00
Dilip Biswal 9053054c7f [SPARK-16195][SQL] Allow users to specify empty over clause in window expressions through dataset API
## What changes were proposed in this pull request?
Allow specifying an empty OVER clause in window expressions through the Dataset API.

In SQL, it's allowed to specify an empty OVER clause in a window expression.

```SQL
select area, sum(product) over () as c from windowData
where product > 3 group by area, product
having avg(month) > 0 order by avg(month), product
```
In this case the analytic function sum is computed over all the rows of the result set.

Currently this is not allowed through the Dataset API; this PR adds support for it.
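
A sketch of the Dataset API counterpart (assuming a no-argument `over()` on `Column`; the sample data is made up to mirror the SQL example):

```scala
// Empty window specification: sum(product) is computed over all rows of the result set.
import org.apache.spark.sql.functions.sum
import spark.implicits._
val windowData = Seq(("east", 4), ("west", 7)).toDF("area", "product")
windowData.select($"area", sum($"product").over().as("c")).show()
```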

## How was this patch tested?

Added a new test in DataframeWindowSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #13897 from dilipbiswal/spark-empty-over.
2016-06-24 17:27:33 -07:00
Dongjoon Hyun e5d0928e24 [SPARK-16173] [SQL] Can't join describe() of DataFrame in Scala 2.10
## What changes were proposed in this pull request?

This PR fixes `DataFrame.describe()` by forcing materialization to make the `Seq` serializable. Currently, `describe()` of a DataFrame throws a `Task not serializable` exception when joining in Scala 2.10.

## How was this patch tested?

Manual. (After building with Scala 2.10, test on `bin/spark-shell` and `bin/pyspark`.)

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13900 from dongjoon-hyun/SPARK-16173.
2016-06-24 17:26:39 -07:00
Davies Liu 20768dade2 Revert "[SPARK-16186] [SQL] Support partition batch pruning with IN predicate in InMemoryTableScanExec"
This reverts commit a65bcbc27d.
2016-06-24 17:21:18 -07:00
Dongjoon Hyun a65bcbc27d [SPARK-16186] [SQL] Support partition batch pruning with IN predicate in InMemoryTableScanExec
## What changes were proposed in this pull request?

One of the most frequent usage patterns for Spark SQL is using **cached tables**. This PR improves `InMemoryTableScanExec` to handle the `IN` predicate efficiently by pruning partition batches. Of course, the performance improvement varies with the queries and the datasets, but for the following simple query, the query duration in the Spark UI goes from 9 seconds to 50~90 ms, over 100 times faster.

**Before**
```scala
$ bin/spark-shell --driver-memory 6G
scala> val df = spark.range(2000000000)
scala> df.createOrReplaceTempView("t")
scala> spark.catalog.cacheTable("t")
scala> sql("select id from t where id = 1").collect()    // About 2 mins
scala> sql("select id from t where id = 1").collect()    // less than 90ms
scala> sql("select id from t where id in (1,2,3)").collect()  // 9 seconds
```

**After**
```scala
scala> sql("select id from t where id in (1,2,3)").collect() // less than 90ms
```

This PR has an impact on over 35 queries of TPC-DS if the tables are cached.
Note that this optimization is applied to `IN`. To apply it to an `IN` predicate with more than 10 items, the `spark.sql.optimizer.inSetConversionThreshold` option should be increased.

## How was this patch tested?

Pass the Jenkins tests (including new testcases).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13887 from dongjoon-hyun/SPARK-16186.
2016-06-24 17:13:13 -07:00
Davies Liu 4435de1bd3 [SPARK-16179][PYSPARK] fix bugs for Python udf in generate
## What changes were proposed in this pull request?

This PR fixes the bug when a Python UDF is used in explode (a generator). `GenerateExec` requires that all the attributes in its expressions be resolvable from its children when it is created, so we should replace the children first and then replace its expressions.

```
>>> df.select(explode(f(*df))).show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/vlad/dev/spark/python/pyspark/sql/dataframe.py", line 286, in show
    print(self._jdf.showString(n, truncate))
  File "/home/vlad/dev/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/home/vlad/dev/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/vlad/dev/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
Generate explode(<lambda>(_1#0L)), false, false, [col#15L]
+- Scan ExistingRDD[_1#0L]

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:387)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:69)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:45)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:177)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:144)
	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:153)
	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:95)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:95)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:95)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:85)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2557)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:1923)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2138)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1$$anonfun$apply$13.apply(TreeNode.scala:413)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1$$anonfun$apply$13.apply(TreeNode.scala:413)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:412)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:387)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
	... 42 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#20
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
	at org.apache.spark.sql.execution.GenerateExec.<init>(GenerateExec.scala:63)
	... 52 more
Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#20 in [_1#0L]
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
	... 67 more
```

## How was this patch tested?

Added regression tests.

Author: Davies Liu <davies@databricks.com>

Closes #13883 from davies/udf_in_generate.
2016-06-24 15:20:39 -07:00
Reynold Xin 5f8de21606 [SQL][MINOR] Simplify data source predicate filter translation.
## What changes were proposed in this pull request?
This is a small patch to rewrite the predicate filter translation in DataSourceStrategy. The original code used excessive functional constructs (e.g. unzip) and was very difficult to understand.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #13889 from rxin/simplify-predicate-filter.
2016-06-24 14:44:24 -07:00
Sean Owen 158af162ea [SPARK-16129][CORE][SQL] Eliminate direct use of commons-lang classes in favor of commons-lang3
## What changes were proposed in this pull request?

Replace use of `commons-lang` in favor of `commons-lang3` and forbid the former via scalastyle; remove `NotImplementedException` from `commons-lang` in favor of JDK `UnsupportedOperationException`

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #13843 from srowen/SPARK-16129.
2016-06-24 10:35:54 +01:00
Wenchen Fan 6a3c6276f5 [SQL][MINOR] ParserUtils.operationNotAllowed should throw exception directly
## What changes were proposed in this pull request?

It's weird that `ParserUtils.operationNotAllowed` returns an exception and the caller throws it.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13874 from cloud-fan/style.
2016-06-23 20:20:55 -07:00
Sameer Agarwal cc71d4fa37 [SPARK-16123] Avoid NegativeArraySizeException while reserving additional capacity in VectorizedColumnReader
## What changes were proposed in this pull request?

This patch fixes an overflow bug in the vectorized Parquet reader, where both the off-heap and on-heap variants of `ColumnVector.reserve()` can overflow while reserving additional capacity during reads.

## How was this patch tested?

Manual Tests

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13832 from sameeragarwal/negative-array.
2016-06-23 18:21:41 -07:00
Dongjoon Hyun 264bc63623 [SPARK-16165][SQL] Fix the update logic for InMemoryTableScanExec.readBatches
## What changes were proposed in this pull request?

Currently, the `readBatches` accumulator of `InMemoryTableScanExec` is updated only when `spark.sql.inMemoryColumnarStorage.partitionPruning` is true. Although this metric is used only for testing purposes, it should be correct regardless of SQL options.

## How was this patch tested?

Pass the Jenkins tests (including a new testcase).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13870 from dongjoon-hyun/SPARK-16165.
2016-06-24 07:19:20 +08:00
Shixiong Zhu 0e4bdebece [SPARK-15443][SQL] Fix 'explain' for streaming Dataset
## What changes were proposed in this pull request?

- Fix the `explain` command for streaming Dataset/DataFrame. E.g.,
```
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- 'MapElements <function1>, obj#6: java.lang.String
   +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, StringType).toString, StructField(value,StringType,true))), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
   +- *DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- *Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
```

- Add `StreamingQuery.explain` to display the last execution plan. E.g.,
```
== Parsed Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
   +- *DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- *Filter <function1>.apply
         +- *Scan text [value#12] Format: org.apache.spark.sql.execution.datasources.text.TextFileFormat1836ab91, InputPaths: file:/Users/zsx/stream/a.txt, file:/Users/zsx/stream/b.txt, file:/Users/zsx/stream/c.txt, PushedFilters: [], ReadSchema: struct<value:string>
```

## How was this patch tested?

The added unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13815 from zsxwing/sdf-explain.
2016-06-23 16:04:16 -07:00
Davies Liu 10396d9505 [SPARK-16163] [SQL] Cache the statistics for logical plans
## What changes were proposed in this pull request?

The calculation of statistics is not trivial anymore; it can be very slow on a large query (for example, TPC-DS Q64 took several minutes to plan).

During the planning of a query, the statistics of any logical plan should not change (even InMemoryRelation), so we should use `lazy val` to cache the statistics.

For InMemoryRelation, the statistics can be updated after materialization, but that is only useful when the relation is used in another query (before planning), because once the planning is finished the statistics will not be used anymore.

## How was this patch tested?

Tested with TPC-DS Q64; it can be planned in a second after the patch.

Author: Davies Liu <davies@databricks.com>

Closes #13871 from davies/fix_statistics.
2016-06-23 11:48:48 -07:00
Shixiong Zhu d85bb10ce4 [SPARK-16116][SQL] ConsoleSink should not require checkpointLocation
## What changes were proposed in this pull request?

When the user uses `ConsoleSink`, we should use a temp location if `checkpointLocation` is not specified.
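
A hedged sketch of the usage this enables (the input stream is just an illustrative file-based text source):

```scala
// Starting a console sink without an explicit checkpointLocation now works;
// a temporary checkpoint location is used under the hood.
val streamingDF = spark.readStream.text("/tmp/stream-input")
val query = streamingDF.writeStream.format("console").start()
```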

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13817 from zsxwing/console-checkpoint.
2016-06-23 10:46:20 -07:00
Cheng Lian f34b5c62b2 [SQL][MINOR] Fix minor formatting issues in SHOW CREATE TABLE output
## What changes were proposed in this pull request?

This PR fixes two minor formatting issues appearing in `SHOW CREATE TABLE` output.

Before:

```
CREATE EXTERNAL TABLE ...
...
WITH SERDEPROPERTIES ('serialization.format' = '1'
)
...
TBLPROPERTIES ('avro.schema.url' = '/tmp/avro/test.avsc',
  'transient_lastDdlTime' = '1466638180')
```

After:

```
CREATE EXTERNAL TABLE ...
...
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
...
TBLPROPERTIES (
  'avro.schema.url' = '/tmp/avro/test.avsc',
  'transient_lastDdlTime' = '1466638180'
)
```

## How was this patch tested?

Manually tested.

Author: Cheng Lian <lian@databricks.com>

Closes #13864 from liancheng/show-create-table-format-fix.
2016-06-22 22:28:54 -07:00
bomeng 925884a612 [SPARK-15230][SQL] distinct() does not handle column name with dot properly
## What changes were proposed in this pull request?

When a table is created with a column name containing a dot, distinct() will fail to run. For example,
```scala
val rowRDD = sparkContext.parallelize(Seq(Row(1), Row(1), Row(2)))
val schema = StructType(Array(StructField("column.with.dot", IntegerType, nullable = false)))
val df = spark.createDataFrame(rowRDD, schema)
```
running the following will have no problem:
```scala
df.select(new Column("`column.with.dot`"))
```
but running the query with an additional distinct() will cause an exception:
```scala
df.select(new Column("`column.with.dot`")).distinct()
```

The issue is that distinct() will try to resolve the column name, but the column name in the schema does not carry the backticks. So the solution is to add backticks around the column name before passing it to resolve().

## How was this patch tested?

Added a new test case.

Author: bomeng <bmeng@us.ibm.com>

Closes #13140 from bomeng/SPARK-15230.
2016-06-23 11:06:19 +08:00
Reynold Xin 37f3be5d29 [SPARK-16159][SQL] Move RDD creation logic from FileSourceStrategy.apply
## What changes were proposed in this pull request?
We embed partitioning logic in FileSourceStrategy.apply, making the function very long. This is a small refactoring to move it into its own functions. Eventually we would be able to move the partitioning functions into a physical operator, rather than doing it in physical planning.

## How was this patch tested?
This is a simple code move.

Author: Reynold Xin <rxin@databricks.com>

Closes #13862 from rxin/SPARK-16159.
2016-06-22 18:19:07 -07:00
gatorsmile 9f990fa3f9 [SPARK-16024][SQL][TEST] Verify Column Comment for Data Source Tables
#### What changes were proposed in this pull request?
This PR is to improve test coverage. It verifies whether the `Comment` of a `Column` is appropriately handled.

The test cases verify the related parts in the parser, both the SQL and DataFrameWriter interfaces, and both the Hive metastore catalog and the in-memory catalog.

#### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13764 from gatorsmile/dataSourceComment.
2016-06-23 09:12:20 +08:00
Wenchen Fan 01277d4b25 [SPARK-16097][SQL] Encoders.tuple should handle null object correctly
## What changes were proposed in this pull request?

Although the top-level input object cannot be null, when we use `Encoders.tuple` to combine two encoders, their input objects are no longer top level and can be null. We should handle this case.
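
A sketch of the kind of pattern involved (the values are illustrative; the actual regression test is in `DatasetSuite`):

```scala
// A nested tuple encoder: the inner tuple is no longer a top-level object, so it may be null.
import org.apache.spark.sql.Encoders
val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
val ds = spark.createDataset(Seq((null.asInstanceOf[(String, String)], "x")))(enc)
ds.collect()   // the inner null should round-trip as null
```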

## How was this patch tested?

new test in DatasetSuite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13807 from cloud-fan/bug.
2016-06-22 18:32:14 +08:00
Yin Huai 39ad53f7ff [SPARK-16121] ListingFileCatalog does not list in parallel anymore
## What changes were proposed in this pull request?
It seems the fix for SPARK-14959 broke parallel partition discovery. This PR fixes the problem.

## How was this patch tested?
Tested manually. (This PR also adds a proper test for SPARK-14959)

Author: Yin Huai <yhuai@databricks.com>

Closes #13830 from yhuai/SPARK-16121.
2016-06-22 18:07:07 +08:00
gatorsmile 0e3ce75332 [SPARK-15644][MLLIB][SQL] Replace SQLContext with SparkSession in MLlib
#### What changes were proposed in this pull request?
This PR is to use the latest `SparkSession` to replace the existing `SQLContext` in `MLlib`. `SQLContext` is removed from `MLlib`.

Also fix a test case issue in `BroadcastJoinSuite`.

BTW, `SQLContext` is not being used in the `MLlib` test suites.
#### How was this patch tested?
Existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13380 from gatorsmile/sqlContextML.
2016-06-21 23:12:08 -07:00
hyukjinkwon 7580f3041a [SPARK-16104] [SQL] Do not create CSV writer object for every flush when writing
## What changes were proposed in this pull request?

This PR makes the `CsvWriter` object reusable instead of creating it each time. This approach was adopted from the JSON data source.

Originally a `CsvWriter` was created for each row; that was improved in https://github.com/apache/spark/pull/13229. However, it still creates a `CsvWriter` object for each `flush()` in `LineCsvWriter`. It does not have to close and re-create the object for every flush.

This change follows the original logic as it is, but `CsvWriter` is reused by resetting `CharArrayWriter`.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13809 from HyukjinKwon/write-perf.
2016-06-21 21:58:38 -07:00
Shixiong Zhu c399c7f0e4 [SPARK-16002][SQL] Sleep when no new data arrives to avoid 100% CPU usage
## What changes were proposed in this pull request?

Add a configuration to allow people to set a minimum polling delay when no new data arrives (default is 10ms). This PR also cleans up some INFO logs.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13718 from zsxwing/SPARK-16002.
2016-06-21 12:42:49 -07:00
bomeng f3a768b7b9 [SPARK-16084][SQL] Minor comments update for "DESCRIBE" table
## What changes were proposed in this pull request?

1. FORMATTED is actually supported, but partitions are not supported;
2. Remove the parentheses, as they are not necessary, just like anywhere else.

## How was this patch tested?

Minor issue. I do not think it needs a test case!

Author: bomeng <bmeng@us.ibm.com>

Closes #13791 from bomeng/SPARK-16084.
2016-06-21 08:51:43 +01:00
hyukjinkwon 4f7f1c4362 [SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD
## What changes were proposed in this pull request?

This PR makes the `input_file_name()` function return the file paths, not empty strings, for external data sources based on `NewHadoopRDD`, such as [spark-redshift](cba5eee1ab/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala (L149)) and [spark-xml](https://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/util/XmlFile.scala#L39-L47).

The codes with the external data sources below:

```scala
df.select(input_file_name).show()
```

will produce

- **Before**
  ```
+-----------------+
|input_file_name()|
+-----------------+
|                 |
+-----------------+
```

- **After**
  ```
+--------------------+
|   input_file_name()|
+--------------------+
|file:/private/var...|
+--------------------+
```

## How was this patch tested?

Unit tests in `ColumnExpressionSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13759 from HyukjinKwon/SPARK-16044.
2016-06-20 21:55:34 -07:00
gatorsmile d9a3a2a0be [SPARK-16056][SPARK-16057][SPARK-16058][SQL] Fix Multiple Bugs in Column Partitioning in JDBC Source
#### What changes were proposed in this pull request?
This PR is to fix the following bugs:

**Issue 1: Wrong Results when lowerBound is larger than upperBound in Column Partitioning**
```scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 4,
  upperBound = 0,
  numPartitions = 3,
  connectionProperties = new Properties)
```
**Before code changes:**
The returned results are wrong and the generated partitions are wrong:
```
  Part 0 id < 3 or id is null
  Part 1 id >= 3 AND id < 2
  Part 2 id >= 2
```
**After code changes:**
Issue an `IllegalArgumentException` exception:
```
Operation not allowed: the lower bound of partitioning column is larger than the upper bound. lowerBound: 5; higherBound: 1
```
**Issue 2: numPartitions is more than the number of key values between upper and lower bounds**
```scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 1,
  upperBound = 5,
  numPartitions = 10,
  connectionProperties = new Properties)
```
**Before code changes:**
Returned correct results but the generated partitions are very inefficient, like:
```
Partition 0: id < 1 or id is null
Partition 1: id >= 1 AND id < 1
Partition 2: id >= 1 AND id < 1
Partition 3: id >= 1 AND id < 1
Partition 4: id >= 1 AND id < 1
Partition 5: id >= 1 AND id < 1
Partition 6: id >= 1 AND id < 1
Partition 7: id >= 1 AND id < 1
Partition 8: id >= 1 AND id < 1
Partition 9: id >= 1
```
**After code changes:**
Adjust `numPartitions` and can return the correct answers:
```
Partition 0: id < 2 or id is null
Partition 1: id >= 2 AND id < 3
Partition 2: id >= 3 AND id < 4
Partition 3: id >= 4
```
**Issue 3: java.lang.ArithmeticException when numPartitions is zero**
```Scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 0,
  upperBound = 4,
  numPartitions = 0,
  connectionProperties = new Properties)
```
**Before code changes:**
Got the following exception:
```
  java.lang.ArithmeticException: / by zero
```
**After code changes:**
Able to return a correct answer by disabling column partitioning when numPartitions is equal to or less than zero

#### How was this patch tested?
Added test cases to verify the results

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13773 from gatorsmile/jdbcPartitioning.
2016-06-20 21:49:33 -07:00
Reynold Xin c775bf09e0 [SPARK-13792][SQL] Limit logging of bad records in CSV data source
## What changes were proposed in this pull request?
This pull request adds a new option (maxMalformedLogPerPartition) to the CSV reader to limit the maximum number of log messages Spark generates per partition for malformed records.
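
A hedged usage sketch (the path and mode are illustrative; the option name is as described above):

```scala
// Log at most 10 warnings per partition for malformed lines while dropping them.
val df = spark.read
  .option("mode", "DROPMALFORMED")
  .option("maxMalformedLogPerPartition", "10")
  .csv("/path/to/data")
```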

The error log looks something like
```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```

Closes #12173

## How was this patch tested?
Manually tested.

Author: Reynold Xin <rxin@databricks.com>

Closes #13795 from rxin/SPARK-13792.
2016-06-20 21:46:12 -07:00
Kousuke Saruta 6daa8cf1a6 [SPARK-16061][SQL][MINOR] The property "spark.streaming.stateStore.maintenanceInterval" should be renamed to "spark.sql.streaming.stateStore.maintenanceInterval"
## What changes were proposed in this pull request?
The property spark.streaming.stateStore.maintenanceInterval should be renamed and harmonized with other properties related to Structured Streaming like spark.sql.streaming.stateStore.minDeltasForSnapshot.

## How was this patch tested?
Existing unit tests.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #13777 from sarutak/SPARK-16061.
2016-06-20 15:12:40 -07:00
Tathagata Das b99129cc45 [SPARK-15982][SPARK-16009][SPARK-16007][SQL] Harmonize the behavior of DataFrameReader.text/csv/json/parquet/orc
## What changes were proposed in this pull request?

Issues with the current reader behavior:
- `text()` without args returns an empty DF with no columns -> inconsistent; it's expected that text will always return a DF with a `value` string field.
- `textFile()` without args fails with an exception because of the above reason; it expects the DF returned by `text()` to have a `value` field.
- `orc()` does not have varargs, inconsistent with the others
- `json(single-arg)` was removed, but that caused source compatibility issues - [SPARK-16009](https://issues.apache.org/jira/browse/SPARK-16009)
- the user-specified schema was not respected when `text/csv/...` were used with no args - [SPARK-16007](https://issues.apache.org/jira/browse/SPARK-16007)

The solution I am implementing is to do the following.
- For each format, there will be a single-argument method and a vararg method. For json, parquet, csv, and text, this means adding json(string), etc. For orc, this means adding orc(varargs). (See the sketch after this list.)
- Remove the special handling of text(), csv(), etc. that returns an empty dataframe with no fields. Rather, pass on the empty sequence of paths to the data source, and let each data source handle it right. For example, the text data source should return an empty DF with schema (value: string).
- Deduped docs and fixed their formatting.
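
A hedged sketch of the resulting reader API shapes (the paths are illustrative):

```scala
// Single-argument and vararg overloads per format, plus textFile returning Dataset[String].
val a = spark.read.text("/data/logs")                   // single path
val b = spark.read.text("/data/logs1", "/data/logs2")   // varargs
val c = spark.read.textFile("/data/logs")               // Dataset[String]; schema is (value: string)
val d = spark.read.orc("/data/orc1", "/data/orc2")      // orc now accepts varargs too
```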

## How was this patch tested?
Added new unit tests for Scala and Java tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13727 from tdas/SPARK-15982.
2016-06-20 14:52:28 -07:00