https://issues.apache.org/jira/browse/SPARK-7737
cc liancheng
Author: Yin Huai <yhuai@databricks.com>
Closes #6329 from yhuai/spark-7737 and squashes the following commits:
7e0dfc7 [Yin Huai] Use leaf dirs having data files to discover partitions.
(cherry picked from commit 347b50106b)
Signed-off-by: Cheng Lian <lian@databricks.com>
According to yhuai, we spent 6-7 seconds cleaning closures in a partitioning job that takes 12 seconds. Since we provide these closures in Spark, we know for sure they are serializable, so we can bypass the cleaning.
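A purely conceptual sketch of the idea, with hypothetical names (`transform`, `cleanClosure`) standing in for Spark internals; the real change constructs the partitioning RDDs so that the cleaning pass is never invoked:
```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

// `cleanClosure` stands in for Spark's reflective ClosureCleaner pass, which
// nulls out unreachable captured state and verifies serializability. For
// closures the framework itself constructs, that pass is pure overhead.
def transform[T, U: ClassTag](
    rdd: RDD[T],
    f: T => U,
    knownSerializable: Boolean)(
    cleanClosure: (T => U) => (T => U)): RDD[U] = {
  val g = if (knownSerializable) f else cleanClosure(f) // skip the costly pass when safe
  rdd.map(g)
}
```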
Author: Andrew Or <andrew@databricks.com>
Closes #6256 from andrewor14/sql-partition-speed-up and squashes the following commits:
a82b451 [Andrew Or] Fix style
10f7e3e [Andrew Or] Avoid getting call sites and cleaning closures
17e2943 [Andrew Or] Merge branch 'master' of github.com:apache/spark into sql-partition-speed-up
523f042 [Andrew Or] Skip unnecessary Utils.getCallSites too
f7fe143 [Andrew Or] Avoid unnecessary closure cleaning
(cherry picked from commit 5287eec5a6)
Signed-off-by: Yin Huai <yhuai@databricks.com>
Having a SQLContext singleton would make it easier for applications to use a lazily instantiated, single shared instance of SQLContext when needed. It would avoid problems like the following:
1. In a REPL/notebook environment, rerunning the line `val sqlContext = new SQLContext` multiple times created different contexts while overriding the reference to the previous context, leading to issues like registered temp tables going missing.
2. In Streaming, creating SQLContext directly leads to serialization/deserialization issues when attempting to recover from DStream checkpoints. See [SPARK-6770]. To get around this problem I had to suggest creating a singleton instance - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
This can be solved by `SQLContext.getOrCreate`, which gets or creates a singleton instance of SQLContext using either a given SparkContext or a given SparkConf.
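A minimal usage sketch of the resulting API:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("getOrCreate-demo").setMaster("local[*]"))

// Repeated calls return the same lazily instantiated singleton, so temp tables
// registered through the first reference remain visible through the second.
val sqlContext1 = SQLContext.getOrCreate(sc)
val sqlContext2 = SQLContext.getOrCreate(sc)
assert(sqlContext1 eq sqlContext2)
```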
rxin marmbrus
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6006 from tdas/SPARK-7478 and squashes the following commits:
25f4da9 [Tathagata Das] Addressed comments.
79fe069 [Tathagata Das] Added comments.
c66ca76 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478
48adb14 [Tathagata Das] Removed HiveContext.getOrCreate
bf8cf50 [Tathagata Das] Fix more bug
dec5594 [Tathagata Das] Fixed bug
b4e9721 [Tathagata Das] Remove unnecessary import
4ef513b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478
d3ea8e4 [Tathagata Das] Added HiveContext
83bc950 [Tathagata Das] Updated tests
f82ae81 [Tathagata Das] Fixed test
bc72868 [Tathagata Das] Added SQLContext.getOrCreate
(cherry picked from commit 3d0cccc858)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Author: Yin Huai <yhuai@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes #6285 from liancheng/spark-7763 and squashes the following commits:
bb2829d [Yin Huai] Fix hashCode.
d677f7d [Cheng Lian] Fixes Scala style issue
44b283f [Cheng Lian] Adds test case for SPARK-7616
6733276 [Yin Huai] Fix a bug that potentially causes https://issues.apache.org/jira/browse/SPARK-7616.
6cabf3c [Yin Huai] Update unit test.
7e02910 [Yin Huai] Use metastore partition columns and do not hijack maybePartitionSpec.
e9a03ec [Cheng Lian] Persists partition columns into metastore
(cherry picked from commit 30f3f556f7)
Signed-off-by: Yin Huai <yhuai@databricks.com>
java.lang.Math.exp(1.0) produces different results across JDK versions, so do not use createQueryTest; write a separate test for it.
```
JDK version    result
1.7.0_11       2.7182818284590455
1.7.0_05       2.7182818284590455
1.7.0_71       2.718281828459045
```
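A hedged sketch of what the separate test can look like (illustrative, not the exact suite code; assumes a QueryTest-style suite where `sql`, `checkAnswer`, and `Row` are in scope): compute the expected value with the running JDK instead of hard-coding a golden answer.
```scala
// The expectation is derived from the local JDK, so the last-digit difference
// between JDK builds (2.7182818284590455 vs 2.718281828459045) cannot break the test.
checkAnswer(
  sql("SELECT java_method('java.lang.Math', 'exp', 1.0) FROM src LIMIT 1"),
  Row(java.lang.Math.exp(1.0).toString))
```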
Author: scwf <wangfei1@huawei.com>
Closes #6274 from scwf/java_method and squashes the following commits:
3dd2516 [scwf] address comments
5fa1459 [scwf] style
df46445 [scwf] fix test error
fcb6d22 [scwf] fix udf_java_method
(cherry picked from commit f6c486aa4b)
Signed-off-by: Michael Armbrust <michael@databricks.com>
When no partition columns can be found, we should have an empty `PartitionSpec`, rather than a `PartitionSpec` with empty partition columns.
This PR together with #6285 should fix SPARK-7749.
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes #6287 from liancheng/spark-7749 and squashes the following commits:
a799ff3 [Cheng Lian] Adds test cases for SPARK-7749
c4949be [Cheng Lian] Minor refactoring, and tolerant _TEMPORARY directory name
5aa87ea [Yin Huai] Make parsePartitions more robust.
fc56656 [Cheng Lian] Returns empty PartitionSpec if no partition columns can be inferred
19ae41e [Cheng Lian] Don't list base directory as leaf directory
(cherry picked from commit 8730fbb47b)
Signed-off-by: Yin Huai <yhuai@databricks.com>
The keys of Map in JsonRDD should be converted into UTF8String (as should failed records). Thanks to yhuai and viirya.
Closes #6084
Author: Davies Liu <davies@databricks.com>
Closes #6299 from davies/string_in_json and squashes the following commits:
0dbf559 [Davies Liu] improve test, fix corrupt record
6836a80 [Davies Liu] move unit tests into Scala
b97af11 [Davies Liu] fix MapType in JsonRDD
(cherry picked from commit a25c1ab8f0)
Signed-off-by: Yin Huai <yhuai@databricks.com>
Follow-up of #6340, to avoid losing the test report when it fails.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #6312 from chenghao-intel/rollup_minor and squashes the following commits:
b03a25f [Cheng Hao] simplify the testData instantiation
09b7e8b [Cheng Hao] move the testData into beforeAll()
(cherry picked from commit feb3a9d3f8)
Signed-off-by: Yin Huai <yhuai@databricks.com>
JIRA: https://issues.apache.org/jira/browse/SPARK-7746
Looks like an easy parameter to add, but it can show a significant performance improvement if the JDBC driver accepts it.
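A usage sketch (the URL and table name are placeholders, and the exact option key spelling should be checked against the merged code):
```scala
import java.util.Properties

val props = new Properties()
// Hint to the JDBC driver to fetch this many rows per round trip;
// drivers that ignore the hint simply keep their default behavior.
props.setProperty("fetchSize", "10000")

val df = sqlContext.read.jdbc("jdbc:postgresql://host:5432/db", "my_table", props)
```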
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #6283 from viirya/jdbc_fetchsize and squashes the following commits:
de47f94 [Liang-Chi Hsieh] Don't keep fetchSize as single parameter.
b7bff2f [Liang-Chi Hsieh] Add FetchSize parameter for JDBC driver.
(cherry picked from commit d0eb9ffe97)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This is a follow-up for #6257, which broke the Maven test.
Add cube & rollup for DataFrame
For example:
```scala
testData.rollup($"a" + $"b", $"b").agg(sum($"a" - $"b"))
testData.cube($"a" + $"b", $"b").agg(sum($"a" - $"b"))
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes #6304 from chenghao-intel/rollup and squashes the following commits:
04bb1de [Cheng Hao] move the table register/unregister into beforeAll/afterAll
a6069f1 [Cheng Hao] cancel the implicit keyword
ced4b8f [Cheng Hao] remove the unnecessary code changes
9959dfa [Cheng Hao] update the code as comments
e1d88aa [Cheng Hao] update the code as suggested
03bc3d9 [Cheng Hao] Remove the CubedData & RollupedData
5fd62d0 [Cheng Hao] hiden the CubedData & RollupedData
5ffb196 [Cheng Hao] Add Cube / Rollup for dataframe
(cherry picked from commit 42c592adb3)
Signed-off-by: Yin Huai <yhuai@databricks.com>
https://issues.apache.org/jira/browse/SPARK-7713
I tested the performance with the following code:
```scala
import sqlContext._
import sqlContext.implicits._
(1 to 5000).foreach { i =>
  (1 to 1000).map(j => (j, s"str$j")).toDF("a", "b").save(s"/tmp/partitioned/i=$i")
}
sqlContext.sql("""
  CREATE TEMPORARY TABLE partitionedParquet
  USING org.apache.spark.sql.parquet
  OPTIONS (
    path '/tmp/partitioned'
  )""")
table("partitionedParquet").explain(true)
```
On master, `explain` takes 40s on my laptop. With this PR, `explain` takes 14s.
Author: Yin Huai <yhuai@databricks.com>
Closes #6252 from yhuai/broadcastHadoopConf and squashes the following commits:
6fa73df [Yin Huai] Address comments of Josh and Andrew.
807fbf9 [Yin Huai] Make the new buildScan and SqlNewHadoopRDD private sql.
e393555 [Yin Huai] Cheng's comments.
2eb53bb [Yin Huai] Use a shared broadcast Hadoop Configuration for partitioned HadoopFsRelations.
(cherry picked from commit b631bf73b9)
Signed-off-by: Yin Huai <yhuai@databricks.com>
Follow-up for #5806.
Author: scwf <wangfei1@huawei.com>
Closes #6164 from scwf/FunctionRegistry and squashes the following commits:
15e6697 [scwf] use catalogconf in FunctionRegistry
(cherry picked from commit 60336e3bc0)
Signed-off-by: Michael Armbrust <michael@databricks.com>
```
select explode(map(value, key)) from src;
```
This throws an exception:
```
org.apache.spark.sql.AnalysisException: The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases but got _c0 ;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:43)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveGenerate$$makeGeneratorOutput(Analyzer.scala:605)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:562)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:548)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16.applyOrElse(Analyzer.scala:548)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16.applyOrElse(Analyzer.scala:538)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
```
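After this patch, generator output in projections resolves without hand-written aliases, and explicit multi-column aliases are validated more strictly. A hedged sketch, assuming a `HiveContext` bound to `hiveContext`:
```scala
// Resolves using the generator's default output names after the fix.
hiveContext.sql("select explode(map(value, key)) from src")

// Explicit aliases, one per column produced by the UDTF.
hiveContext.sql("select explode(map(value, key)) as (k, v) from src")
```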
Author: Cheng Hao <hao.cheng@intel.com>
Closes #6178 from chenghao-intel/explode and squashes the following commits:
916fbe9 [Cheng Hao] add more strict rules for TGF alias
5c3f2c5 [Cheng Hao] fix bug in unit test
e1d93ab [Cheng Hao] Add more unit test
19db09e [Cheng Hao] resolve names for generator in projection
(cherry picked from commit bcb1ff8146)
Signed-off-by: Michael Armbrust <michael@databricks.com>
In `DataFrame.describe()`, the `count` aggregate produces an integer, the `avg` and `stdev` aggregates produce doubles, and `min` and `max` aggregates can produce varying types depending on what type of column they're applied to. As a result, we should cast all aggregate results to String so that `describe()`'s output types match its declared output schema.
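A small sketch of the behavior this pins down (column names are illustrative; assumes `sqlContext.implicits._` for `toDF`):
```scala
import org.apache.spark.sql.types.StringType
import sqlContext.implicits._

val df = Seq((1, 10.0), (2, 20.0), (3, 30.0)).toDF("a", "b")
val stats = df.describe()

// Every aggregate value is cast to String, so the integer count and the
// double mean are both rendered as strings under a uniform schema.
assert(stats.schema.fields.tail.forall(_.dataType == StringType))
```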
Author: Josh Rosen <joshrosen@databricks.com>
Closes #6218 from JoshRosen/SPARK-7687 and squashes the following commits:
146b615 [Josh Rosen] Fix R test.
2974bd5 [Josh Rosen] Cast to string type instead
f206580 [Josh Rosen] Cast to double to fix SPARK-7687
307ecbf [Josh Rosen] Add failing regression test for SPARK-7687
(cherry picked from commit c9fa870a6d)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This PR is based on #6081; thanks adrian-wang.
Closes #6081
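A usage sketch of the Scala side of the new API (this PR adds `range()` in Python as well):
```scala
// A single LongType column named "id" containing 0, 1, ..., 999.
val df = sqlContext.range(0, 1000)

// Start, end, step, and number of partitions can also be given explicitly.
val stepped = sqlContext.range(0, 1000, 10, 4)

df.count()       // 1000
stepped.count()  // 100
```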
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Author: Davies Liu <davies@databricks.com>
Closes #6230 from davies/range and squashes the following commits:
d3ce5fe [Davies Liu] add tests
789eda5 [Davies Liu] add range() in Python
4590208 [Davies Liu] Merge commit 'refs/pull/6081/head' of github.com:apache/spark into range
cbf5200 [Daoyuan Wang] let's add python support in a separate PR
f45e3b2 [Daoyuan Wang] remove redundant toLong
617da76 [Daoyuan Wang] fix safe marge for corner cases
867c417 [Daoyuan Wang] fix
13dbe84 [Daoyuan Wang] update
bd998ba [Daoyuan Wang] update comments
d3a0c1b [Daoyuan Wang] add range api()
(cherry picked from commit c2437de189)
Signed-off-by: Reynold Xin <rxin@databricks.com>
A follow-up to #6244.
Author: Michael Armbrust <michael@databricks.com>
Closes #6247 from marmbrus/fixOrcTests and squashes the following commits:
e39ee1b [Michael Armbrust] [SQL] Fix serializability of ORC table scan
(cherry picked from commit eb4632f282)
Signed-off-by: Yin Huai <yhuai@databricks.com>
Fix break caused by merging #6225 and #6194.
Author: Michael Armbrust <michael@databricks.com>
Closes #6244 from marmbrus/fixOrcBuildBreak and squashes the following commits:
b10e47b [Michael Armbrust] [HOTFIX] Fix ORC Build break
(cherry picked from commit fcf90b75cc)
Signed-off-by: Andrew Or <andrew@databricks.com>
This PR reverts #5404 and instead passes the Python version from the driver into the JVM, checking it in the worker before deserializing closures, so that it works across different major versions of Python.
Author: Davies Liu <davies@databricks.com>
Closes #6203 from davies/py_version and squashes the following commits:
b8fb76e [Davies Liu] fix test
6ce5096 [Davies Liu] use string for version
47c6278 [Davies Liu] check python version of worker with driver
(cherry picked from commit 32fbd297dd)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
This PR introduces several performance optimizations to `HadoopFsRelation` and `ParquetRelation2`:
1. Moving `FileStatus` listing from `DataSourceStrategy` into a cache within `HadoopFsRelation`.
This new cache generalizes and replaces the one used in `ParquetRelation2`.
This also introduces an interface change: to reuse cached `FileStatus` objects, `HadoopFsRelation.buildScan` methods now receive `Array[FileStatus]` instead of `Array[String]` (see the sketch after the benchmark notes below).
1. When Parquet task-side metadata reading is enabled, skip reading row group information when reading Parquet footers.
This is basically what PR #5334 does. Also, we now use `ParquetFileReader.readAllFootersInParallel` to read footers in parallel.
Another optimization in question: instead of asking `HadoopFsRelation.buildScan` to return an `RDD[Row]` for a single selected partition and then unioning them all, we could ask it to return a single `RDD[Row]` covering all selected partitions. This optimization is based on the fact that the Hadoop configuration broadcasting used in `NewHadoopRDD` takes 34% of the time in the microbenchmark below. However, it complicates data source user code, because user code would have to merge partition values manually.
To check the cost of broadcasting in `NewHadoopRDD`, I also did microbenchmark after removing the `broadcast` call in `NewHadoopRDD`. All results are shown below.
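A hedged sketch of the `buildScan` interface change from the list above (the shape only; the real `HadoopFsRelation` has more members and overloads):
```scala
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

abstract class ExampleRelation {
  // Old shape: every source re-derived file metadata from plain path strings.
  //   def buildScan(inputPaths: Array[String]): RDD[Row]

  // New shape: cached FileStatus objects are passed in, so sources reuse the
  // listing done once by HadoopFsRelation instead of hitting the filesystem again.
  def buildScan(inputFiles: Array[FileStatus]): RDD[Row]
}
```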
### Microbenchmark
#### Preparation code
Generating a partitioned table with 5k partitions, 1k rows per partition:
```scala
import sqlContext._
import sqlContext.implicits._

// The same path used in the benchmarking code below.
val path = "hdfs://localhost:9000/user/lian/5k"

for (n <- 0 until 500) {
  val data = for {
    p <- (n * 10) until ((n + 1) * 10)
    i <- 0 until 1000
  } yield (i, f"val_$i%04d", f"$p%04d")

  data.toDF("a", "b", "p")
    .write
    .partitionBy("p")
    .mode("append")
    .parquet(path)
}
```
#### Benchmarking code
```scala
import sqlContext._
import sqlContext.implicits._
import org.apache.spark.sql.types._
import com.google.common.base.Stopwatch

val path = "hdfs://localhost:9000/user/lian/5k"

def benchmark(n: Int)(f: => Unit) {
  val stopwatch = new Stopwatch()

  def run() = {
    stopwatch.reset()
    stopwatch.start()
    f
    stopwatch.stop()
    stopwatch.elapsedMillis()
  }

  val records = (0 until n).map(_ => run())
  (0 until n).foreach(i => println(s"Round $i: ${records(i)} ms"))
  println(s"Average: ${records.sum / n.toDouble} ms")
}

benchmark(3) { read.parquet(path).explain(extended = true) }
```
#### Results
Before:
```
Round 0: 72528 ms
Round 1: 68938 ms
Round 2: 65372 ms
Average: 68946.0 ms
```
After:
```
Round 0: 59499 ms
Round 1: 53645 ms
Round 2: 53844 ms
Round 3: 49093 ms
Round 4: 50555 ms
Average: 53327.2 ms
```
After also removing Hadoop configuration broadcasting:
(Note that I was testing on a local laptop, thus network cost is pretty low.)
```
Round 0: 15806 ms
Round 1: 14394 ms
Round 2: 14699 ms
Round 3: 15334 ms
Round 4: 14123 ms
Average: 14871.2 ms
```
Author: Cheng Lian <lian@databricks.com>
Closes #6225 from liancheng/spark-7673 and squashes the following commits:
2d58a2b [Cheng Lian] Skips reading row group information when using task side metadata reading
7aa3748 [Cheng Lian] Optimizes FileStatusCache by introducing a map from parent directories to child files
ba41250 [Cheng Lian] Reuses HadoopFsRelation FileStatusCache in ParquetRelation2
3d278f7 [Cheng Lian] Fixes a bug when reading a single Parquet data file
b84612a [Cheng Lian] Fixes Scala style issue
6a08b02 [Cheng Lian] WIP: Moves file status cache into HadoopFSRelation
(cherry picked from commit 9dadf019b9)
Signed-off-by: Yin Huai <yhuai@databricks.com>
cc liancheng marmbrus
Author: Yin Huai <yhuai@databricks.com>
Closes #6130 from yhuai/directOutput and squashes the following commits:
312b07d [Yin Huai] A data source can use spark.sql.sources.outputCommitterClass to override the output committer.
(cherry picked from commit 530397ba2f)
Signed-off-by: Michael Armbrust <michael@databricks.com>
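A usage sketch of the option described above; the committer named here is just Hadoop's stock `FileOutputCommitter`, for illustration, where in practice one would plug in e.g. a direct output committer:
```scala
// Any OutputCommitter subclass with the expected constructor can be named here;
// data source writes will instantiate it instead of the default committer.
sqlContext.setConf(
  "spark.sql.sources.outputCommitterClass",
  classOf[org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter].getName)
```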
A modified version of https://github.com/apache/spark/pull/6110, using `semanticEquals` to make it more efficient.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #6173 from cloud-fan/7269 and squashes the following commits:
e4a3cc7 [Wenchen Fan] address comments
cc02045 [Wenchen Fan] consider elements length equal
d7ff8f4 [Wenchen Fan] fix 7269
(cherry picked from commit 103c863c2e)
Signed-off-by: Michael Armbrust <michael@databricks.com>
```
spark-sql>
> explain extended
> select * from (
> select key from src union all
> select key from src) t;
```
Currently, the Spark plan prints its children in `argString`:
```
== Physical Plan ==
Union[ HiveTableScan key#1, (MetastoreRelation default, src, None), None,
HiveTableScan key#3, (MetastoreRelation default, src, None), None]
HiveTableScan key#1, (MetastoreRelation default, src, None), None
HiveTableScan key#3, (MetastoreRelation default, src, None), None
```
After this patch:
```
== Physical Plan ==
Union
HiveTableScan [key#1], (MetastoreRelation default, src, None), None
HiveTableScan [key#3], (MetastoreRelation default, src, None), None
```
I have tested this locally.
Author: scwf <wangfei1@huawei.com>
Closes #6144 from scwf/fix-argString and squashes the following commits:
1a642e0 [scwf] fix treenode argString
(cherry picked from commit fc2480ed13)
Signed-off-by: Michael Armbrust <michael@databricks.com>
This PR updates PR #6135 authored by zhzhan from Hortonworks.
----
This PR implements a Spark SQL data source for accessing ORC files.
> **NOTE**
>
> Although ORC is now an Apache TLP, the codebase is still tightly coupled with Hive. That's why the new ORC data source lives under the `org.apache.spark.sql.hive` package and must be used with `HiveContext`. However, it doesn't require an existing Hive installation to access ORC files.
Main features include:
1. Saving/loading ORC files without contacting the Hive metastore
1. Support for complex data types (i.e. array, map, and struct)
1. Aware of common optimizations provided by Spark SQL:
- Column pruning
- Partition pruning
- Filter push-down
1. Schema evolution support
1. Hive metastore table conversion
This PR also includes initial work done by scwf from Huawei (PR #3753).
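A usage sketch of the new data source (paths are placeholders; per the note above, a `HiveContext` is required but an existing Hive installation is not):
```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc: SparkContext = ???  // an existing SparkContext
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// Write and read ORC files directly, without touching the Hive metastore.
val df = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("key", "value")
df.write.format("orc").save("/tmp/orc-demo")
val loaded = hiveContext.read.format("orc").load("/tmp/orc-demo")
loaded.show()
```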
Author: Zhan Zhang <zhazhan@gmail.com>
Author: Cheng Lian <lian@databricks.com>
Closes #6194 from liancheng/polishing-orc and squashes the following commits:
55ecd96 [Cheng Lian] Reorganizes ORC test suites
d4afeed [Cheng Lian] Addresses comments
21ada22 [Cheng Lian] Adds @since and @Experimental annotations
128bd3b [Cheng Lian] ORC filter bug fix
d734496 [Cheng Lian] Polishes the ORC data source
2650a42 [Zhan Zhang] resolve review comments
3c9038e [Zhan Zhang] resolve review comments
7b3c7c5 [Zhan Zhang] save mode fix
f95abfd [Zhan Zhang] reuse test suite
7cc2c64 [Zhan Zhang] predicate fix
4e61c16 [Zhan Zhang] minor change
305418c [Zhan Zhang] orc data source support
(cherry picked from commit aa31e431fc)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #6235 from cloud-fan/tmp and squashes the following commits:
8f16367 [Wenchen Fan] use private[this]
(cherry picked from commit 56ede88485)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes #6091 from liancheng/spark-7570 and squashes the following commits:
8ff07e8 [Cheng Lian] Ignores _temporary during partition discovery
(cherry picked from commit 010a1c2780)
Signed-off-by: Michael Armbrust <michael@databricks.com>