This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records.
The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf.
The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles.
UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold:
- The shuffle dependency specifies no aggregation or output ordering.
- The shuffle serializer supports relocation of serialized values (this is currently supported
by KryoSerializer and Spark SQL's custom serializers).
- The shuffle produces fewer than 16777216 output partitions.
- No individual record is larger than 128 MB when serialized.
In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer.
At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged to produce the final output file.
UnsafeShuffleManager optimizes this process in several ways:
- Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details.
- It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache.
- The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge.
- When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge.
The shuffle read path is unchanged.
This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / #4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in #5725).
### Future work
There are several tasks that build upon this patch, which will be left to future work:
- [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data.
- Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL).
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5868)
<!-- Reviewable:end -->
Author: Josh Rosen <joshrosen@databricks.com>
Closes#5868 from JoshRosen/unsafe-sort and squashes the following commits:
ef0a86e [Josh Rosen] Fix scalastyle errors
7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data.
d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances.
52a9981 [Josh Rosen] Fix some bugs in the address packing code.
51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort
4023fa4 [Josh Rosen] Add @Private annotation to some Java classes.
de40b9d [Josh Rosen] More comments to try to explain metrics code
df07699 [Josh Rosen] Attempt to clarify confusing metrics update code
5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file.
d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID
e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter
4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array'
6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter.
57312c9 [Josh Rosen] Clarify fileBufferSize units
2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter.
fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer.
85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator.
0ad34da [Josh Rosen] Fix off-by-one in nextInt() call
56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter
e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.
e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding.
4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics.
d4e6d89 [Josh Rosen] Update to bit shifting constants
69d5899 [Josh Rosen] Remove some unnecessary override vals
8531286 [Josh Rosen] Add tests that automatically trigger spills.
7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap().
e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections
39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!)
1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class.
ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable.
ae538dc [Josh Rosen] Document UnsafeShuffleManager.
ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions.
0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass.
b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance.
1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations.
b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless.
f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation.
4a01c45 [Josh Rosen] Remove unnecessary log message
27b18b0 [Josh Rosen] That for inserting records AT the max record size.
fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes.
9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change
fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's
67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager
8f5061a [Josh Rosen] Strengthen assertion to check partitioning
01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite
1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename.
e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors
7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling.
722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests.
9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
b95e642 [Josh Rosen] Refactor and document logic that decides when to spill.
1ce1300 [Josh Rosen] More minor cleanup
5e8cf75 [Josh Rosen] More minor cleanup
e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface.
cfe0ec4 [Josh Rosen] Address a number of minor review comments:
8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter
11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics.
b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter.
4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests.
133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter.
f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort.
57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort.
69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode.
7ee918e [Josh Rosen] Re-order imports in tests
3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces
3490512 [Josh Rosen] Misc. cleanup
f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces.
2776aca [Josh Rosen] First passing test for ExternalSorter.
5e100b2 [Josh Rosen] Super-messy WIP on external sort
595923a [Josh Rosen] Remove some unused variables.
8958584 [Josh Rosen] Fix bug in calculating free space in current page.
f17fa8f [Josh Rosen] Add missing newline
c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use:
b8a09fe [Josh Rosen] Back out accidental log4j.properties change
bfc12d3 [Josh Rosen] Add tests for serializer relocation property.
240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert()
1433b42 [Josh Rosen] Store record length as int instead of long.
026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter
0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java.
87e721b [Josh Rosen] Renaming and comments
d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation
e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite
9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter.
253f13e [Josh Rosen] More cleanup
8e3ec20 [Josh Rosen] Begin code cleanup.
4d2f5e1 [Josh Rosen] WIP
3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter
767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter.
e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter
57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter
abf7bfe [Josh Rosen] Add basic test case.
81d52c5 [Josh Rosen] WIP on UnsafeSorter
(cherry picked from commit 73bed408fb)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This is meant to make the FlumePollingStreamSuite deterministic. Now we basically count the number of batches that have been completed - and then verify the results rather than sleeping for random periods of time.
Author: Hari Shreedharan <hshreedharan@apache.org>
Closes#5918 from harishreedharan/flume-test-fix and squashes the following commits:
93f24f3 [Hari Shreedharan] Add an eventually block to ensure that all received data is processed. Refactor the dstream creation and remove redundant code.
1108804 [Hari Shreedharan] [SPARK-7356][STREAMING] Fix flakey tests in FlumePollingStreamSuite using SparkSink's batch CountDownLatch.
(cherry picked from commit 61d1e87c0d)
Signed-off-by: Andrew Or <andrew@databricks.com>
zsxwing
Author: Andrew Or <andrew@databricks.com>
Closes#6134 from andrewor14/private-streaming-uiutils and squashes the following commits:
225df94 [Andrew Or] Privatize class
(cherry picked from commit bb6dec3b16)
Signed-off-by: Andrew Or <andrew@databricks.com>
This is pretty useful for MLlib.
<img src="https://cloud.githubusercontent.com/assets/2133137/7599650/c7d03dd8-f8b8-11e4-8c0a-0a89e786c90f.png" width="400px"/>
Author: Andrew Or <andrew@databricks.com>
Closes#6100 from andrewor14/dag-viz-hover and squashes the following commits:
fefe2af [Andrew Or] Link tooltips for nodes that belong to the same RDD
90c6a7e [Andrew Or] Assign classes to clusters and nodes, not IDs
(cherry picked from commit 44403414d3)
Signed-off-by: Andrew Or <andrew@databricks.com>
Subsequent fix following #5966. I tried this out locally.
Author: Andrew Or <andrew@databricks.com>
Closes#6129 from andrewor14/211-compilation and squashes the following commits:
713868f [Andrew Or] Fix compilation issue for scala 2.11
(cherry picked from commit f88ac70155)
Signed-off-by: Andrew Or <andrew@databricks.com>
This is necessary for streaming and long-running Spark applications. zsxwing tdas
Author: Andrew Or <andrew@databricks.com>
Closes#6125 from andrewor14/viz-listener-leak and squashes the following commits:
8660949 [Andrew Or] Fix thing + add tests
33c0843 [Andrew Or] Clean up old job state
(cherry picked from commit f6e18388d9)
Signed-off-by: Andrew Or <andrew@databricks.com>
JavaTypeInference into catalyst
types.DateUtils into catalyst
CacheManager into execution
DefaultParserDialect into catalyst
Author: Reynold Xin <rxin@databricks.com>
Closes#6108 from rxin/sql-rename and squashes the following commits:
3fc9613 [Reynold Xin] Fixed import ordering.
83d9ff4 [Reynold Xin] Fixed codegen tests.
e271e86 [Reynold Xin] mima
f4e24a6 [Reynold Xin] [SQL] Move some classes into packages that are more appropriate.
(cherry picked from commit e683182c3e)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Optimize the case of `project(_, sort)` , a example is:
`select key from (select * from testData order by key) t`
before this PR:
```
== Parsed Logical Plan ==
'Project ['key]
'Subquery t
'Sort ['key ASC], true
'Project [*]
'UnresolvedRelation [testData], None
== Analyzed Logical Plan ==
Project [key#0]
Subquery t
Sort [key#0 ASC], true
Project [key#0,value#1]
Subquery testData
LogicalRDD [key#0,value#1], MapPartitionsRDD[1]
== Optimized Logical Plan ==
Project [key#0]
Sort [key#0 ASC], true
LogicalRDD [key#0,value#1], MapPartitionsRDD[1]
== Physical Plan ==
Project [key#0]
Sort [key#0 ASC], true
Exchange (RangePartitioning [key#0 ASC], 5), []
PhysicalRDD [key#0,value#1], MapPartitionsRDD[1]
```
after this PR
```
== Parsed Logical Plan ==
'Project ['key]
'Subquery t
'Sort ['key ASC], true
'Project [*]
'UnresolvedRelation [testData], None
== Analyzed Logical Plan ==
Project [key#0]
Subquery t
Sort [key#0 ASC], true
Project [key#0,value#1]
Subquery testData
LogicalRDD [key#0,value#1], MapPartitionsRDD[1]
== Optimized Logical Plan ==
Sort [key#0 ASC], true
Project [key#0]
LogicalRDD [key#0,value#1], MapPartitionsRDD[1]
== Physical Plan ==
Sort [key#0 ASC], true
Exchange (RangePartitioning [key#0 ASC], 5), []
Project [key#0]
PhysicalRDD [key#0,value#1], MapPartitionsRDD[1]
```
with this rule we will first do column pruning on the table and then do sorting.
Author: scwf <wangfei1@huawei.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>
Closes#5838 from scwf/pruning and squashes the following commits:
b00d833 [scwf] address michael's comment
e230155 [scwf] fix tests failure
b09b895 [scwf] improve column pruning
(cherry picked from commit 59250fe514)
Signed-off-by: Michael Armbrust <michael@databricks.com>
The missing pieces in ml.classification for Python!
cc mengxr
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#6106 from brkyvz/ml-class and squashes the following commits:
dd78237 [Burak Yavuz] fix style
1048e29 [Burak Yavuz] ready for PR
(cherry picked from commit df2fb1305a)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
Author: leahmcguire <lmcguire@salesforce.com>
Closes#6073 from leahmcguire/binaryCheckNB and squashes the following commits:
b8442c2 [leahmcguire] changed to if else for value checks
911bf83 [leahmcguire] undid reformat
4eedf1e [leahmcguire] moved bernoulli check
9ee9e84 [leahmcguire] fixed style error
3f3b32c [leahmcguire] fixed zero one check so only called in combiner
831fd27 [leahmcguire] got test working
f44bb3c [leahmcguire] removed changes from CV branch
67253f0 [leahmcguire] added check to bernoulli to ensure feature values are zero or one
f191c71 [leahmcguire] fixed name
58d060b [leahmcguire] changed param name and test according to comments
04f0d3c [leahmcguire] Added stats from cross validation as a val in the cross validation model to save them for user access
(cherry picked from commit 61e05fc58e)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
Added `ml.feature.Bucketizer` to PySpark.
cc mengxr
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#6124 from brkyvz/ml-bucket and squashes the following commits:
05285be [Burak Yavuz] added sphinx doc
6abb6ed [Burak Yavuz] added support for Bucketizer
(cherry picked from commit 5db18ba6e1)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
This PR migrates Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. Major differences are:
1. Partition discovery code has been factored out to `FSBasedRelation`
1. `AppendingParquetOutputFormat` is not used now. Instead, an anonymous subclass of `ParquetOutputFormat` is used to handle appending and writing dynamic partitions
1. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition
1. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push down, thus it doesn't extend `CatalystScan` anymore
After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6090)
<!-- Reviewable:end -->
Author: Cheng Lian <lian@databricks.com>
Closes#6090 from liancheng/parquet-migration and squashes the following commits:
6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommtter
bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing
f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist
261d8c1 [Cheng Lian] Minor bug fix and more tests
db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
(cherry picked from commit 7ff16e8abe)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Some third-party UDTF extensions generate additional rows in the "GenericUDTF.close()" method, which is supported / documented by Hive.
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
However, Spark SQL ignores the "GenericUDTF.close()", and it causes bug while porting job from Hive to Spark SQL.
Author: Cheng Hao <hao.cheng@intel.com>
Closes#5383 from chenghao-intel/udtf_close and squashes the following commits:
98b4e4b [Cheng Hao] Support UDTF.close
(cherry picked from commit 0da254fb29)
Signed-off-by: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes#6123 from liancheng/remove-println and squashes the following commits:
03356b6 [Cheng Lian] Removes debugging println
(cherry picked from commit aa6ba3f216)
Signed-off-by: Cheng Lian <lian@databricks.com>
We need to add a log entry before calling `abortTask`/`abortJob`. Otherwise, an exception from `abortTask`/`abortJob` will shadow the real cause.
cc liancheng
Author: Yin Huai <yhuai@databricks.com>
Closes#6105 from yhuai/logCause and squashes the following commits:
8dfe0d8 [Yin Huai] Log cause.
(cherry picked from commit b061bd517a)
Signed-off-by: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <lian@databricks.com>
Closes#6118 from liancheng/spark-7599 and squashes the following commits:
31e1bd6 [Cheng Lian] Don't restrict customized output committers to be subclasses of FileOutputCommitter
(cherry picked from commit 10c546e9d4)
Signed-off-by: Yin Huai <yhuai@databricks.com>
escape spaces in the arguments.
Author: Masayoshi TSUZUKI <tsudukim@oss.nttdata.co.jp>
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#5447 from tsudukim/feature/SPARK-6568-2 and squashes the following commits:
3f9a188 [Masayoshi TSUZUKI] modified some errors.
ed46047 [Masayoshi TSUZUKI] avoid scalastyle errors.
1784239 [Masayoshi TSUZUKI] removed Utils.formatPath.
e03f289 [Masayoshi TSUZUKI] removed testWindows from Utils.resolveURI and Utils.resolveURIs. replaced SystemUtils.IS_OS_WINDOWS to Utils.isWindows. removed Utils.formatPath from PythonRunner.scala.
84c33d0 [Masayoshi TSUZUKI] - use resolveURI in nonLocalPaths - run tests for Windows path only on Windows
016128d [Masayoshi TSUZUKI] fixed to use File.toURI()
2c62e3b [Masayoshi TSUZUKI] Merge pull request #1 from sarutak/SPARK-6568-2
7019a8a [Masayoshi TSUZUKI] Merge branch 'master' of https://github.com/apache/spark into feature/SPARK-6568-2
45946ee [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6568-2
10f1c73 [Kousuke Saruta] Added a comment
93c3c40 [Kousuke Saruta] Merge branch 'classpath-handling-fix' of github.com:sarutak/spark into SPARK-6568-2
649da82 [Kousuke Saruta] Fix classpath handling
c7ba6a7 [Masayoshi TSUZUKI] [SPARK-6568] spark-shell.cmd --jars option does not accept the jar that has space in its path
(cherry picked from commit 50c7270801)
Signed-off-by: Sean Owen <sowen@cloudera.com>
These R process only used to communicate with JVM process on local, so binding to localhost is more reasonable then wildcard ip.
Author: linweizhong <linweizhong@huawei.com>
Closes#6053 from Sephiroth-Lin/spark-7526 and squashes the following commits:
5303af7 [linweizhong] bind to localhost rather than wildcard ip
(cherry picked from commit 98195c3031)
Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Author: Sun Rui <rui.sun@intel.com>
Closes#6007 from sun-rui/SPARK-7482 and squashes the following commits:
5c5cf5e [Sun Rui] Implement alias loadDF() as a new function.
3a30c10 [Sun Rui] Rename load()/save() to read.df()/write.df(). Also add loadDF()/saveDF() as aliases.
9f569d6 [Sun Rui] [SPARK-7482][SparkR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.
(cherry picked from commit df9b94a57c)
Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
This makes HiveContext.analyzer overrideable.
Author: Santiago M. Mola <santi@mola.io>
Closes#6086 from smola/patch-3 and squashes the following commits:
8ece136 [Santiago M. Mola] [SPARK-7566][SQL] Add type to HiveContext.analyzer
(cherry picked from commit 208b902257)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This builds on https://github.com/apache/spark/pull/5932 and should close https://github.com/apache/spark/pull/5932 as well.
As an example:
```python
df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
```
Author: Reynold Xin <rxin@databricks.com>
Author: kaka1992 <kaka_1992@163.com>
Closes#6072 from rxin/when-expr and squashes the following commits:
8f49201 [Reynold Xin] Throw exception if otherwise is applied twice.
0455eda [Reynold Xin] Reset run-tests.
bfb9d9f [Reynold Xin] Updated documentation and test cases.
762f6a5 [Reynold Xin] Merge pull request #5932 from kaka1992/IFCASE
95724c6 [kaka1992] Update
8218d0a [kaka1992] Update
801009e [kaka1992] Update
76d6346 [kaka1992] [SPARK-7321][SQL] Add Column expression for conditional statements (if, case)
(cherry picked from commit 97dee313f2)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This pull request adds since tag to all public methods/classes in SQL/DataFrame to indicate which version the methods/classes were first added.
Author: Reynold Xin <rxin@databricks.com>
Closes#6101 from rxin/tbc and squashes the following commits:
ed55e11 [Reynold Xin] Add since version to all DataFrame methods.
(cherry picked from commit 8fd55358b7)
Signed-off-by: Reynold Xin <rxin@databricks.com>
#5526 uses `Job.getInstance`, which does not exist in the old Hadoop versions. Just use `new Job` to replace it.
cc liancheng
Author: zsxwing <zsxwing@gmail.com>
Closes#6095 from zsxwing/hotfix and squashes the following commits:
b0c2049 [zsxwing] Use the old Job API to support old Hadoop versions
(cherry picked from commit 247b70349c)
Signed-off-by: Cheng Lian <lian@databricks.com>
Remove `Param` and `Params` from `pyspark.ml` and add a section in the doc. brkyvz
Author: Xiangrui Meng <meng@databricks.com>
Closes#6094 from mengxr/SPARK-7572 and squashes the following commits:
022abd6 [Xiangrui Meng] do not import Param/Params under spark.ml
(cherry picked from commit 77f64c736d)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#6099 from tdas/SPARK-7554 and squashes the following commits:
2cd4158 [Tathagata Das] Throw exceptions on attempts to add stuff to active and stopped contexts.
(cherry picked from commit 23f7d66d51)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
`RankingMetrics` contains a ClassTag, which is hard to create in Java. This PR adds a factory method `of` for Java users. coderxiang
Author: Xiangrui Meng <meng@databricks.com>
Closes#6098 from mengxr/SPARK-7528 and squashes the following commits:
e5d57ae [Xiangrui Meng] make RankingMetrics Java-friendly
(cherry picked from commit 2713bc65af)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
In a REPL/notebook environment, its very easy to lose a reference to a StreamingContext by overriding the variable name. So if you happen to execute the following commands
```
val ssc = new StreamingContext(...) // cmd 1
ssc.start() // cmd 2
...
val ssc = new StreamingContext(...) // accidentally run cmd 1 again
```
The value of ssc will be overwritten. Now you can neither start the new context (as only one context can be started), nor stop the previous context (as the reference is lost).
Hence its best to maintain a singleton reference to the active context, so that we never loose reference for the active context.
Since this problem occurs useful in REPL environments, its best to add this as an Experimental support in the Scala API only so that it can be used in Scala REPLs and notebooks.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#6070 from tdas/SPARK-7553 and squashes the following commits:
731c9a1 [Tathagata Das] Fixed style
a797171 [Tathagata Das] Added more unit tests
19fc70b [Tathagata Das] Added :: Experimental :: in docs
64706c9 [Tathagata Das] Fixed test
634db5d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7553
3884a25 [Tathagata Das] Fixing test bug
d37a846 [Tathagata Das] Added getActive and getActiveOrCreate
(cherry picked from commit 00e7b09a0b)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Minor cleanups discussed with [~mengxr]:
* move OneVsRest from reduction to classification sub-package
* make model constructor private
Some doc cleanups too
CC: harsha2010 Could you please verify this looks OK? Thanks!
Author: Joseph K. Bradley <joseph@databricks.com>
Closes#6097 from jkbradley/onevsrest-cleanup and squashes the following commits:
4ecd48d [Joseph K. Bradley] org imports
430b065 [Joseph K. Bradley] moved OneVsRest from reduction subpackage to classification. small java doc style fixes
9f8b9b9 [Joseph K. Bradley] Small cleanups to OneVsRest. Made model constructor private to ml package.
(cherry picked from commit 96c4846db8)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
Added feature transformer subsection to spark.ml guide, with HashingTF and Tokenizer. Added JavaHashingTFSuite to test Java examples in new guide.
I've run Scala, Python examples in the Spark/PySpark shells. I ran the Java examples via the test suite (with small modifications for printing).
CC: mengxr
Author: Joseph K. Bradley <joseph@databricks.com>
Closes#6093 from jkbradley/hashingtf-guide and squashes the following commits:
d5d213f [Joseph K. Bradley] small fix
dd6e91a [Joseph K. Bradley] fixes from code review of user guide
33c3ff9 [Joseph K. Bradley] small fix
bc6058c [Joseph K. Bradley] fix link
361a174 [Joseph K. Bradley] Added subsection for feature transformers to spark.ml guide, with HashingTF and Tokenizer. Added JavaHashingTFSuite to test Java examples in new guide
(cherry picked from commit f0c1bc3472)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
jira: https://issues.apache.org/jira/browse/SPARK-7496
Update LDA subsection of clustering section of MLlib programming guide to include OnlineLDA.
Author: Yuhao Yang <hhbyyh@gmail.com>
Closes#6046 from hhbyyh/ldaDocument and squashes the following commits:
4b6fbfa [Yuhao Yang] add online paper and some comparison
fd4c983 [Yuhao Yang] update lda document for optimizers
(cherry picked from commit 1d703660d4)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
`scala.Math` is deprecated since 2.8. This PR only touchs `Math` usages in MLlib. dbtsai
Author: Xiangrui Meng <meng@databricks.com>
Closes#6092 from mengxr/SPARK-7571 and squashes the following commits:
fe8f8d3 [Xiangrui Meng] Math -> math
(cherry picked from commit a4874b0d18)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
Few jdbc drivers like SybaseIQ support passing username and password only through connection properties. So the same needs to be supported for
SQLContext.jdbc, dataframe.createJDBCTable and dataframe.insertIntoJDBC.
Added as default arguments or overrided function to support backward compatability.
Author: Venkata Ramana Gollamudi <ramana.gollamudi@huawei.com>
Closes#6009 from gvramana/add_jdbc_conn_properties and squashes the following commits:
396a0d0 [Venkata Ramana Gollamudi] fixed comments
d66dd8c [Venkata Ramana Gollamudi] fixed comments
1b8cd8c [Venkata Ramana Gollamudi] Support jdbc connection properties
(cherry picked from commit 455551d1c6)
Signed-off-by: Reynold Xin <rxin@databricks.com>
We make special treatment for +inf in `Bucketizer`. This could be simplified by always including the largest split value in the last bucket. E.g., (x1, x2, x3) defines buckets [x1, x2) and [x2, x3]. This shouldn't affect user code much, and there are applications that need to include the right-most value. For example, we can bucketize ratings from 0 to 10 to bad, neutral, and good with splits 0, 4, 6, 10. It may reads weird if the users need to put 0, 4, 6, 10.1 (or 11).
This also update the impl to use `Arrays.binarySearch` and `withClue` in test.
yinxusen jkbradley
Author: Xiangrui Meng <meng@databricks.com>
Closes#6075 from mengxr/SPARK-7559 and squashes the following commits:
e28f910 [Xiangrui Meng] update bucketizer impl
(cherry picked from commit 23b9863e2a)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
…roblem
Pick up newer version of dependency with fix for SPARK-2018. The update involved patching the ning/compress LZF library to handle big endian systems correctly.
Credit goes to gireeshpunathil for diagnosing the problem, and cowtowncoder for fixing it.
Spark tests run clean for me.
Author: Tim Ellison <t.p.ellison@gmail.com>
Closes#6077 from tellison/UpgradeLZF and squashes the following commits:
ad8d4ef [Tim Ellison] [SPARK-2018] [CORE] Upgrade LZF library to fix endian serialization problem
(cherry picked from commit 5438f49ccf)
Signed-off-by: Sean Owen <sowen@cloudera.com>
Added LinearRegression Python API
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#6016 from brkyvz/ml-reg and squashes the following commits:
11c9ef9 [Burak Yavuz] address comments
1027a40 [Burak Yavuz] fix typo
4c699ad [Burak Yavuz] added tree regressor api
8afead2 [Burak Yavuz] made mixin for DT
fa51c74 [Burak Yavuz] save additions
0640d48 [Burak Yavuz] added ml.regression
82aac48 [Burak Yavuz] added linear regression
(cherry picked from commit 8e935b0a21)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#5831 from cloud-fan/7276 and squashes the following commits:
ee4a1e1 [Wenchen Fan] fix rebase mistake
a3b565d [Wenchen Fan] refactor
99deb5d [Wenchen Fan] add test
f1f67ad [Wenchen Fan] fix 7276
(cherry picked from commit 4e290522c2)
Signed-off-by: Michael Armbrust <michael@databricks.com>
This fixes the label bleeding issue described in the JIRA and pictured in the screenshots below. I also took the opportunity to move some code to the places that they belong more to. In particular:
(1) Drawing cluster labels is now implemented in my branch of dagre-d3 instead of in Spark
(2) All graph styling is now moved from Scala to JS
Note that these changes are related because our existing mechanism of "tacking on cluster labels" afterwards isn't flexible enough for us to fix issues like this one easily. For the other half of the changes, visit http://github.com/andrewor14/dagre-d3.
-------------------
**Before.**
<img src="https://cloud.githubusercontent.com/assets/2133137/7582769/b1423440-f845-11e4-8248-b3446a01bf79.png" width="300px"/>
-------------------
**After.**
<img src="https://cloud.githubusercontent.com/assets/2133137/7582742/74891ae6-f845-11e4-96c4-41c7b8aedbdf.png" width="400px"/>
Author: Andrew Or <andrew@databricks.com>
Closes#6076 from andrewor14/dag-viz-bleed and squashes the following commits:
5858d7a [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-bleed
c686dc4 [Andrew Or] Fix tooltip placement
d908c36 [Andrew Or] Add link to dagre-d3 changes (minor)
4a4fb58 [Andrew Or] Fix bleeding + move all styling to JS
(cherry picked from commit 65697bbeaf)
Signed-off-by: Andrew Or <andrew@databricks.com>
Minor improvement, now we can use `Column` as extraction expression.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#6080 from cloud-fan/tmp and squashes the following commits:
0fdefb7 [Wenchen Fan] support column in field accessor
(cherry picked from commit bfcaf8adcd)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path. Existing data sources like JSON and Parquet can be simplified with this work.
## New features provided
1. Hive compatible partition discovery
This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.
1. Generalized partition pruning optimization
Now partition pruning is handled during physical planning phase. Specific data sources don't need to worry about this harness anymore.
(This also implies that we can remove `CatalystScan` after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)
1. Insertion with dynamic partitions
When inserting data to a `FSBasedRelation`, data can be partitioned dynamically by specified partition columns.
## New structures provided
### Developer API
1. `FSBasedRelation`
Base abstract class for file system based data sources.
1. `OutputWriter`
Base abstract class for output row writers, responsible for writing a single row object.
1. `FSBasedRelationProvider`
A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.
### User API
New overloaded versions of
1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`
are provided to allow users to save/load DataFrames with user defined dynamic partition columns.
### Spark SQL query planning
1. `InsertIntoFSBasedRelation`
Used to implement write path for `FSBasedRelation`s.
1. New rules for `FSBasedRelation` in `DataSourceStrategy`
These are added to hook `FSBasedRelation` into physical query plan in read path, and perform partition pruning.
## TODO
- [ ] Use scratch directories when overwriting a table with data selected from itself.
Currently, this is not supported, because the table been overwritten is always deleted before writing any data to it.
- [ ] When inserting with dynamic partition columns, use external sorter to group the data first.
This ensures that we only need to open a single `OutputWriter` at a time. For data sources like Parquet, `OutputWriter`s can be quite memory consuming. One issue is that, this approach breaks the row distribution in the original DataFrame. However, we did't promise to preserve data distribution when writing a DataFrame.
- [x] More tests. Specifically, test cases for
- [x] Self-join
- [x] Loading partitioned relations with a subset of partition columns stored in data files.
- [x] `SQLContext.load()` with user defined dynamic partition columns.
## Parquet data source migration
Parquet data source migration is covered in PR https://github.com/liancheng/spark/pull/6, which is against this PR branch and for preview only. A formal PR need to be made after this one is merged.
Author: Cheng Lian <lian@databricks.com>
Closes#5526 from liancheng/partitioning-support and squashes the following commits:
5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos. Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data source to customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttempContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support
(cherry picked from commit 0595b6de8f)
Signed-off-by: Cheng Lian <lian@databricks.com>
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#6079 from cloud-fan/unapply and squashes the following commits:
40da442 [Wenchen Fan] one more
7d90a05 [Wenchen Fan] cleanup unapply in DataTypes
(cherry picked from commit 831504cf6b)
Signed-off-by: Reynold Xin <rxin@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Closes#6003 from adrian-wang/pynareplace and squashes the following commits:
672efba [Daoyuan Wang] remove py2.7 feature
4a148f7 [Daoyuan Wang] to_replace support dict, value support single value, and add full tests
9e232e7 [Daoyuan Wang] rename scala map
af0268a [Daoyuan Wang] remove na
63ac579 [Daoyuan Wang] add na.replace in pyspark
(cherry picked from commit d86ce84584)
Signed-off-by: Reynold Xin <rxin@databricks.com>