Author: Reynold Xin <rxin@databricks.com>
Closes#6541 from rxin/trailing-whitespace-on and squashes the following commits:
f72ebe4 [Reynold Xin] [SPARK-3850] Turn style checker on for trailing whitespaces.
Author: Reynold Xin <rxin@databricks.com>
Closes#6535 from rxin/whitespace-sql and squashes the following commits:
de50316 [Reynold Xin] [SPARK-3850] Trim trailing spaces for SQL.
Author: Reynold Xin <rxin@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Reynold Xin <rxin@databricks.com>
Closes#6527 from rxin/covariant-equals and squashes the following commits:
e7d7784 [Reynold Xin] [SPARK-7975] Enforce CovariantEqualsChecker
Author: Cheng Lian <lian@databricks.com>
Closes#6521 from liancheng/classloader-comment-fix and squashes the following commits:
fc09606 [Cheng Lian] Addresses @srowen's comment
59945c5 [Cheng Lian] Fixes a minor comment mistake in IsolatedClientLoader
I went through all the JavaDocs and tightened up visibility.
Author: Reynold Xin <rxin@databricks.com>
Closes#6526 from rxin/sql-1.4-visibility-for-docs and squashes the following commits:
bc37d1e [Reynold Xin] Tighten up visibility for JavaDoc.
Right now `unit-tests.log` are not of much value because we can't tell where the test boundaries are easily. This patch adds log statements before and after each test to outline the test boundaries, e.g.:
```
===== TEST OUTPUT FOR o.a.s.serializer.KryoSerializerSuite: 'kryo with parallelize for primitive arrays' =====
15/05/27 12:36:39.596 pool-1-thread-1-ScalaTest-running-KryoSerializerSuite INFO SparkContext: Starting job: count at KryoSerializerSuite.scala:230
15/05/27 12:36:39.596 dag-scheduler-event-loop INFO DAGScheduler: Got job 3 (count at KryoSerializerSuite.scala:230) with 4 output partitions (allowLocal=false)
15/05/27 12:36:39.596 dag-scheduler-event-loop INFO DAGScheduler: Final stage: ResultStage 3(count at KryoSerializerSuite.scala:230)
15/05/27 12:36:39.596 dag-scheduler-event-loop INFO DAGScheduler: Parents of final stage: List()
15/05/27 12:36:39.597 dag-scheduler-event-loop INFO DAGScheduler: Missing parents: List()
15/05/27 12:36:39.597 dag-scheduler-event-loop INFO DAGScheduler: Submitting ResultStage 3 (ParallelCollectionRDD[5] at parallelize at KryoSerializerSuite.scala:230), which has no missing parents
...
15/05/27 12:36:39.624 pool-1-thread-1-ScalaTest-running-KryoSerializerSuite INFO DAGScheduler: Job 3 finished: count at KryoSerializerSuite.scala:230, took 0.028563 s
15/05/27 12:36:39.625 pool-1-thread-1-ScalaTest-running-KryoSerializerSuite INFO KryoSerializerSuite:
***** FINISHED o.a.s.serializer.KryoSerializerSuite: 'kryo with parallelize for primitive arrays' *****
...
```
Author: Andrew Or <andrew@databricks.com>
Closes#6441 from andrewor14/demarcate-tests and squashes the following commits:
879b060 [Andrew Or] Fix compile after rebase
d622af7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into demarcate-tests
017c8ba [Andrew Or] Merge branch 'master' of github.com:apache/spark into demarcate-tests
7790b6c [Andrew Or] Fix tests after logical merge conflict
c7460c0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into demarcate-tests
c43ffc4 [Andrew Or] Fix tests?
8882581 [Andrew Or] Fix tests
ee22cda [Andrew Or] Fix log message
fa9450e [Andrew Or] Merge branch 'master' of github.com:apache/spark into demarcate-tests
12d1e1b [Andrew Or] Various whitespace changes (minor)
69cbb24 [Andrew Or] Make all test suites extend SparkFunSuite instead of FunSuite
bbce12e [Andrew Or] Fix manual things that cannot be covered through automation
da0b12f [Andrew Or] Add core tests as dependencies in all modules
f7d29ce [Andrew Or] Introduce base abstract class for all test suites
So we can enable a whitespace enforcement rule in the style checker to save code review time.
Author: Reynold Xin <rxin@databricks.com>
Closes#6478 from rxin/whitespace-hive and squashes the following commits:
e01b0e0 [Reynold Xin] Fixed tests.
a3bba22 [Reynold Xin] [SPARK-7927] whitespace fixes for Hive and ThriftServer.
https://issues.apache.org/jira/browse/SPARK-7853
This fixes the problem introduced by my change in https://github.com/apache/spark/pull/6435, which causes that Hive Context fails to create in spark shell because of the class loader issue.
Author: Yin Huai <yhuai@databricks.com>
Closes#6459 from yhuai/SPARK-7853 and squashes the following commits:
37ad33e [Yin Huai] Do not use hiveQlTable at all.
47cdb6d [Yin Huai] Move hiveconf.set to the end of setConf.
005649b [Yin Huai] Update comment.
35d86f3 [Yin Huai] Access TTable directly to make sure Hive will not internally use any metastore utility functions.
3737766 [Yin Huai] Recursively find all jars.
This PR is based on PR #6396 authored by chenghao-intel. Essentially, Spark SQL should use context classloader to load SerDe classes.
yhuai helped updating the test case, and I fixed a bug in the original `CliSuite`: while testing the CLI tool with `runCliWithin`, we don't append `\n` to the last query, thus the last query is never executed.
Original PR description is pasted below.
----
```
bin/spark-sql --jars ./sql/hive/src/test/resources/hive-hcatalog-core-0.13.1.jar
CREATE TABLE t1(a string, b string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
```
Throws exception like
```
15/05/26 00:16:33 ERROR SparkSQLDriver: Failed in [CREATE TABLE t1(a string, b string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe']
org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Cannot validate serde: org.apache.hive.hcatalog.data.JsonSerDe
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:333)
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:310)
at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:139)
at org.apache.spark.sql.hive.client.ClientWrapper.runHive(ClientWrapper.scala:310)
at org.apache.spark.sql.hive.client.ClientWrapper.runSqlHive(ClientWrapper.scala:300)
at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:457)
at org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:922)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:922)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:727)
at org.apache.spark.sql.hive.thriftserver.AbstractSparkSQLDriver.run(AbstractSparkSQLDriver.scala:57)
```
Author: Cheng Hao <hao.cheng@intel.com>
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#6435 from liancheng/classLoader and squashes the following commits:
d4c4845 [Cheng Lian] Fixes CliSuite
75e80e2 [Yin Huai] Update the fix.
fd26533 [Cheng Hao] scalastyle
dd78775 [Cheng Hao] workaround for classloader of IsolatedClientLoader
As stated in SPARK-7684, currently `TestHive.reset` has some execution order specific bug, which makes running specific test suites locally pretty frustrating. This PR refactors `MetastoreDataSourcesSuite` (which relies on `TestHive.reset` heavily) using various `withXxx` utility methods in `SQLTestUtils` to ask each test case to cleanup their own mess so that we can avoid calling `TestHive.reset`.
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#6353 from liancheng/workaround-spark-7684 and squashes the following commits:
26939aa [Yin Huai] Move the initialization of jsonFilePath to beforeAll.
a423d48 [Cheng Lian] Fixes Scala style issue
dfe45d0 [Cheng Lian] Refactors MetastoreDataSourcesSuite to workaround SPARK-7684
92a116d [Cheng Lian] Fixes minor styling issues
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#6318 from adrian-wang/dynpart and squashes the following commits:
ad73b61 [Daoyuan Wang] not use sqlTestUtils for try catch because dont have sqlcontext here
6c33b51 [Daoyuan Wang] fix according to liancheng
f0f8074 [Daoyuan Wang] some specific types as dynamic partition
This type is not really used. Might as well remove it.
Author: Reynold Xin <rxin@databricks.com>
Closes#6427 from rxin/evalutedType and squashes the following commits:
51a319a [Reynold Xin] [SPARK-7887][SQL] Remove EvaluatedType from SQL Expression.
I grep'ed hive-0.12.0 in the source code and removed all the profiles and doc references.
Author: Cheolsoo Park <cheolsoop@netflix.com>
Closes#6393 from piaozhexiu/SPARK-7850 and squashes the following commits:
fb429ce [Cheolsoo Park] Remove hive-0.13.1 profile
82bf09a [Cheolsoo Park] Remove hive 0.12.0 shim code
f3722da [Cheolsoo Park] Remove hive-0.12.0 profile and references from POM and build docs
So that potential partial/corrupted data files left by failed tasks/jobs won't affect normal data scan.
Author: Cheng Lian <lian@databricks.com>
Closes#6411 from liancheng/spark-7868 and squashes the following commits:
273ea36 [Cheng Lian] Ignores _temporary directories
In `DataSourceStrategy.createPhysicalRDD`, we use the relation schema as the target schema for converting incoming rows into Catalyst rows. However, we should be using the output schema instead, since our scan might return a subset of the relation's columns.
This patch incorporates #6414 by liancheng, which fixes an issue in `SimpleTestRelation` that prevented this bug from being caught by our old tests:
> In `SimpleTextRelation`, we specified `needsConversion` to `true`, indicating that values produced by this testing relation should be of Scala types, and need to be converted to Catalyst types when necessary. However, we also used `Cast` to convert strings to expected data types. And `Cast` always produces values of Catalyst types, thus no conversion is done at all. This PR makes `SimpleTextRelation` produce Scala values so that data conversion code paths can be properly tested.
Closes#5986.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <liancheng@users.noreply.github.com>
Closes#6400 from JoshRosen/SPARK-7858 and squashes the following commits:
e71c866 [Josh Rosen] Re-fix bug so that the tests pass again
56b13e5 [Josh Rosen] Add regression test to hadoopFsRelationSuites
2169a0f [Josh Rosen] Remove use of SpecificMutableRow and BufferedIterator
6cd7366 [Josh Rosen] Fix SPARK-7858 by using output types for conversion.
5a00e66 [Josh Rosen] Add assertions in order to reproduce SPARK-7858
8ba195c [Cheng Lian] Merge 9968fba9979287aaa1f141ba18bfb9d4c116a3b3 into 61664732b2
9968fba [Cheng Lian] Tests the data type conversion code paths
When committing/aborting a write task issued in `InsertIntoHadoopFsRelation`, if an exception is thrown from `OutputWriter.close()`, the committing/aborting process will be interrupted, and leaves messy stuff behind (e.g., the `_temporary` directory created by `FileOutputCommitter`).
This PR makes these two process more robust by catching potential exceptions and falling back to normal task committment/abort.
Author: Cheng Lian <lian@databricks.com>
Closes#6378 from liancheng/spark-7838 and squashes the following commits:
f18253a [Cheng Lian] Makes task committing/aborting in InsertIntoHadoopFsRelation more robust
The "Database does not exist" error reported in SPARK-7684 was caused by `HiveContext.newTemporaryConfiguration()`, which always creates a new temporary metastore directory and returns a metastore configuration pointing that directory. This makes `TestHive.reset()` always replaces old temporary metastore with an empty new one.
Author: Cheng Lian <lian@databricks.com>
Closes#6359 from liancheng/spark-7684 and squashes the following commits:
95d2eb8 [Cheng Lian] Addresses @marmbrust's comment
042769d [Cheng Lian] Don't create new temp directory in HiveContext.newTemporaryConfiguration()
This one continues the work of https://github.com/apache/spark/pull/6216.
Author: Yin Huai <yhuai@databricks.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#6366 from yhuai/insert and squashes the following commits:
3d717fb [Yin Huai] Use insertInto to handle the casue when table exists and Append is used for saveAsTable.
56d2540 [Yin Huai] Add PreWriteCheck to HiveContext's analyzer.
c636e35 [Yin Huai] Remove unnecessary empty lines.
cf83837 [Yin Huai] Move insertInto to write. Also, remove the partition columns from InsertIntoHadoopFsRelation.
0841a54 [Reynold Xin] Removed experimental tag for deprecated methods.
33ed8ef [Reynold Xin] [SPARK-7654][SQL] Move insertInto into reader/writer interface.
JIRA: https://issues.apache.org/jira/browse/SPARK-7270
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#5864 from viirya/dyn_partition_insert and squashes the following commits:
b5627df [Liang-Chi Hsieh] For comments.
3b21e4b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into dyn_partition_insert
8a4352d [Liang-Chi Hsieh] Consider dynamic partition when inserting into hive table.
https://issues.apache.org/jira/browse/SPARK-7758
When initializing `executionHive`, we only masks
`javax.jdo.option.ConnectionURL` to override metastore location. However,
other properties that relates to the actual Hive metastore data source are not
masked. For example, when using Spark SQL with a PostgreSQL backed Hive
metastore, `executionHive` actually tries to use settings read from
`hive-site.xml`, which talks about PostgreSQL, to connect to the temporary
Derby metastore, thus causes error.
To fix this, we need to mask all metastore data source properties.
Specifically, according to the code of [Hive `ObjectStore.getDataSourceProps()`
method] [1], all properties whose name mentions "jdo" and "datanucleus" must be
included.
[1]: https://github.com/apache/hive/blob/release-0.13.1/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L288
Have tested using postgre sql as metastore, it worked fine.
Author: WangTaoTheTonic <wangtao111@huawei.com>
Closes#6314 from WangTaoTheTonic/SPARK-7758 and squashes the following commits:
ca7ae7c [WangTaoTheTonic] add comments
86caf2c [WangTaoTheTonic] delete unused import
e4f0feb [WangTaoTheTonic] block more data source related property
92a81fa [WangTaoTheTonic] fix style check
e3e683d [WangTaoTheTonic] override more configs to avoid failuer connecting to postgre sql
This closes#6104.
Author: Cheng Hao <hao.cheng@intel.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#6343 from rxin/window-df and squashes the following commits:
026d587 [Reynold Xin] Address code review feedback.
dc448fe [Reynold Xin] Fixed Hive tests.
9794d9d [Reynold Xin] Moved Java test package.
9331605 [Reynold Xin] Refactored API.
3313e2a [Reynold Xin] Merge pull request #6104 from chenghao-intel/df_window
d625a64 [Cheng Hao] Update the dataframe window API as suggsted
c141fb1 [Cheng Hao] hide all of properties of the WindowFunctionDefinition
3b1865f [Cheng Hao] scaladoc typos
f3fd2d0 [Cheng Hao] polish the unit test
6847825 [Cheng Hao] Add additional analystcs functions
57e3bc0 [Cheng Hao] typos
24a08ec [Cheng Hao] scaladoc
28222ed [Cheng Hao] fix bug of range/row Frame
1d91865 [Cheng Hao] style issue
53f89f2 [Cheng Hao] remove the over from the functions.scala
964c013 [Cheng Hao] add more unit tests and window functions
64e18a7 [Cheng Hao] Add Window Function support for DataFrame
Author: Yin Huai <yhuai@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes#6285 from liancheng/spark-7763 and squashes the following commits:
bb2829d [Yin Huai] Fix hashCode.
d677f7d [Cheng Lian] Fixes Scala style issue
44b283f [Cheng Lian] Adds test case for SPARK-7616
6733276 [Yin Huai] Fix a bug that potentially causes https://issues.apache.org/jira/browse/SPARK-7616.
6cabf3c [Yin Huai] Update unit test.
7e02910 [Yin Huai] Use metastore partition columns and do not hijack maybePartitionSpec.
e9a03ec [Cheng Lian] Persists partition columns into metastore
java.lang.Math.exp(1.0) has different result between jdk versions. so do not use createQueryTest, write a separate test for it.
```
jdk version result
1.7.0_11 2.7182818284590455
1.7.0_05 2.7182818284590455
1.7.0_71 2.718281828459045
```
Author: scwf <wangfei1@huawei.com>
Closes#6274 from scwf/java_method and squashes the following commits:
3dd2516 [scwf] address comments
5fa1459 [scwf] style
df46445 [scwf] fix test error
fcb6d22 [scwf] fix udf_java_method
When no partition columns can be found, we should have an empty `PartitionSpec`, rather than a `PartitionSpec` with empty partition columns.
This PR together with #6285 should fix SPARK-7749.
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#6287 from liancheng/spark-7749 and squashes the following commits:
a799ff3 [Cheng Lian] Adds test cases for SPARK-7749
c4949be [Cheng Lian] Minor refactoring, and tolerant _TEMPORARY directory name
5aa87ea [Yin Huai] Make parsePartitions more robust.
fc56656 [Cheng Lian] Returns empty PartitionSpec if no partition columns can be inferred
19ae41e [Cheng Lian] Don't list base directory as leaf directory
Follow up of #6340, to avoid the test report missing once it fails.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#6312 from chenghao-intel/rollup_minor and squashes the following commits:
b03a25f [Cheng Hao] simplify the testData instantiation
09b7e8b [Cheng Hao] move the testData into beforeAll()
This is a follow up for #6257, which broke the maven test.
Add cube & rollup for DataFrame
For example:
```scala
testData.rollup($"a" + $"b", $"b").agg(sum($"a" - $"b"))
testData.cube($"a" + $"b", $"b").agg(sum($"a" - $"b"))
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes#6304 from chenghao-intel/rollup and squashes the following commits:
04bb1de [Cheng Hao] move the table register/unregister into beforeAll/afterAll
a6069f1 [Cheng Hao] cancel the implicit keyword
ced4b8f [Cheng Hao] remove the unnecessary code changes
9959dfa [Cheng Hao] update the code as comments
e1d88aa [Cheng Hao] update the code as suggested
03bc3d9 [Cheng Hao] Remove the CubedData & RollupedData
5fd62d0 [Cheng Hao] hiden the CubedData & RollupedData
5ffb196 [Cheng Hao] Add Cube / Rollup for dataframe
follow up for #5806
Author: scwf <wangfei1@huawei.com>
Closes#6164 from scwf/FunctionRegistry and squashes the following commits:
15e6697 [scwf] use catalogconf in FunctionRegistry
```
select explode(map(value, key)) from src;
```
Throws exception
```
org.apache.spark.sql.AnalysisException: The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases but got _c0 ;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:43)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveGenerate$$makeGeneratorOutput(Analyzer.scala:605)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:562)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:548)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16.applyOrElse(Analyzer.scala:548)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16.applyOrElse(Analyzer.scala:538)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes#6178 from chenghao-intel/explode and squashes the following commits:
916fbe9 [Cheng Hao] add more strict rules for TGF alias
5c3f2c5 [Cheng Hao] fix bug in unit test
e1d93ab [Cheng Hao] Add more unit test
19db09e [Cheng Hao] resolve names for generator in projection
A follow-up to #6244.
Author: Michael Armbrust <michael@databricks.com>
Closes#6247 from marmbrus/fixOrcTests and squashes the following commits:
e39ee1b [Michael Armbrust] [SQL] Fix serializability of ORC table scan
Fix break caused by merging #6225 and #6194.
Author: Michael Armbrust <michael@databricks.com>
Closes#6244 from marmbrus/fixOrcBuildBreak and squashes the following commits:
b10e47b [Michael Armbrust] [HOTFIX] Fix ORC Build break
This PR introduces several performance optimizations to `HadoopFsRelation` and `ParquetRelation2`:
1. Moving `FileStatus` listing from `DataSourceStrategy` into a cache within `HadoopFsRelation`.
This new cache generalizes and replaces the one used in `ParquetRelation2`.
This also introduces an interface change: to reuse cached `FileStatus` objects, `HadoopFsRelation.buildScan` methods now receive `Array[FileStatus]` instead of `Array[String]`.
1. When Parquet task side metadata reading is enabled, skip reading row group information when reading Parquet footers.
This is basically what PR #5334 does. Also, now we uses `ParquetFileReader.readAllFootersInParallel` to read footers in parallel.
Another optimization in question is, instead of asking `HadoopFsRelation.buildScan` to return an `RDD[Row]` for a single selected partition and then union them all, we ask it to return an `RDD[Row]` for all selected partitions. This optimization is based on the fact that Hadoop configuration broadcasting used in `NewHadoopRDD` takes 34% time in the following microbenchmark. However, this complicates data source user code because user code must merge partition values manually.
To check the cost of broadcasting in `NewHadoopRDD`, I also did microbenchmark after removing the `broadcast` call in `NewHadoopRDD`. All results are shown below.
### Microbenchmark
#### Preparation code
Generating a partitioned table with 50k partitions, 1k rows per partition:
```scala
import sqlContext._
import sqlContext.implicits._
for (n <- 0 until 500) {
val data = for {
p <- (n * 10) until ((n + 1) * 10)
i <- 0 until 1000
} yield (i, f"val_$i%04d", f"$p%04d")
data.
toDF("a", "b", "p").
write.
partitionBy("p").
mode("append").
parquet(path)
}
```
#### Benchmarking code
```scala
import sqlContext._
import sqlContext.implicits._
import org.apache.spark.sql.types._
import com.google.common.base.Stopwatch
val path = "hdfs://localhost:9000/user/lian/5k"
def benchmark(n: Int)(f: => Unit) {
val stopwatch = new Stopwatch()
def run() = {
stopwatch.reset()
stopwatch.start()
f
stopwatch.stop()
stopwatch.elapsedMillis()
}
val records = (0 until n).map(_ => run())
(0 until n).foreach(i => println(s"Round $i: ${records(i)} ms"))
println(s"Average: ${records.sum / n.toDouble} ms")
}
benchmark(3) { read.parquet(path).explain(extended = true) }
```
#### Results
Before:
```
Round 0: 72528 ms
Round 1: 68938 ms
Round 2: 65372 ms
Average: 68946.0 ms
```
After:
```
Round 0: 59499 ms
Round 1: 53645 ms
Round 2: 53844 ms
Round 3: 49093 ms
Round 4: 50555 ms
Average: 53327.2 ms
```
Also removing Hadoop configuration broadcasting:
(Note that I was testing on a local laptop, thus network cost is pretty low.)
```
Round 0: 15806 ms
Round 1: 14394 ms
Round 2: 14699 ms
Round 3: 15334 ms
Round 4: 14123 ms
Average: 14871.2 ms
```
Author: Cheng Lian <lian@databricks.com>
Closes#6225 from liancheng/spark-7673 and squashes the following commits:
2d58a2b [Cheng Lian] Skips reading row group information when using task side metadata reading
7aa3748 [Cheng Lian] Optimizes FileStatusCache by introducing a map from parent directories to child files
ba41250 [Cheng Lian] Reuses HadoopFsRelation FileStatusCache in ParquetRelation2
3d278f7 [Cheng Lian] Fixes a bug when reading a single Parquet data file
b84612a [Cheng Lian] Fixes Scala style issue
6a08b02 [Cheng Lian] WIP: Moves file status cache into HadoopFSRelation
A modified version of https://github.com/apache/spark/pull/6110, use `semanticEquals` to make it more efficient.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#6173 from cloud-fan/7269 and squashes the following commits:
e4a3cc7 [Wenchen Fan] address comments
cc02045 [Wenchen Fan] consider elements length equal
d7ff8f4 [Wenchen Fan] fix 7269
This PR updates PR #6135 authored by zhzhan from Hortonworks.
----
This PR implements a Spark SQL data source for accessing ORC files.
> **NOTE**
>
> Although ORC is now an Apache TLP, the codebase is still tightly coupled with Hive. That's why the new ORC data source is under `org.apache.spark.sql.hive` package, and must be used with `HiveContext`. However, it doesn't require existing Hive installation to access ORC files.
1. Saving/loading ORC files without contacting Hive metastore
1. Support for complex data types (i.e. array, map, and struct)
1. Aware of common optimizations provided by Spark SQL:
- Column pruning
- Partitioning pruning
- Filter push-down
1. Schema evolution support
1. Hive metastore table conversion
This PR also include initial work done by scwf from Huawei (PR #3753).
Author: Zhan Zhang <zhazhan@gmail.com>
Author: Cheng Lian <lian@databricks.com>
Closes#6194 from liancheng/polishing-orc and squashes the following commits:
55ecd96 [Cheng Lian] Reorganizes ORC test suites
d4afeed [Cheng Lian] Addresses comments
21ada22 [Cheng Lian] Adds @since and @Experimental annotations
128bd3b [Cheng Lian] ORC filter bug fix
d734496 [Cheng Lian] Polishes the ORC data source
2650a42 [Zhan Zhang] resolve review comments
3c9038e [Zhan Zhang] resolve review comments
7b3c7c5 [Zhan Zhang] save mode fix
f95abfd [Zhan Zhang] reuse test suite
7cc2c64 [Zhan Zhang] predicate fix
4e61c16 [Zhan Zhang] minor change
305418c [Zhan Zhang] orc data source support
Author: Michael Armbrust <michael@databricks.com>
Closes#6167 from marmbrus/configureIsolation and squashes the following commits:
6147cbe [Michael Armbrust] filter other conf
22cc3bc7 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into configureIsolation
07476ee [Michael Armbrust] filter empty prefixes
dfdf19c [Michael Armbrust] [SPARK-6906][SQL] Allow configuration of classloader isolation for hive
Also moved all the deprecated functions into one place for SQLContext and DataFrame, and updated tests to use the new API.
Author: Reynold Xin <rxin@databricks.com>
Closes#6210 from rxin/df-writer-reader-jdbc and squashes the following commits:
7465c2c [Reynold Xin] Fixed unit test.
118e609 [Reynold Xin] Updated tests.
3441b57 [Reynold Xin] Updated javadoc.
13cdd1c [Reynold Xin] [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.
This patch introduces DataFrameWriter and DataFrameReader.
DataFrameReader interface, accessible through SQLContext.read, contains methods that create DataFrames. These methods used to reside in SQLContext. Example usage:
```scala
sqlContext.read.json("...")
sqlContext.read.parquet("...")
```
DataFrameWriter interface, accessible through DataFrame.write, implements a builder pattern to avoid the proliferation of options in writing DataFrame out. It currently implements:
- mode
- format (e.g. "parquet", "json")
- options (generic options passed down into data sources)
- partitionBy (partitioning columns)
Example usage:
```scala
df.write.mode("append").format("json").partitionBy("date").saveAsTable("myJsonTable")
```
TODO:
- [ ] Documentation update
- [ ] Move JDBC into reader / writer?
- [ ] Deprecate the old interfaces
- [ ] Move the generic load interface into reader.
- [ ] Update example code and documentation
Author: Reynold Xin <rxin@databricks.com>
Closes#6175 from rxin/reader-writer and squashes the following commits:
b146c95 [Reynold Xin] Deprecation of old APIs.
bd8abdf [Reynold Xin] Fixed merge conflict.
26abea2 [Reynold Xin] Added general load methods.
244fbec [Reynold Xin] Added equivalent to example.
4f15d92 [Reynold Xin] Added documentation for partitionBy.
7e91611 [Reynold Xin] [SPARK-7654][SQL] DataFrameReader and DataFrameWriter for input/output API.
for example:
table: src(key string, value string)
sql: with v1 as(select key, count(value) over (partition by key) cnt_val from src), v2 as(select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key) select * from v2 limit 5;
then will analyze fail when resolving conflicting references in Join:
'Limit 5
'Project [*]
'Subquery v2
'Project ['v1.key,'v1_lag.cnt_val]
'Filter ('v1.key = 'v1_lag.key)
'Join Inner, None
Subquery v1
Project [key#95,cnt_val#94L]
Window [key#95,value#96], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount(value#96) WindowSpecDefinition [key#95], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS cnt_val#94L], WindowSpecDefinition [key#95], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Project [key#95,value#96]
MetastoreRelation default, src, None
Subquery v1_lag
Subquery v1
Project [key#97,cnt_val#94L]
Window [key#97,value#98], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount(value#98) WindowSpecDefinition [key#97], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS cnt_val#94L], WindowSpecDefinition [key#97], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Project [key#97,value#98]
MetastoreRelation default, src, None
Conflicting attributes: cnt_val#94L
Author: linweizhong <linweizhong@huawei.com>
Closes#6114 from Sephiroth-Lin/spark-7595 and squashes the following commits:
f8f2637 [linweizhong] Add unit test
dfe9169 [linweizhong] Handle windowExpression with self join
JavaTypeInference into catalyst
types.DateUtils into catalyst
CacheManager into execution
DefaultParserDialect into catalyst
Author: Reynold Xin <rxin@databricks.com>
Closes#6108 from rxin/sql-rename and squashes the following commits:
3fc9613 [Reynold Xin] Fixed import ordering.
83d9ff4 [Reynold Xin] Fixed codegen tests.
e271e86 [Reynold Xin] mima
f4e24a6 [Reynold Xin] [SQL] Move some classes into packages that are more appropriate.
This PR migrates Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. Major differences are:
1. Partition discovery code has been factored out to `FSBasedRelation`
1. `AppendingParquetOutputFormat` is not used now. Instead, an anonymous subclass of `ParquetOutputFormat` is used to handle appending and writing dynamic partitions
1. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition
1. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push down, thus it doesn't extend `CatalystScan` anymore
After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6090)
<!-- Reviewable:end -->
Author: Cheng Lian <lian@databricks.com>
Closes#6090 from liancheng/parquet-migration and squashes the following commits:
6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommtter
bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing
f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist
261d8c1 [Cheng Lian] Minor bug fix and more tests
db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
Some third-party UDTF extensions generate additional rows in the "GenericUDTF.close()" method, which is supported / documented by Hive.
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
However, Spark SQL ignores the "GenericUDTF.close()", and it causes bug while porting job from Hive to Spark SQL.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#5383 from chenghao-intel/udtf_close and squashes the following commits:
98b4e4b [Cheng Hao] Support UDTF.close
Author: Cheng Lian <lian@databricks.com>
Closes#6123 from liancheng/remove-println and squashes the following commits:
03356b6 [Cheng Lian] Removes debugging println
This makes HiveContext.analyzer overrideable.
Author: Santiago M. Mola <santi@mola.io>
Closes#6086 from smola/patch-3 and squashes the following commits:
8ece136 [Santiago M. Mola] [SPARK-7566][SQL] Add type to HiveContext.analyzer
This pull request adds since tag to all public methods/classes in SQL/DataFrame to indicate which version the methods/classes were first added.
Author: Reynold Xin <rxin@databricks.com>
Closes#6101 from rxin/tbc and squashes the following commits:
ed55e11 [Reynold Xin] Add since version to all DataFrame methods.
This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path. Existing data sources like JSON and Parquet can be simplified with this work.
## New features provided
1. Hive compatible partition discovery
This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.
1. Generalized partition pruning optimization
Now partition pruning is handled during physical planning phase. Specific data sources don't need to worry about this harness anymore.
(This also implies that we can remove `CatalystScan` after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)
1. Insertion with dynamic partitions
When inserting data to a `FSBasedRelation`, data can be partitioned dynamically by specified partition columns.
## New structures provided
### Developer API
1. `FSBasedRelation`
Base abstract class for file system based data sources.
1. `OutputWriter`
Base abstract class for output row writers, responsible for writing a single row object.
1. `FSBasedRelationProvider`
A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.
### User API
New overloaded versions of
1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`
are provided to allow users to save/load DataFrames with user defined dynamic partition columns.
### Spark SQL query planning
1. `InsertIntoFSBasedRelation`
Used to implement write path for `FSBasedRelation`s.
1. New rules for `FSBasedRelation` in `DataSourceStrategy`
These are added to hook `FSBasedRelation` into physical query plan in read path, and perform partition pruning.
## TODO
- [ ] Use scratch directories when overwriting a table with data selected from itself.
Currently, this is not supported, because the table been overwritten is always deleted before writing any data to it.
- [ ] When inserting with dynamic partition columns, use external sorter to group the data first.
This ensures that we only need to open a single `OutputWriter` at a time. For data sources like Parquet, `OutputWriter`s can be quite memory consuming. One issue is that, this approach breaks the row distribution in the original DataFrame. However, we did't promise to preserve data distribution when writing a DataFrame.
- [x] More tests. Specifically, test cases for
- [x] Self-join
- [x] Loading partitioned relations with a subset of partition columns stored in data files.
- [x] `SQLContext.load()` with user defined dynamic partition columns.
## Parquet data source migration
Parquet data source migration is covered in PR https://github.com/liancheng/spark/pull/6, which is against this PR branch and for preview only. A formal PR need to be made after this one is merged.
Author: Cheng Lian <lian@databricks.com>
Closes#5526 from liancheng/partitioning-support and squashes the following commits:
5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos. Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data source to customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttempContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support