Commit graph

1230 commits

Author SHA1 Message Date
Liang-Chi Hsieh 750ed64cd9 [SPARK-13930] [SQL] Apply fast serialization on collect limit operator
## What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-13930

Recently the fast serialization has been introduced to collecting DataFrame/Dataset (#11664). The same technology can be used on collect limit operator too.

## How was this patch tested?

Add a benchmark for collect limit to `BenchmarkWholeStageCodegen`.

Without this patch:

    model name      : Westmere E56xx/L56xx/X56xx (Nehalem-C)
    collect limit:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    collect limit 1 million                  3413 / 3768          0.3        3255.0       1.0X
    collect limit 2 millions                9728 / 10440          0.1        9277.3       0.4X

With this patch:

    model name      : Westmere E56xx/L56xx/X56xx (Nehalem-C)
    collect limit:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    collect limit 1 million                   833 / 1284          1.3         794.4       1.0X
    collect limit 2 millions                 3348 / 4005          0.3        3193.3       0.2X

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11759 from viirya/execute-take.
2016-03-17 23:24:44 -07:00
Dilip Biswal 637a78f1d3 [SPARK-13427][SQL] Support USING clause in JOIN.
## What changes were proposed in this pull request?

Support queries that JOIN tables with USING clause.
SELECT * from table1 JOIN table2 USING <column_list>

USING clause can be used as a means to simplify the join condition
when :

1) Equijoin semantics is desired and
2) The column names in the equijoin have the same name.

We already have the support for Natural Join in Spark. This PR makes
use of the already existing infrastructure for natural join to
form the join condition and also the projection list.

## How was the this patch tested?

Have added unit tests in SQLQuerySuite, CatalystQlSuite, ResolveNaturalJoinSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #11297 from dilipbiswal/spark-13427.
2016-03-17 10:01:41 -07:00
Wenchen Fan 8ef3399aff [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
## What changes were proposed in this pull request?

Logging was made private in Spark 2.0. If we move it, then users would be able to create a Logging trait themselves to avoid changing their own code.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11764 from cloud-fan/logger.
2016-03-17 19:23:38 +08:00
Josh Rosen de1a84e56e [SPARK-13926] Automatically use Kryo serializer when shuffling RDDs with simple types
Because ClassTags are available when constructing ShuffledRDD we can use them to automatically use Kryo for shuffle serialization when the RDD's types are known to be compatible with Kryo.

This patch introduces `SerializerManager`, a component which picks the "best" serializer for a shuffle given the elements' ClassTags. It will automatically pick a Kryo serializer for ShuffledRDDs whose key, value, and/or combiner types are primitives, arrays of primitives, or strings. In the future we can use this class as a narrow extension point to integrate specialized serializers for other types, such as ByteBuffers.

In a planned followup patch, I will extend the BlockManager APIs so that we're able to use similar automatic serializer selection when caching RDDs (this is a little trickier because the ClassTags need to be threaded through many more places).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11755 from JoshRosen/automatically-pick-best-serializer.
2016-03-16 22:52:55 -07:00
Dongjoon Hyun c890c359b1 [MINOR][SQL][BUILD] Remove duplicated lines
## What changes were proposed in this pull request?

This PR removes three minor duplicated lines. First one is making the following unreachable code warning.
```
JoinSuite.scala:52: unreachable code
[warn]       case j: BroadcastHashJoin => j
```
The other two are just consecutive repetitions in `Seq` of MiMa filters.

## How was this patch tested?

Pass the existing Jenkins test.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11773 from dongjoon-hyun/remove_duplicated_line.
2016-03-16 22:48:58 -07:00
Jakob Odersky 7eef2463ad [SPARK-13118][SQL] Expression encoding for optional synthetic classes
## What changes were proposed in this pull request?

Fix expression generation for optional types.
Standard Java reflection causes issues when dealing with synthetic Scala objects (things that do not map to Java and thus contain a dollar sign in their name). This patch introduces Scala reflection in such cases.

This patch also adds a regression test for Dataset's handling of classes defined in package objects (which was the initial purpose of this PR).

## How was this patch tested?
A new test in ExpressionEncoderSuite that tests optional inner classes and a regression test for Dataset's handling of package objects.

Author: Jakob Odersky <jakob@odersky.com>

Closes #11708 from jodersky/SPARK-13118-package-objects.
2016-03-16 21:53:16 -07:00
Davies Liu c100d31ddc [SPARK-13873] [SQL] Avoid copy of UnsafeRow when there is no join in whole stage codegen
## What changes were proposed in this pull request?

We need to copy the UnsafeRow since a Join could produce multiple rows from single input rows. We could avoid that if there is no join (or the join will not produce multiple rows) inside WholeStageCodegen.

Updated the benchmark for `collect`, we could see 20-30% speedup.

## How was this patch tested?

existing unit tests.

Author: Davies Liu <davies@databricks.com>

Closes #11740 from davies/avoid_copy2.
2016-03-16 21:46:04 -07:00
hyukjinkwon 917f4000b4 [SPARK-13719][SQL] Parse JSON rows having an array type and a struct type in the same fieild
## What changes were proposed in this pull request?

This https://github.com/apache/spark/pull/2400 added the support to parse JSON rows wrapped with an array. However, this throws an exception when the given data contains array data and struct data in the same field as below:

```json
{"a": {"b": 1}}
{"a": []}
```

and the schema is given as below:

```scala
val schema =
  StructType(
    StructField("a", StructType(
      StructField("b", StringType) :: Nil
    )) :: Nil)
```

- **Before**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```scala
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 10, 192.168.1.170): java.lang.ClassCastException: org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:50)
	at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:247)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
...
```

- **After**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```bash
+----+
|   a|
+----+
| [1]|
|null|
+----+
```

For other data types, in this case it converts the given values are `null` but only this case emits an exception.

This PR makes the support for wrapped rows applied only at the top level.

## How was this patch tested?

Unit tests were used and `./dev/run_tests` for code style tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11752 from HyukjinKwon/SPARK-3308-follow-up.
2016-03-16 18:20:30 -07:00
Jakob Odersky d4d84936fb [SPARK-11011][SQL] Narrow type of UDT serialization
## What changes were proposed in this pull request?

Narrow down the parameter type of `UserDefinedType#serialize()`. Currently, the parameter type is `Any`, however it would logically make more sense to narrow it down to the type of the actual user defined type.

## How was this patch tested?

Existing tests were successfully run on local machine.

Author: Jakob Odersky <jakob@odersky.com>

Closes #11379 from jodersky/SPARK-11011-udt-types.
2016-03-16 16:59:36 -07:00
Sameer Agarwal b90c0206fa [SPARK-13922][SQL] Filter rows with null attributes in vectorized parquet reader
# What changes were proposed in this pull request?

It's common for many SQL operators to not care about reading `null` values for correctness. Currently, this is achieved by performing `isNotNull` checks (for all relevant columns) on a per-row basis. Pushing these null filters in the vectorized parquet reader should bring considerable benefits (especially for cases when the underlying data doesn't contain any nulls or contains all nulls).

## How was this patch tested?

        Intel(R) Core(TM) i7-4960HQ CPU  2.60GHz
        String with Nulls Scan (0%):        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        -------------------------------------------------------------------------------------------
        SQL Parquet Vectorized                   1229 / 1648          8.5         117.2       1.0X
        PR Vectorized                             833 /  846         12.6          79.4       1.5X
        PR Vectorized (Null Filtering)            732 /  782         14.3          69.8       1.7X

        Intel(R) Core(TM) i7-4960HQ CPU  2.60GHz
        String with Nulls Scan (50%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        -------------------------------------------------------------------------------------------
        SQL Parquet Vectorized                    995 / 1053         10.5          94.9       1.0X
        PR Vectorized                             732 /  772         14.3          69.8       1.4X
        PR Vectorized (Null Filtering)            725 /  790         14.5          69.1       1.4X

        Intel(R) Core(TM) i7-4960HQ CPU  2.60GHz
        String with Nulls Scan (95%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        -------------------------------------------------------------------------------------------
        SQL Parquet Vectorized                    326 /  333         32.2          31.1       1.0X
        PR Vectorized                             190 /  200         55.1          18.2       1.7X
        PR Vectorized (Null Filtering)            168 /  172         62.2          16.1       1.9X

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11749 from sameeragarwal/perf-testing.
2016-03-16 16:25:40 -07:00
Cheng Hao d9670f8473 [SPARK-13894][SQL] SqlContext.range return type from DataFrame to DataSet
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-13894
Change the return type of the `SQLContext.range` API from `DataFrame` to `Dataset`.

## How was this patch tested?
No additional unit test required.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #11730 from chenghao-intel/range.
2016-03-16 11:20:15 -07:00
Sean Owen 3b461d9ecd [SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset follow up
## What changes were proposed in this pull request?

Follow up to https://github.com/apache/spark/pull/11657

- Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8`
- And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests)
- And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #11725 from srowen/SPARK-13823.2.
2016-03-16 09:36:34 +00:00
hyukjinkwon 92024797a4 [SPARK-13899][SQL] Produce InternalRow instead of external Row at CSV data source
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13899

This PR makes CSV data source produce `InternalRow` instead of `Row`.

Basically, this resembles JSON data source. It uses the same codes for casting.

## How was this patch tested?

Unit tests were used within IDE and code style was checked by `./dev/run_tests`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11717 from HyukjinKwon/SPARK-13899.
2016-03-15 23:31:46 -07:00
Davies Liu 421f6c20e8 [SPARK-13917] [SQL] generate broadcast semi join
## What changes were proposed in this pull request?

This PR brings codegen support for broadcast left-semi join.

## How was this patch tested?

Existing tests. Added benchmark, the result show 7X speedup.

Author: Davies Liu <davies@databricks.com>

Closes #11742 from davies/gen_semi.
2016-03-15 22:17:04 -07:00
Davies Liu bbd887f53c [SPARK-13918][SQL] Merge SortMergeJoin and SortMergerOuterJoin
## What changes were proposed in this pull request?

This PR just move some code from SortMergeOuterJoin into SortMergeJoin.

This is for support codegen for outer join.

## How was this patch tested?

existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #11743 from davies/gen_smjouter.
2016-03-15 19:58:49 -07:00
Reynold Xin 643649dcbf [SPARK-13895][SQL] DataFrameReader.text should return Dataset[String]
## What changes were proposed in this pull request?
This patch changes DataFrameReader.text()'s return type from DataFrame to Dataset[String].

Closes #11731.

## How was this patch tested?
Updated existing integration tests to reflect the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #11739 from rxin/SPARK-13895.
2016-03-15 14:57:54 -07:00
Stavros Kontopoulos 50e3644d00 [SPARK-13896][SQL][STRING] Dataset.toJSON should return Dataset
## What changes were proposed in this pull request?
Change the return type of toJson in Dataset class
## How was this patch tested?
No additional unit test required.

Author: Stavros Kontopoulos <stavros.kontopoulos@typesafe.com>

Closes #11732 from skonto/fix_toJson.
2016-03-15 12:18:30 -07:00
Reynold Xin 5e6f2f4563 [SPARK-13893][SQL] Remove SQLContext.catalog/analyzer (internal method)
## What changes were proposed in this pull request?
Our internal code can go through SessionState.catalog and SessionState.analyzer. This brings two small benefits:
1. Reduces internal dependency on SQLContext.
2. Removes 2 public methods in Java (Java does not obey package private visibility).

More importantly, according to the design in SPARK-13485, we'd need to claim this catalog function for the user-facing public functions, rather than having an internal field.

## How was this patch tested?
Existing unit/integration test code.

Author: Reynold Xin <rxin@databricks.com>

Closes #11716 from rxin/SPARK-13893.
2016-03-15 10:12:32 -07:00
Xin Ren 10251a7457 [SPARK-13660][SQL][TESTS] ContinuousQuerySuite floods the logs with garbage
## What changes were proposed in this pull request?

Use method 'testQuietly' to avoid ContinuousQuerySuite flooding the console logs with garbage

Make ContinuousQuerySuite not output logs to the console. The logs will still output to unit-tests.log.

## How was this patch tested?

Just check Jenkins output.

Author: Xin Ren <iamshrek@126.com>

Closes #11703 from keypointt/SPARK-13660.
2016-03-15 01:02:28 -07:00
Reynold Xin 276c2d51a3 [SPARK-13890][SQL] Remove some internal classes' dependency on SQLContext
## What changes were proposed in this pull request?
In general it is better for internal classes to not depend on the external class (in this case SQLContext) to reduce coupling between user-facing APIs and the internal implementations. This patch removes SQLContext dependency from some internal classes such as SparkPlanner, SparkOptimizer.

As part of this patch, I also removed the following internal methods from SQLContext:
```
protected[sql] def functionRegistry: FunctionRegistry
protected[sql] def optimizer: Optimizer
protected[sql] def sqlParser: ParserInterface
protected[sql] def planner: SparkPlanner
protected[sql] def continuousQueryManager
protected[sql] def prepareForExecution: RuleExecutor[SparkPlan]
```

## How was this patch tested?
Existing unit/integration tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #11712 from rxin/sqlContext-planner.
2016-03-14 23:58:57 -07:00
Dongjoon Hyun a51f877b5d [SPARK-13870][SQL] Add scalastyle escaping correctly in CVSSuite.scala
## What changes were proposed in this pull request?

When initial creating `CVSSuite.scala` in SPARK-12833, there was a typo on `scalastyle:on`: `scalstyle:on`. So, it turns off ScalaStyle checking for the rest of the file mistakenly. So, it can not find a violation on the code of `SPARK-12668` added recently. This issue fixes the existing escaping correctly and adds a new escaping for `SPARK-12668` code like the following.

```scala
   test("test aliases sep and encoding for delimiter and charset") {
+    // scalastyle:off
     val cars = sqlContext
...
       .load(testFile(carsFile8859))
+    // scalastyle:on
```
This will prevent future potential problems, too.

## How was this patch tested?

Pass the Jenkins test.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11700 from dongjoon-hyun/SPARK-13870.
2016-03-14 23:23:05 -07:00
Davies Liu f72743d971 [SPARK-13353][SQL] fast serialization for collecting DataFrame/Dataset
## What changes were proposed in this pull request?

When we call DataFrame/Dataset.collect(), Java serializer (or Kryo Serializer) will be used to serialize the UnsafeRows in executor, then deserialize them into UnsafeRows in driver. Java serializer (and Kyro serializer) are slow on millions rows, because they try to find out the same rows, but usually there is no same rows.

This PR will serialize the UnsafeRows as byte array by packing them together, then Java serializer (or Kyro serializer) serialize the bytes very fast (there are fewer blocks and byte array are not compared by content).

The UnsafeRow format is highly compressible, the serialized bytes are also compressed (configurable by spark.io.compression.codec).

## How was this patch tested?

Existing unit tests.

Add a benchmark for collect, before this patch:
```
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
collect:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
collect 1 million                      3991 / 4311          0.3        3805.7       1.0X
collect 2 millions                  10083 / 10637          0.1        9616.0       0.4X
collect 4 millions                  29551 / 30072          0.0       28182.3       0.1X
```

```
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
collect:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
collect 1 million                        775 / 1170          1.4         738.9       1.0X
collect 2 millions                     1153 / 1758          0.9        1099.3       0.7X
collect 4 millions                     4451 / 5124          0.2        4244.9       0.2X
```

We can see about 5-7X speedup.

Author: Davies Liu <davies@databricks.com>

Closes #11664 from davies/serialize_row.
2016-03-14 22:32:22 -07:00
Shixiong Zhu b5e3bd87f5 [SPARK-13791][SQL] Add MetadataLog and HDFSMetadataLog
## What changes were proposed in this pull request?

- Add a MetadataLog interface for  metadata reliably storage.
- Add HDFSMetadataLog as a MetadataLog implementation based on HDFS.
- Update FileStreamSource to use HDFSMetadataLog instead of managing metadata by itself.

## How was this patch tested?

unit tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11625 from zsxwing/metadata-log.
2016-03-14 19:28:13 -07:00
Reynold Xin 4bf4609795 [SPARK-13882][SQL] Remove org.apache.spark.sql.execution.local
## What changes were proposed in this pull request?
We introduced some local operators in org.apache.spark.sql.execution.local package but never fully wired the engine to actually use these. We still plan to implement a full local mode, but it's probably going to be fairly different from what the current iterator-based local mode would look like. Based on what we know right now, we might want a push-based columnar version of these operators.

Let's just remove them for now, and we can always re-introduced them in the future by looking at branch-1.6.

## How was this patch tested?
This is simply dead code removal.

Author: Reynold Xin <rxin@databricks.com>

Closes #11705 from rxin/SPARK-13882.
2016-03-14 19:22:11 -07:00
Michael Armbrust 17eec0a71b [SPARK-13664][SQL] Add a strategy for planning partitioned and bucketed scans of files
This PR adds a new strategy, `FileSourceStrategy`, that can be used for planning scans of collections of files that might be partitioned or bucketed.

Compared with the existing planning logic in `DataSourceStrategy` this version has the following desirable properties:
 - It removes the need to have `RDD`, `broadcastedHadoopConf` and other distributed concerns  in the public API of `org.apache.spark.sql.sources.FileFormat`
 - Partition column appending is delegated to the format to avoid an extra copy / devectorization when appending partition columns
 - It minimizes the amount of data that is shipped to each executor (i.e. it does not send the whole list of files to every worker in the form of a hadoop conf)
 - it natively supports bucketing files into partitions, and thus does not require coalescing / creating a `UnionRDD` with the correct partitioning.
 - Small files are automatically coalesced into fewer tasks using an approximate bin-packing algorithm.

Currently only a testing source is planned / tested using this strategy.  In follow-up PRs we will port the existing formats to this API.

A stub for `FileScanRDD` is also added, but most methods remain unimplemented.

Other minor cleanups:
 - partition pruning is pushed into `FileCatalog` so both the new and old code paths can use this logic.  This will also allow future implementations to use indexes or other tricks (i.e. a MySQL metastore)
 - The partitions from the `FileCatalog` now propagate information about file sizes all the way up to the planner so we can intelligently spread files out.
 - `Array` -> `Seq` in some internal APIs to avoid unnecessary `toArray` calls
 - Rename `Partition` to `PartitionDirectory` to differentiate partitions used earlier in pruning from those where we have already enumerated the files and their sizes.

Author: Michael Armbrust <michael@databricks.com>

Closes #11646 from marmbrus/fileStrategy.
2016-03-14 19:21:12 -07:00
Andrew Or 9a1680c2c8 [SPARK-13139][SQL] Follow-ups to #11573
Addressing outstanding comments in #11573.

Jenkins, new test case in `DDLCommandSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #11667 from andrewor14/ddl-parser-followups.
2016-03-14 09:59:22 -07:00
Yin Huai 250832c733 [SPARK-13207][SQL] Make partitioning discovery ignore _SUCCESS files.
If a _SUCCESS appears in the inner partitioning dir, partition discovery will treat that _SUCCESS file as a data file. Then, partition discovery will fail because it finds that the dir structure is not valid. We should ignore those `_SUCCESS` files.

In future, it is better to ignore all files/dirs starting with `_` or `.`. This PR does not make this change. I am thinking about making this change simple, so we can consider of getting it in branch 1.6.

To ignore all files/dirs starting with `_` or `, the main change is to let ParquetRelation have another way to get metadata files. Right now, it relies on FileStatusCache's cachedLeafStatuses, which returns file statuses of both metadata files (e.g. metadata files used by parquet) and data files, which requires more changes.

https://issues.apache.org/jira/browse/SPARK-13207

Author: Yin Huai <yhuai@databricks.com>

Closes #11088 from yhuai/SPARK-13207.
2016-03-14 09:03:13 -07:00
Dongjoon Hyun acdf219703 [MINOR][DOCS] Fix more typos in comments/strings.
## What changes were proposed in this pull request?

This PR fixes 135 typos over 107 files:
* 121 typos in comments
* 11 typos in testcase name
* 3 typos in log messages

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11689 from dongjoon-hyun/fix_more_typos.
2016-03-14 09:07:39 +00:00
Sean Owen 1840852841 [SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items)
## What changes were proposed in this pull request?

- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on platform default encoding, to use UTF-8
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes specifying the encoding with `StandardCharsets.UTF-8`, not the Guava constant or "UTF-8" (which means handling `UnuspportedEncodingException`)
- (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit 1deecd8d9c )

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #11657 from srowen/SPARK-13823.
2016-03-13 21:03:49 -07:00
Cheng Lian c079420d7c [SPARK-13841][SQL] Removes Dataset.collectRows()/takeRows()
## What changes were proposed in this pull request?

This PR removes two methods, `collectRows()` and `takeRows()`, from `Dataset[T]`. These methods were added in PR #11443, and were later considered not useful.

## How was this patch tested?

Existing tests should do the work.

Author: Cheng Lian <lian@databricks.com>

Closes #11678 from liancheng/remove-collect-rows-and-take-rows.
2016-03-13 12:02:52 +08:00
Cheng Lian 4eace4d384 [SPARK-13828][SQL] Bring back stack trace of AnalysisException thrown from QueryExecution.assertAnalyzed
PR #11443 added an extra `plan: Option[LogicalPlan]` argument to `AnalysisException` and attached partially analyzed plan to thrown `AnalysisException` in `QueryExecution.assertAnalyzed()`.  However, the original stack trace wasn't properly inherited.  This PR fixes this issue by inheriting the stack trace.

A test case is added to verify that the first entry of `AnalysisException` stack trace isn't from `QueryExecution`.

Author: Cheng Lian <lian@databricks.com>

Closes #11677 from liancheng/analysis-exception-stacktrace.
2016-03-12 11:25:15 -08:00
Davies Liu ba8c86d06f [SPARK-13671] [SPARK-13311] [SQL] Use different physical plans for RDD and data sources
## What changes were proposed in this pull request?

This PR split the PhysicalRDD into two classes, PhysicalRDD and PhysicalScan. PhysicalRDD is used for DataFrames that is created from existing RDD. PhysicalScan is used for DataFrame that is created from data sources. This enable use to apply different optimization on both of them.

Also fix the problem for sameResult() on two DataSourceScan.

Also fix the equality check to toString for `In`. It's better to use Seq there, but we can't break this public API (sad).

## How was this patch tested?

Existing tests. Manually tested with TPCDS query Q59 and Q64, all those duplicated exchanges can be re-used now, also saw there are 40+% performance improvement (saving half of the scan).

Author: Davies Liu <davies@databricks.com>

Closes #11514 from davies/existing_rdd.
2016-03-12 00:48:36 -08:00
Andrew Or 66d9d0edfe [SPARK-13139][SQL] Parse Hive DDL commands ourselves
## What changes were proposed in this pull request?

This patch is ported over from viirya's changes in #11048. Currently for most DDLs we just pass the query text directly to Hive. Instead, we should parse these commands ourselves and in the future (not part of this patch) use the `HiveCatalog` to process these DDLs. This is a pretext to merging `SQLContext` and `HiveContext`.

Note: As of this patch we still pass the query text to Hive. The difference is that we now parse the commands ourselves so in the future we can just use our own catalog.

## How was this patch tested?

Jenkins, new `DDLCommandSuite`, which comprises of about 40% of the changes here.

Author: Andrew Or <andrew@databricks.com>

Closes #11573 from andrewor14/parser-plus-plus.
2016-03-11 15:13:48 -08:00
Cheng Lian 6d37e1eb90 [SPARK-13817][BUILD][SQL] Re-enable MiMA and removes object DataFrame
## What changes were proposed in this pull request?

PR #11443 temporarily disabled MiMA check, this PR re-enables it.

One extra change is that `object DataFrame` is also removed. The only purpose of introducing `object DataFrame` was to use it as an internal factory for creating `Dataset[Row]`. By replacing this internal factory with `Dataset.newDataFrame`, both `DataFrame` and `DataFrame$` are entirely removed from the API, so that we can simply put a `MissingClassProblem` filter in `MimaExcludes.scala` for most DataFrame API  changes.

## How was this patch tested?

Tested by MiMA check triggered by Jenkins.

Author: Cheng Lian <lian@databricks.com>

Closes #11656 from liancheng/re-enable-mima.
2016-03-11 22:17:50 +08:00
Cheng Lian 1d542785b9 [SPARK-13244][SQL] Migrates DataFrame to Dataset
## What changes were proposed in this pull request?

This PR unifies DataFrame and Dataset by migrating existing DataFrame operations to Dataset and make `DataFrame` a type alias of `Dataset[Row]`.

Most Scala code changes are source compatible, but Java API is broken as Java knows nothing about Scala type alias (mostly replacing `DataFrame` with `Dataset<Row>`).

There are several noticeable API changes related to those returning arrays:

1.  `collect`/`take`

    -   Old APIs in class `DataFrame`:

        ```scala
        def collect(): Array[Row]
        def take(n: Int): Array[Row]
        ```

    -   New APIs in class `Dataset[T]`:

        ```scala
        def collect(): Array[T]
        def take(n: Int): Array[T]

        def collectRows(): Array[Row]
        def takeRows(n: Int): Array[Row]
        ```

    Two specialized methods `collectRows` and `takeRows` are added because Java doesn't support returning generic arrays. Thus, for example, `DataFrame.collect(): Array[T]` actually returns `Object` instead of `Array<T>` from Java side.

    Normally, Java users may fall back to `collectAsList` and `takeAsList`.  The two new specialized versions are added to avoid performance regression in ML related code (but maybe I'm wrong and they are not necessary here).

1.  `randomSplit`

    -   Old APIs in class `DataFrame`:

        ```scala
        def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame]
        def randomSplit(weights: Array[Double]): Array[DataFrame]
        ```

    -   New APIs in class `Dataset[T]`:

        ```scala
        def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
        def randomSplit(weights: Array[Double]): Array[Dataset[T]]
        ```

    Similar problem as above, but hasn't been addressed for Java API yet.  We can probably add `randomSplitAsList` to fix this one.

1.  `groupBy`

    Some original `DataFrame.groupBy` methods have conflicting signature with original `Dataset.groupBy` methods.  To distinguish these two, typed `Dataset.groupBy` methods are renamed to `groupByKey`.

Other noticeable changes:

1.  Dataset always do eager analysis now

    We used to support disabling DataFrame eager analysis to help reporting partially analyzed malformed logical plan on analysis failure.  However, Dataset encoders requires eager analysi during Dataset construction.  To preserve the error reporting feature, `AnalysisException` now takes an extra `Option[LogicalPlan]` argument to hold the partially analyzed plan, so that we can check the plan tree when reporting test failures.  This plan is passed by `QueryExecution.assertAnalyzed`.

## How was this patch tested?

Existing tests do the work.

## TODO

- [ ] Fix all tests
- [ ] Re-enable MiMA check
- [ ] Update ScalaDoc (`since`, `group`, and example code)

Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Author: Cheng Lian <liancheng@users.noreply.github.com>

Closes #11443 from liancheng/ds-to-df.
2016-03-10 17:00:17 -08:00
Dongjoon Hyun 91fed8e9c5 [SPARK-3854][BUILD] Scala style: require spaces before {.
## What changes were proposed in this pull request?

Since the opening curly brace, '{', has many usages as discussed in [SPARK-3854](https://issues.apache.org/jira/browse/SPARK-3854), this PR adds a ScalaStyle rule to prevent '){' pattern  for the following majority pattern and fixes the code accordingly. If we enforce this in ScalaStyle from now, it will improve the Scala code quality and reduce review time.
```
// Correct:
if (true) {
  println("Wow!")
}

// Incorrect:
if (true){
   println("Wow!")
}
```
IntelliJ also shows new warnings based on this.

## How was this patch tested?

Pass the Jenkins ScalaStyle test.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11637 from dongjoon-hyun/SPARK-3854.
2016-03-10 15:57:22 -08:00
Tathagata Das 3d2b6f56e3 [SQL][TEST] Increased timeouts to reduce flakiness in ContinuousQueryManagerSuite
## What changes were proposed in this pull request?

ContinuousQueryManager is sometimes flaky on Jenkins. I could not reproduce it on my machine, so I guess it about the waiting times which causes problems if Jenkins is loaded. I have increased the wait time in the hope that it will be less flaky.

## How was this patch tested?

I reran the unit test many times on a loop in my machine. I am going to run it a few time in Jenkins, that's the real test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #11638 from tdas/cqm-flaky-test.
2016-03-10 14:38:19 -08:00
Reynold Xin 8a3acb792d [SPARK-13794][SQL] Rename DataFrameWriter.stream() DataFrameWriter.startStream()
## What changes were proposed in this pull request?
The new name makes it more obvious with the verb "start" that we are actually starting some execution.

## How was this patch tested?
This is just a rename. Existing unit tests should cover it.

Author: Reynold Xin <rxin@databricks.com>

Closes #11627 from rxin/SPARK-13794.
2016-03-09 21:04:56 -08:00
hyukjinkwon aa0eba2c35 [SPARK-13766][SQL] Consistent file extensions for files written by internal data sources
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13766
This PR makes the file extensions (written by internal datasource) consistent.

**Before**

- TEXT, CSV and JSON
```
[.COMPRESSION_CODEC_NAME]
```

- Parquet
```
[.COMPRESSION_CODEC_NAME].parquet
```

- ORC
```
.orc
```

**After**

- TEXT, CSV and JSON
```
.txt[.COMPRESSION_CODEC_NAME]
.csv[.COMPRESSION_CODEC_NAME]
.json[.COMPRESSION_CODEC_NAME]
```

- Parquet
```
[.COMPRESSION_CODEC_NAME].parquet
```

- ORC
```
[.COMPRESSION_CODEC_NAME].orc
```

When the compression codec is set,
- For Parquet and ORC, each still stays in Parquet and ORC format but just have compressed data internally. So, I think it is okay to name `.parquet` and `.orc` at the end.

- For Text, CSV and JSON, each does not stays in each format but it has different data format according to compression codec. So, each has the names `.json`, `.csv` and `.txt` before the compression extension.

## How was this patch tested?

Unit tests are used and `./dev/run_tests` for coding style tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11604 from HyukjinKwon/SPARK-13766.
2016-03-09 19:12:46 -08:00
Andrew Or 37fcda3e6c [SPARK-13747][SQL] Fix concurrent query with fork-join pool
## What changes were proposed in this pull request?

Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264:

```
(1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() }
```

This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA.

## How was this patch tested?

New test in `SQLExecutionSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #11586 from andrewor14/fix-concurrent-sql.
2016-03-09 17:34:28 -08:00
Davies Liu 3dc9ae2e15 [SPARK-13523] [SQL] Reuse exchanges in a query
## What changes were proposed in this pull request?

It’s possible to have common parts in a query, for example, self join, it will be good to avoid the duplicated part to same CPUs and memory (Broadcast or cache).

Exchange will materialize the underlying RDD by shuffle or collect, it’s a great point to check duplicates and reuse them. Duplicated exchanges means they generate exactly the same result inside a query.

In order to find out the duplicated exchanges, we should be able to compare SparkPlan to check that they have same results or not. We already have that for LogicalPlan, so we should move that into QueryPlan to make it available for SparkPlan.

Once we can find the duplicated exchanges, we should replace all of them with same SparkPlan object (could be wrapped by ReusedExchage for explain), then the plan tree become a DAG. Since all the planner only work with tree, so this rule should be the last one for the entire planning.

After the rule, the plan will looks like:

```
WholeStageCodegen
:  +- Project [id#0L]
:     +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None
:        :- Project [id#0L]
:        :  +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None
:        :     :- Range 0, 1, 4, 1024, [id#0L]
:        :     +- INPUT
:        +- INPUT
:- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
:  +- WholeStageCodegen
:     :  +- Range 0, 1, 4, 1024, [id#1L]
+- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
```

![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png)

For three ways SortMergeJoin,
```
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L]
:     +- SortMergeJoin [id#0L], [id#4L], None
:        :- INPUT
:        +- INPUT
:- WholeStageCodegen
:  :  +- Project [id#0L]
:  :     +- SortMergeJoin [id#0L], [id#3L], None
:  :        :- INPUT
:  :        +- INPUT
:  :- WholeStageCodegen
:  :  :  +- Sort [id#0L ASC], false, 0
:  :  :     +- INPUT
:  :  +- Exchange hashpartitioning(id#0L, 200), None
:  :     +- WholeStageCodegen
:  :        :  +- Range 0, 1, 4, 33554432, [id#0L]
:  +- WholeStageCodegen
:     :  +- Sort [id#3L ASC], false, 0
:     :     +- INPUT
:     +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None
+- WholeStageCodegen
   :  +- Sort [id#4L ASC], false, 0
   :     +- INPUT
   +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None
```
![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png)

If the same ShuffleExchange or BroadcastExchange, execute()/executeBroadcast() will be called by different parents, they should cached the RDD/Broadcast, return the same one for all the parents.

## How was this patch tested?

Added some unit tests for this.  Had done some manual tests on TPCDS query Q59 and Q64, we can see some exchanges are re-used (this requires a change in PhysicalRDD to for sameResult, is be done in #11514 ).

Author: Davies Liu <davies@databricks.com>

Closes #11403 from davies/dedup.
2016-03-09 12:04:29 -08:00
Davies Liu 7791d0c3a9 Revert "[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks"
This reverts commit e430614eae.
2016-03-09 10:05:57 -08:00
Dongjoon Hyun c3689bc24e [SPARK-13702][CORE][SQL][MLLIB] Use diamond operator for generic instance creation in Java code.
## What changes were proposed in this pull request?

In order to make `docs/examples` (and other related code) more simple/readable/user-friendly, this PR replaces existing codes like the followings by using `diamond` operator.

```
-    final ArrayList<Product2<Object, Object>> dataToWrite =
-      new ArrayList<Product2<Object, Object>>();
+    final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
```

Java 7 or higher supports **diamond** operator which replaces the type arguments required to invoke the constructor of a generic class with an empty set of type parameters (<>). Currently, Spark Java code use mixed usage of this.

## How was this patch tested?

Manual.
Pass the existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11541 from dongjoon-hyun/SPARK-13702.
2016-03-09 10:31:26 +00:00
Dongjoon Hyun f3201aeeb0 [SPARK-13692][CORE][SQL] Fix trivial Coverity/Checkstyle defects
## What changes were proposed in this pull request?

This issue fixes the following potential bugs and Java coding style detected by Coverity and Checkstyle.

- Implement both null and type checking in equals functions.
- Fix wrong type casting logic in SimpleJavaBean2.equals.
- Add `implement Cloneable` to `UTF8String` and `SortedIterator`.
- Remove dereferencing before null check in `AbstractBytesToBytesMapSuite`.
- Fix coding style: Add '{}' to single `for` statement in mllib examples.
- Remove unused imports in `ColumnarBatch` and `JavaKinesisStreamSuite`.
- Remove unused fields in `ChunkFetchIntegrationSuite`.
- Add `stop()` to prevent resource leak.

Please note that the last two checkstyle errors exist on newly added commits after [SPARK-13583](https://issues.apache.org/jira/browse/SPARK-13583).

## How was this patch tested?

manual via `./dev/lint-java` and Coverity site.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #11530 from dongjoon-hyun/SPARK-13692.
2016-03-09 10:12:23 +00:00
Jakob Odersky 035d3acdf3 [SPARK-7286][SQL] Deprecate !== in favour of =!=
This PR replaces #9925 which had issues with CI. **Please see the original PR for any previous discussions.**

## What changes were proposed in this pull request?
Deprecate the SparkSQL column operator !== and use =!= as an alternative.
Fixes subtle issues related to operator precedence (basically, !== does not have the same priority as its logical negation, ===).

## How was this patch tested?
All currently existing tests.

Author: Jakob Odersky <jodersky@gmail.com>

Closes #11588 from jodersky/SPARK-7286.
2016-03-08 18:11:09 -08:00
Hossein cc4ab37ee7 [SPARK-13754] Keep old data source name for backwards compatibility
## Motivation
CSV data source was contributed by Databricks. It is the inlined version of https://github.com/databricks/spark-csv. The data source name was `com.databricks.spark.csv`. As a result there are many tables created on older versions of spark with that name as the source. For backwards compatibility we should keep the old name.

## Proposed changes
`com.databricks.spark.csv` was added to list of `backwardCompatibilityMap` in `ResolvedDataSource.scala`

## Tests
A unit test was added to `CSVSuite` to parse a csv file using the old name.

Author: Hossein <hossein@databricks.com>

Closes #11589 from falaki/SPARK-13754.
2016-03-08 17:45:15 -08:00
Davies Liu 982ef2b87e [SPARK-13750][SQL] fix sizeInBytes of HadoopFsRelation
## What changes were proposed in this pull request?

This PR fix the sizeInBytes of HadoopFsRelation.

## How was this patch tested?

Added regression test for that.

Author: Davies Liu <davies@databricks.com>

Closes #11590 from davies/fix_sizeInBytes.
2016-03-08 17:42:52 -08:00
Sameer Agarwal e430614eae [SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks
## What changes were proposed in this pull request?

If a filter predicate or a join condition consists of `IsNotNull` checks, we should reorder these checks such that these non-nullability checks are evaluated before the rest of the predicates.

For e.g., if a filter predicate is of the form `a > 5 && isNotNull(b)`, we should rewrite this as `isNotNull(b) && a > 5` during physical plan generation.

## How was this patch tested?

new unit tests that verify the physical plan for both filters and joins in `ReorderedPredicateSuite`

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11511 from sameeragarwal/reorder-isnotnull.
2016-03-08 15:40:45 -08:00
Michael Armbrust 1e28840594 [SPARK-13738][SQL] Cleanup Data Source resolution
Follow-up to #11509, that simply refactors the interface that we use when resolving a pluggable `DataSource`.
 - Multiple functions share the same set of arguments so we make this a case class, called `DataSource`.  Actual resolution is now done by calling a function on this class.
 - Instead of having multiple methods named `apply` (some of which do writing some of which do reading) we now explicitly have `resolveRelation()` and `write(mode, df)`.
 - Get rid of `Array[String]` since this is an internal API and was forcing us to awkwardly call `toArray` in a bunch of places.

Author: Michael Armbrust <michael@databricks.com>

Closes #11572 from marmbrus/dataSourceResolution.
2016-03-08 15:19:26 -08:00
Michael Armbrust e720dda42e [SPARK-13665][SQL] Separate the concerns of HadoopFsRelation
`HadoopFsRelation` is used for reading most files into Spark SQL.  However today this class mixes the concerns of file management, schema reconciliation, scan building, bucketing, partitioning, and writing data.  As a result, many data sources are forced to reimplement the same functionality and the various layers have accumulated a fair bit of inefficiency.  This PR is a first cut at separating this into several components / interfaces that are each described below.  Additionally, all implementations inside of Spark (parquet, csv, json, text, orc, svmlib) have been ported to the new API `FileFormat`.  External libraries, such as spark-avro will also need to be ported to work with Spark 2.0.

### HadoopFsRelation
A simple `case class` that acts as a container for all of the metadata required to read from a datasource.  All discovery, resolution and merging logic for schemas and partitions has been removed.  This an internal representation that no longer needs to be exposed to developers.

```scala
case class HadoopFsRelation(
    sqlContext: SQLContext,
    location: FileCatalog,
    partitionSchema: StructType,
    dataSchema: StructType,
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String]) extends BaseRelation
```

### FileFormat
The primary interface that will be implemented by each different format including external libraries.  Implementors are responsible for reading a given format and converting it into `InternalRow` as well as writing out an `InternalRow`.  A format can optionally return a schema that is inferred from a set of files.

```scala
trait FileFormat {
  def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

  def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow]
}
```

The current interface is based on what was required to get all the tests passing again, but still mixes a couple of concerns (i.e. `bucketSet` is passed down to the scan instead of being resolved by the planner).  Additionally, scans are still returning `RDD`s instead of iterators for single files.  In a future PR, bucketing should be removed from this interface and the scan should be isolated to a single file.

### FileCatalog
This interface is used to list the files that make up a given relation, as well as handle directory based partitioning.

```scala
trait FileCatalog {
  def paths: Seq[Path]
  def partitionSpec(schema: Option[StructType]): PartitionSpec
  def allFiles(): Seq[FileStatus]
  def getStatus(path: Path): Array[FileStatus]
  def refresh(): Unit
}
```

Currently there are two implementations:
 - `HDFSFileCatalog` - based on code from the old `HadoopFsRelation`.  Infers partitioning by recursive listing and caches this data for performance
 - `HiveFileCatalog` - based on the above, but it uses the partition spec from the Hive Metastore.

### ResolvedDataSource
Produces a logical plan given the following description of a Data Source (which can come from DataFrameReader or a metastore):
 - `paths: Seq[String] = Nil`
 - `userSpecifiedSchema: Option[StructType] = None`
 - `partitionColumns: Array[String] = Array.empty`
 - `bucketSpec: Option[BucketSpec] = None`
 - `provider: String`
 - `options: Map[String, String]`

This class is responsible for deciding which of the Data Source APIs a given provider is using (including the non-file based ones).  All reconciliation of partitions, buckets, schema from metastores or inference is done here.

### DataSourceAnalysis / DataSourceStrategy
Responsible for analyzing and planning reading/writing of data using any of the Data Source APIs, including:
 - pruning the files from partitions that will be read based on filters.
 - appending partition columns*
 - applying additional filters when a data source can not evaluate them internally.
 - constructing an RDD that is bucketed correctly when required*
 - sanity checking schema match-up and other analysis when writing.

*In the future we should do that following:
 - Break out file handling into its own Strategy as its sufficiently complex / isolated.
 - Push the appending of partition columns down in to `FileFormat` to avoid an extra copy / unvectorization.
 - Use a custom RDD for scans instead of `SQLNewNewHadoopRDD2`

Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #11509 from marmbrus/fileDataSource.
2016-03-07 15:15:10 -08:00