This integrates the Interaction feature transformer with SparkR R formula support (i.e. support `:`).
To generate reasonable ML attribute names for feature interactions, it was necessary to add the ability to read attribute the original attribute names back from `StructField`, and also to specify custom group prefixes in `VectorAssembler`. This also has the side-benefit of cleaning up the double-underscores in the attributes generated for non-interaction terms.
mengxr
Author: Eric Liang <ekl@databricks.com>
Closes#8830 from ericl/interaction-2.
This makes two changes:
- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task
I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.
Author: Matei Zaharia <matei@databricks.com>
Closes#8844 from mateiz/spark-9852.
The DiskBlockObjectWriter constructor took a BlockId parameter but never used it. As part of some general cleanup in these interfaces, this patch refactors its constructor to eliminate this parameter.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.
JIRA: https://issues.apache.org/jira/browse/SPARK-10705
As described in the JIRA ticket, `DataFrame.toJSON` uses `DataFrame.mapPartitions`, which converts internal rows to external rows. We should use `queryExecution.toRdd.mapPartitions` that directly uses internal rows for better performance.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8865 from viirya/df-tojson-internalrow.
As introduced in https://issues.apache.org/jira/browse/SPARK-10630 we now have an easier way to create dataframes from local Java lists. Lets update the tests to use those.
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8886 from holdenk/SPARK-10763-update-java-mllib-ml-tests-to-use-simplified-dataframe-construction.
Slightly modified version of #8818, all credit goes to zsxwing
Author: zsxwing <zsxwing@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#8892 from tdas/SPARK-10692.
This patch reverts most of the changes in a previous fix#8827.
The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](a18208047f/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java (L88)))
Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff.
Author: Andrew Or <andrew@databricks.com>
Closes#8888 from andrewor14/dont-track-pointer-array.
Python DataFrame.head/take now requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take).
This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion.
Author: Reynold Xin <rxin@databricks.com>
Closes#8876 from rxin/SPARK-10731.
Currently use can set ```checkpointInterval``` to specify how often should the cache be check-pointed. But we also need the function that users can disable it. This PR supports that users can disable checkpoint if user setting ```checkpointInterval = -1```.
We also add documents for GBT ```cacheNodeIds``` to make users can understand more clearly about checkpoint.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#8820 from yanboliang/spark-10699.
By default ```quantilesCol``` should be empty. If ```quantileProbabilities``` is set, we should append quantiles as a new column (of type Vector).
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#8836 from yanboliang/spark-10686.
All prediction models should store `numFeatures` indicating the number of features the model was trained on. Default value of -1 added for backwards compatibility.
Author: sethah <seth.hendrickson16@gmail.com>
Closes#8675 from sethah/SPARK-9715.
This patch attempts to fix an issue where Spark SQL's UnsafeRowSerializer was incompatible with the `tungsten-sort` ShuffleManager.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8873 from JoshRosen/SPARK-10403.
Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
```
In this test, it calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs. However, one race condition is these two jobs can finish before the receiver is registered. Then `UpdateRateLimit` won't be sent to the receiver and `getDefaultBlockGeneratorRateLimit` cannot be updated.
Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)
15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749
15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered job 3000ms and 3500ms but the receiver was registered after job 3000ms and 3500ms finished.
So we should make sure the receiver online before running `advanceTimeWithRealDelay(ssc, 2)`.
Author: zsxwing <zsxwing@gmail.com>
Closes#8877 from zsxwing/SPARK-10769.
`blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`. So it's possible that `blockIntervalTimer` will exit when `updateCurrentBuffer` is not empty. Then the data in `currentBuffer` will be lost.
To reproduce it, you can add `Thread.sleep(200)` in this line (69c9c17716/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala (L100)) and run `StreamingContexSuite`.
I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` suspend at this line for a few milliseconds.
There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console
This PR updates RecurringTimer to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called.
Author: zsxwing <zsxwing@gmail.com>
Closes#8417 from zsxwing/SPARK-10224.
The Scala example under the "Example: Pipeline" heading in this
document initializes the "test" variable to a DataFrame. Because test
is already a DF, there is not need to call test.toDF as the example
does in a subsequent line: model.transform(test.toDF). So, I removed
the extraneous toDF invocation.
Author: Matt Hagen <anonz3000@gmail.com>
Closes#8875 from hagenhaus/SPARK-10663.
**Please attribute this PR to `Zhichao Li <zhichao.liintel.com>`.**
This PR is based on PR #8476 authored by zhichao-li. It fixes SPARK-10310 by adding field delimiter SerDe property to the default `LazySimpleSerDe`, and enabling default record reader/writer classes.
Currently, we only support `LazySimpleSerDe`, used together with `TextRecordReader` and `TextRecordWriter`, and don't support customizing record reader/writer using `RECORDREADER`/`RECORDWRITER` clauses. This should be addressed in separate PR(s).
Author: Cheng Lian <lian@databricks.com>
Closes#8860 from liancheng/spark-10310/fix-script-trans-delimiters.
This patch refactors Python UDF handling:
1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner itself expects iterator as input/output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call, or in the future in an environment without RDDs.
2. Use PythonRunner in Spark SQL's BatchPythonEvaluation.
3. Updated BatchPythonEvaluation to only use its input once, rather than twice. This should fix Python UDF performance regression in Spark 1.5.
There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small.
This basically implements the approach in https://github.com/apache/spark/pull/8833, but with some code moving around so the correctness doesn't depend on the inner workings of Spark serialization and task execution.
Author: Reynold Xin <rxin@databricks.com>
Closes#8835 from rxin/python-iter-refactor.
https://issues.apache.org/jira/browse/SPARK-10672
With changes in this PR, we will fallback to same the metadata of a table in Spark SQL specific way if we fail to save it in a hive compatible way (Hive throws an exception because of its internal restrictions, e.g. binary and decimal types cannot be saved to parquet if the metastore is running Hive 0.13). I manually tested the fix with the following test in `DataSourceWithHiveMetastoreCatalogSuite` (`spark.sql.hive.metastore.version=0.13` and `spark.sql.hive.metastore.jars`=`maven`).
```
test(s"fail to save metadata of a parquet table in hive 0.13") {
withTempPath { dir =>
withTable("t") {
val path = dir.getCanonicalPath
sql(
s"""CREATE TABLE t USING $provider
|OPTIONS (path '$path')
|AS SELECT 1 AS d1, cast("val_1" as binary) AS d2
""".stripMargin)
sql(
s"""describe formatted t
""".stripMargin).collect.foreach(println)
sqlContext.table("t").show
}
}
}
}
```
Without this fix, we will fail with the following error.
```
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.UnsupportedOperationException: Unknown field type: binary
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:619)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:576)
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply$mcV$sp(ClientWrapper.scala:359)
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357)
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357)
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:256)
at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:211)
at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:248)
at org.apache.spark.sql.hive.client.ClientWrapper.createTable(ClientWrapper.scala:357)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.createDataSourceTable(HiveMetastoreCatalog.scala:358)
at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:285)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:58)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56)
at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:165)
at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:150)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTable(HiveMetastoreCatalogSuite.scala:52)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:162)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:161)
at org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:125)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTempPath(HiveMetastoreCatalogSuite.scala:52)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:161)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
at org.scalatest.Suite$class.run(Suite.scala:1424)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.org$scalatest$BeforeAndAfterAll$$super$run(HiveMetastoreCatalogSuite.scala:52)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.run(HiveMetastoreCatalogSuite.scala:52)
at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
at sbt.ForkMain$Run$2.call(ForkMain.java:294)
at sbt.ForkMain$Run$2.call(ForkMain.java:284)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Unknown field type: binary
at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getObjectInspector(ArrayWritableObjectInspector.java:108)
at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.<init>(ArrayWritableObjectInspector.java:60)
at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:113)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:339)
at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:288)
at org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:194)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:597)
... 76 more
```
Author: Yin Huai <yhuai@databricks.com>
Closes#8824 from yhuai/datasourceMetadata.
The current shuffle code has an interface named ShuffleReader with only one implementation, HashShuffleReader. This naming is confusing, since the same read path code is used for both sort- and hash-based shuffle. This patch addresses this by renaming HashShuffleReader to BlockStoreShuffleReader.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8825 from JoshRosen/shuffle-reader-cleanup.
If we cache the InputFormat, all tasks on the same executor will share it.
Some InputFormat is thread safety, but some are not, such as HiveHBaseTableInputFormat. If tasks share a non thread safe InputFormat, unexpected error may be occurs.
To avoid it, I think we should delete the input format caching.
Author: xutingjun <xutingjun@huawei.com>
Author: meiyoula <1039320815@qq.com>
Author: Xutingjun <xutingjun@huawei.com>
Closes#7918 from XuTingjun/cached_inputFormat.
Currently when you set illegal value for params of array type (such as IntArrayParam, DoubleArrayParam, StringArrayParam), it will throw IllegalArgumentException but with incomprehensible error information.
Take ```VectorSlicer.setNames``` as an example:
```scala
val vectorSlicer = new VectorSlicer().setInputCol("features").setOutputCol("result")
// The value of setNames must be contain distinct elements, so the next line will throw exception.
vectorSlicer.setIndices(Array.empty).setNames(Array("f1", "f4", "f1"))
```
It will throw IllegalArgumentException as:
```
vectorSlicer_b3b4d1a10f43 parameter names given invalid value [Ljava.lang.String;798256c5.
java.lang.IllegalArgumentException: vectorSlicer_b3b4d1a10f43 parameter names given invalid value [Ljava.lang.String;798256c5.
```
We should distinguish the value of array type from primitive type at Param.validate(value: T), and we will get better error information.
```
vectorSlicer_3b744ea277b2 parameter names given invalid value [f1,f4,f1].
java.lang.IllegalArgumentException: vectorSlicer_3b744ea277b2 parameter names given invalid value [f1,f4,f1].
```
Author: Yanbo Liang <ybliang8@gmail.com>
Closes#8863 from yanboliang/spark-10750.
NodeIdCache: prevNodeIdsForInstances.unpersist() needs to be called at end of training.
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8541 from holdenk/SPARK-9962-decission-tree-training-prevNodeIdsForiNstances-unpersist-at-end-of-training.
JIRA: https://issues.apache.org/jira/browse/SPARK-10446
Currently the method `join(right: DataFrame, usingColumns: Seq[String])` only supports inner join. It is more convenient to have it support other join types.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#8600 from viirya/usingcolumns_df.
Reading from Microsoft SQL Server over jdbc fails when the table contains datetimeoffset types.
This patch registers a SQLServer JDBC Dialect that maps datetimeoffset to a String, as Microsoft suggest.
Author: Ewan Leith <ewan.leith@realitymine.com>
Closes#8575 from realitymine-coordinator/sqlserver.
Remove ._SUCCESS.crc hidden file that may cause problems in distribution tar archive, and is not used
Author: Sean Owen <sowen@cloudera.com>
Closes#8846 from srowen/SPARK-10716.
from the issue:
In Scala, I can supply a custom partitioner to reduceByKey (and other aggregation/repartitioning methods like aggregateByKey and combinedByKey), but as far as I can tell from the Pyspark API, there's no way to do the same in Python.
Here's an example of my code in Scala:
weblogs.map(s => (getFileType(s), 1)).reduceByKey(new FileTypePartitioner(),_+_)
But I can't figure out how to do the same in Python. The closest I can get is to call repartition before reduceByKey like so:
weblogs.map(lambda s: (getFileType(s), 1)).partitionBy(3,hash_filetype).reduceByKey(lambda v1,v2: v1+v2).collect()
But that defeats the purpose, because I'm shuffling twice instead of once, so my performance is worse instead of better.
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8569 from holdenk/SPARK-9821-pyspark-reduceByKey-should-take-a-custom-partitioner.
In ```RUtils.sparkRPackagePath()``` we
1. Call ``` sys.props("spark.submit.deployMode")``` which returns null if ```spark.submit.deployMode``` is not suet
2. Call ``` sparkConf.get("spark.submit.deployMode")``` which throws ```NoSuchElementException``` if ```spark.submit.deployMode``` is not set. This patch simply passes a default value ("cluster") for ```spark.submit.deployMode```.
cc rxin
Author: Hossein <hossein@databricks.com>
Closes#8832 from falaki/SPARK-10711.
The job group, and job descriptions information is passed through thread local properties, and get inherited by child threads. In case of spark streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense.
1. Job group: This is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect will be unpredictable. And its not a valid usecase any way, to cancel a streaming context, call streamingContext.stop()
2. Job description: This is used to pass on nice text descriptions for jobs to show up in the UI. The job description of the thread that calls streamingContext.start() is not useful for all the streaming jobs, as it does not make sense for all of the streaming jobs to have the same description, and the description may or may not be related to streaming.
The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description in the thread that starts the streaming scheduler are explicitly removed, so that all the subsequent child threads does not inherit them. Also, the starting is done in a new child thread, so that setting the job group and description for streaming, does not change those properties in the thread that called streamingContext.start().
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#8781 from tdas/SPARK-10649.
It would be nice to support creating a DataFrame directly from a Java List of Row.
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8779 from holdenk/SPARK-10630-create-DataFrame-from-Java-List.