## What changes were proposed in this pull request?
Currently shuffle repartition uses RoundRobinPartitioning, the generated result is nondeterministic since the sequence of input rows are not determined.
The bug can be triggered when there is a repartition call following a shuffle (which would lead to non-deterministic row ordering), as the pattern shows below:
upstream stage -> repartition stage -> result stage
(-> indicate a shuffle)
When one of the executors process goes down, some tasks on the repartition stage will be retried and generate inconsistent ordering, and some tasks of the result stage will be retried generating different data.
The following code returns 931532, instead of 1000000:
```
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
x
}.repartition(200).map { x =>
if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
throw new Exception("pkill -f java".!!)
}
x
}
res.distinct().count()
```
In this PR, we propose a most straight-forward way to fix this problem by performing a local sort before partitioning, after we make the input row ordering deterministic, the function from rows to partitions is fully deterministic too.
The downside of the approach is that with extra local sort inserted, the performance of repartition() will go down, so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. The patch is default enabled to be safe-by-default, but user may choose to manually turn it off to avoid performance regression.
This patch also changes the output rows ordering of repartition(), that leads to a bunch of test cases failure because they are comparing the results directly.
## How was this patch tested?
Add unit test in ExchangeSuite.
With this patch(and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:
```
import scala.sys.process._
import org.apache.spark.TaskContext
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
x
}.repartition(200).map { x =>
if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
throw new Exception("pkill -f java".!!)
}
x
}
res.distinct().count()
res7: Long = 1000000
```
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#20393 from jiangxb1987/shuffle-repartition.
## What changes were proposed in this pull request?
`ColumnVector` is very flexible about how to implement array type. As a result `ColumnVector` has 3 abstract methods for array type: `arrayData`, `getArrayOffset`, `getArrayLength`. For example, in `WritableColumnVector` we use the first child vector as the array data vector, and store offsets and lengths in 2 arrays in the parent vector. `ArrowColumnVector` has a different implementation.
This PR simplifies `ColumnVector` by using only one abstract method for array type: `getArray`.
## How was this patch tested?
existing tests.
rerun `ColumnarBatchBenchmark`, there is no performance regression.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#20395 from cloud-fan/vector.
## What changes were proposed in this pull request?
The code logic between `MemoryStore.putIteratorAsValues` and `Memory.putIteratorAsBytes` are almost same, so we should reduce the duplicate code between them.
## How was this patch tested?
Existing UT.
Author: Xianyang Liu <xianyang.liu@intel.com>
Closes#19285 from ConeyLiu/rmemorystore.
## What changes were proposed in this pull request?
Bucketizer support multi-column in the python side
## How was this patch tested?
existing tests and added tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19892 from zhengruifeng/20542_py.
## What changes were proposed in this pull request?
Currently there is a mixed situation when both single- and multi-column are supported. In some cases exceptions are thrown, in others only a warning log is emitted. In this discussion https://issues.apache.org/jira/browse/SPARK-8418?focusedCommentId=16275049&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16275049, the decision was to throw an exception.
The PR throws an exception in `Bucketizer`, instead of logging a warning.
## How was this patch tested?
modified UT
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Joseph K. Bradley <joseph@databricks.com>
Closes#19993 from mgaido91/SPARK-22799.
## What changes were proposed in this pull request?
When using the Kubernetes cluster-manager and spawning a Streaming workload, it is important to reset many spark.kubernetes.* properties that are generated by spark-submit but which would get rewritten when restoring a Checkpoint. This is so, because the spark-submit codepath creates Kubernetes resources, such as a ConfigMap, a Secret and other variables, which have an autogenerated name and the previous one will not resolve anymore.
In short, this change enables checkpoint restoration for streaming workloads, and thus enables Spark Streaming workloads in Kubernetes, which were not possible to restore from a checkpoint before if the workload went down.
## How was this patch tested?
This patch was tested with the twitter-streaming example in AWS, using checkpoints in s3 with the s3a:// protocol, as supported by Hadoop.
This is similar to the YARN related code for resetting a Spark Streaming workload, but for the Kubernetes scheduler. I'm adding the initcontainers properties because even if the discussion is not completely settled on the mailing list, my understanding is that at this moment they are going forward for the moment.
For a previous discussion, see the non-rebased work at: https://github.com/apache-spark-on-k8s/spark/pull/516
Author: Santiago Saavedra <ssaavedra@openshine.com>
Closes#20383 from ssaavedra/fix-k8s-checkpointing.
Third time is the charm?
There was still a race that was left in previous attempts. If the handle
closes the connection, the close() implementation would clean up state
that would prevent the thread from waiting on the connection thread to
finish. That could cause the race causing the test flakiness reported
in the bug.
The fix is to move the "wait for connection thread" code to a separate
close method that is used by the handle; that also simplifies the code
a bit and makes it also easier to follow.
I included an unrelated, but correct, change to a YARN test so that
it triggers when the PR is built.
Tested by inserting a sleep in the connection thread to mimic the race;
test failed reliably with the sleep, passes now. (Sleep not included in
the patch.) Also ran YARN tests to make sure.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#20388 from vanzin/SPARK-23020.
## What changes were proposed in this pull request?
When parsing raw image data in ImageSchema.decode(), we use a [java.awt.Color](https://docs.oracle.com/javase/7/docs/api/java/awt/Color.html#Color(int)) constructor that sets alpha = 255, even for four-channel images (which may have different alpha values). This PR fixes this issue & adds a unit test to verify correctness of reading four-channel images.
## How was this patch tested?
Updates an existing unit test ("readImages pixel values test" in `ImageSchemaSuite`) to also verify correctness when reading a four-channel image.
Author: Sid Murching <sid.murching@databricks.com>
Closes#20389 from smurching/image-schema-bugfix.
## What changes were proposed in this pull request?
**Proposal**
Add a per-query ID to the codegen stages as represented by `WholeStageCodegenExec` operators. This ID will be used in
- the explain output of the physical plan, and in
- the generated class name.
Specifically, this ID will be stable within a query, counting up from 1 in depth-first post-order for all the `WholeStageCodegenExec` inserted into a plan.
The ID value 0 is reserved for "free-floating" `WholeStageCodegenExec` objects, which may have been created for one-off purposes, e.g. for fallback handling of codegen stages that failed to codegen the whole stage and wishes to codegen a subset of the children operators (as seen in `org.apache.spark.sql.execution.FileSourceScanExec#doExecute`).
Example: for the following query:
```scala
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]
scala> val df2 = spark.range(5)
df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val query = df1.join(df2, 'z === 'id)
query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]
```
The explain output before the change is:
```scala
scala> query.explain
== Physical Plan ==
*SortMergeJoin [z#9L], [id#13L], Inner
:- *Sort [z#9L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(z#9L, 200)
: +- *Project [(x#3L + 1) AS z#9L, y#4L]
: +- *Sort [x#3L ASC NULLS FIRST], true, 0
: +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
: +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
: +- *Range (0, 10, step=1, splits=8)
+- *Sort [id#13L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#13L, 200)
+- *Range (0, 5, step=1, splits=8)
```
Note how codegen'd operators are annotated with a prefix `"*"`. See how the `SortMergeJoin` operator and its direct children `Sort` operators are adjacent and all annotated with the `"*"`, so it's hard to tell they're actually in separate codegen stages.
and after this change it'll be:
```scala
scala> query.explain
== Physical Plan ==
*(6) SortMergeJoin [z#9L], [id#13L], Inner
:- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(z#9L, 200)
: +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
: +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
: +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
: +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
: +- *(1) Range (0, 10, step=1, splits=8)
+- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#13L, 200)
+- *(4) Range (0, 5, step=1, splits=8)
```
Note that the annotated prefix becomes `"*(id) "`. See how the `SortMergeJoin` operator and its direct children `Sort` operators have different codegen stage IDs.
It'll also show up in the name of the generated class, as a suffix in the format of `GeneratedClass$GeneratedIterator$id`.
For example, note how `GeneratedClass$GeneratedIteratorForCodegenStage3` and `GeneratedClass$GeneratedIteratorForCodegenStage6` in the following stack trace corresponds to the IDs shown in the explain output above:
```
"Executor task launch worker for task 42412957" daemon prio=5 tid=0x58 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter$(generated.java:32)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:41)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.findNextInnerJoinRows$(generated.java:42)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(generated.java:101)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
```
**Rationale**
Right now, the codegen from Spark SQL lacks the means to differentiate between a couple of things:
1. It's hard to tell which physical operators are in the same WholeStageCodegen stage. Note that this "stage" is a separate notion from Spark's RDD execution stages; this one is only to delineate codegen units.
There can be adjacent physical operators that are both codegen'd but are in separate codegen stages. Some of this is due to hacky implementation details, such as the case with `SortMergeJoin` and its `Sort` inputs -- they're hard coded to be split into separate stages although both are codegen'd.
When printing out the explain output of the physical plan, you'd only see the codegen'd physical operators annotated with a preceding star (`'*'`) but would have no way to figure out if they're in the same stage.
2. Performance/error diagnosis
The generated code has class/method names that are hard to differentiate between queries or even between codegen stages within the same query. If we use a Java-level profiler to collect profiles, or if we encounter a Java-level exception with a stack trace in it, it's really hard to tell which part of a query it's at.
By introducing a per-query codegen stage ID, we'd at least be able to know which codegen stage (and in turn, which group of physical operators) was a profile tick or an exception happened.
The reason why this proposal uses a per-query ID is because it's stable within a query, so that multiple runs of the same query will see the same resulting IDs. This both benefits understandability for users, and also it plays well with the codegen cache in Spark SQL which uses the generated source code as the key.
The downside to using per-query IDs as opposed to a per-session or globally incrementing ID is of course we can't tell apart different query runs with this ID alone. But for now I believe this is a good enough tradeoff.
## How was this patch tested?
Existing tests. This PR does not involve any runtime behavior changes other than some name changes.
The SQL query test suites that compares explain outputs have been updates to ignore the newly added `codegenStageId`.
Author: Kris Mok <kris.mok@databricks.com>
Closes#20224 from rednaxelafx/wsc-codegenstageid.
## What changes were proposed in this pull request?
Add colRegex API to PySpark
## How was this patch tested?
add a test in sql/tests.py
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#20390 from huaxingao/spark-23081.
Update ML user guide with highlights and migration guide for `2.3`.
## How was this patch tested?
Doc only.
Author: Nick Pentreath <nickp@za.ibm.com>
Closes#20363 from MLnick/SPARK-23112-ml-guide.
## What changes were proposed in this pull request?
It has been observed in SPARK-21603 that whole-stage codegen suffers performance degradation, if the generated functions are too long to be optimized by JIT.
We basically produce a single function to incorporate generated codes from all physical operators in whole-stage. Thus, it is possibly to grow the size of generated function over a threshold that we can't have JIT optimization for it anymore.
This patch is trying to decouple the logic of consuming rows in physical operators to avoid a giant function processing rows.
## How was this patch tested?
Added tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#18931 from viirya/SPARK-21717.
## What changes were proposed in this pull request?
This syncs the ML Python API with Scala for differences found after the 2.3 QA audit.
## How was this patch tested?
NA
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20354 from BryanCutler/pyspark-ml-doc-sync-23163.
## What changes were proposed in this pull request?
The `GenArrayData.genCodeToCreateArrayData` produces illegal java code when code splitting is enabled. This is used in `CreateArray` and `CreateMap` expressions for complex object arrays.
This issue is caused by a typo.
## How was this patch tested?
Added a regression test in `complexTypesSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#20391 from hvanhovell/SPARK-23208.
## What changes were proposed in this pull request?
Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init when DiskMapIterator instance created.This will cause memory use overhead when ExternalAppendOnlyMap spill too much times.
We can avoid this by making deserializeStream init when it is used the first time.
This patch make deserializeStream init lazily.
## How was this patch tested?
Exist tests
Author: zhoukang <zhoukang199191@gmail.com>
Closes#20292 from caneGuy/zhoukang/lay-diskmapiterator.
## What changes were proposed in this pull request?
This PR is repaired as follows
1、update y -> x in "left outer join" test case ,maybe is mistake.
2、add a new test case:"left outer join and left sides are limited"
3、add a new test case:"left outer join and right sides are limited"
4、add a new test case: "right outer join and right sides are limited"
5、add a new test case: "right outer join and left sides are limited"
6、Remove annotations without code implementation
## How was this patch tested?
add new unit test case.
Author: caoxuewen <cao.xuewen@zte.com.cn>
Closes#20381 from heary-cao/LimitPushdownSuite.
## What changes were proposed in this pull request?
Currently, `KafkaContinuousSourceStressForDontFailOnDataLossSuite` runs on `MicroBatchExecution`. It should test `ContinuousExecution`.
## How was this patch tested?
Pass the updated test suite.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#20374 from dongjoon-hyun/SPARK-23198.
## What changes were proposed in this pull request?
[Ticket](https://issues.apache.org/jira/browse/SPARK-22297)
- one of the tests seems to produce unreliable results due to execution speed variability
Since the original test was trying to connect to the test server with `40 ms` timeout, and the test server replied after `50 ms`, the error might be produced under the following conditions:
- it might occur that the test server replies correctly after `50 ms`
- but the client does only receive the timeout after `51 ms`s
- this might happen if the executor has to schedule a big number of threads, and decides to delay the thread/actor that is responsible to watch the timeout, because of high CPU load
- running an entire test suite usually produces high loads on the CPU executing the tests
## How was this patch tested?
The test's check cases remain the same and the set-up emulates the previous version's.
Author: Mark Petruska <petruska.mark@gmail.com>
Closes#19671 from mpetruska/SPARK-22297.
## What changes were proposed in this pull request?
Correctly guard against empty datasets in `org.apache.spark.ml.classification.Classifier`
## How was this patch tested?
existing tests
Author: Matthew Tovbin <mtovbin@salesforce.com>
Closes#20321 from tovbinm/SPARK-23152.
## What changes were proposed in this pull request?
Currently we do not call the `super.init(hiveConf)` in `SparkSQLSessionManager.init`. So we do not load the config `HIVE_SERVER2_SESSION_CHECK_INTERVAL HIVE_SERVER2_IDLE_SESSION_TIMEOUT HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION` , which cause the session timeout checker does not work.
## How was this patch tested?
manual tests
Author: zuotingbing <zuo.tingbing9@zte.com.cn>
Closes#20025 from zuotingbing/SPARK-22837.
## What changes were proposed in this pull request?
This is a follow-up of #20297 which broke lint-java checks.
This pr fixes the lint-java issues.
```
[ERROR] src/test/java/org/apache/spark/launcher/BaseSuite.java:[21,8] (imports) UnusedImports: Unused import - java.util.concurrent.TimeUnit.
[ERROR] src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java:[27,8] (imports) UnusedImports: Unused import - java.util.concurrent.TimeUnit.
```
## How was this patch tested?
Checked manually in my local environment.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20376 from ueshin/issues/SPARK-23020/fup1.
## What changes were proposed in this pull request?
doc only changes
## How was this patch tested?
manual
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes#20380 from felixcheung/rclrdoc.
## What changes were proposed in this pull request?
In this PR stage blacklisting is propagated to UI by introducing a new Spark listener event (SparkListenerExecutorBlacklistedForStage) which indicates the executor is blacklisted for a stage. Either because of the number of failures are exceeded a limit given for an executor (spark.blacklist.stage.maxFailedTasksPerExecutor) or because of the whole node is blacklisted for a stage (spark.blacklist.stage.maxFailedExecutorsPerNode). In case of the node is blacklisting all executors will listed as blacklisted for the stage.
Blacklisting state for a selected stage can be seen "Aggregated Metrics by Executor" table's blacklisting column, where after this change three possible labels could be found:
- "for application": when the executor is blacklisted for the application (see the configuration spark.blacklist.application.maxFailedTasksPerExecutor for details)
- "for stage": when the executor is **only** blacklisted for the stage
- "false" : when the executor is not blacklisted at all
## How was this patch tested?
It is tested both manually and with unit tests.
#### Unit tests
- HistoryServerSuite
- TaskSetBlacklistSuite
- AppStatusListenerSuite
#### Manual test for executor blacklisting
Running Spark as a local cluster:
```
$ bin/spark-shell --master "local-cluster[2,1,1024]" --conf "spark.blacklist.enabled=true" --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10" --conf "spark.eventLog.enabled=true"
```
Executing:
``` scala
import org.apache.spark.SparkEnv
sc.parallelize(1 to 10, 10).map { x =>
if (SparkEnv.get.executorId == "0") throw new RuntimeException("Bad executor")
else (x % 3, x)
}.reduceByKey((a, b) => a + b).collect()
```
To see result check the "Aggregated Metrics by Executor" section at the bottom of picture:
![UI screenshot for stage level blacklisting executor](https://issues.apache.org/jira/secure/attachment/12905283/stage_blacklisting.png)
#### Manual test for node blacklisting
Running Spark as on a cluster:
``` bash
./bin/spark-shell --master yarn --deploy-mode client --executor-memory=2G --num-executors=8 --conf "spark.blacklist.enabled=true" --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" --conf "spark.blacklist.stage.maxFailedExecutorsPerNode=1" --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10" --conf "spark.eventLog.enabled=true"
```
And the job was:
``` scala
import org.apache.spark.SparkEnv
sc.parallelize(1 to 10000, 10).map { x =>
if (SparkEnv.get.executorId.toInt >= 4) throw new RuntimeException("Bad executor")
else (x % 3, x)
}.reduceByKey((a, b) => a + b).collect()
```
The result is:
![UI screenshot for stage level node blacklisting](https://issues.apache.org/jira/secure/attachment/12906833/node_blacklisting_for_stage.png)
Here you can see apiros3.gce.test.com was node blacklisted for the stage because of failures on executor 4 and 5. As expected executor 3 is also blacklisted even it has no failures itself but sharing the node with 4 and 5.
Author: “attilapiros” <piros.attila.zsolt@gmail.com>
Author: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Closes#20203 from attilapiros/SPARK-22577.
…JSON / text
## What changes were proposed in this pull request?
Fix for JSON and CSV data sources when file names include characters
that would be changed by URL encoding.
## How was this patch tested?
New unit tests for JSON, CSV and text suites
Author: Henry Robinson <henry@cloudera.com>
Closes#20355 from henryr/spark-23148.
## What changes were proposed in this pull request?
Referencing latest python code style checking from PyPi/pycodestyle
Removed pending TODO
For now, in tox.ini excluded the additional style error discovered on existing python due to latest style checker (will fallback on review comment to finalize exclusion or fix py)
Any further code styling requirement needs to be part of pycodestyle, not in SPARK.
## How was this patch tested?
./dev/run-tests
Author: Rekha Joshi <rekhajoshm@gmail.com>
Author: rjoshi2 <rekhajoshm@gmail.com>
Closes#20338 from rekhajoshm/SPARK-11222.
## What changes were proposed in this pull request?
A fix to https://issues.apache.org/jira/browse/SPARK-21727, "Operating on an ArrayType in a SparkR DataFrame throws error"
## How was this patch tested?
- Ran tests at R\pkg\tests\run-all.R (see below attached results)
- Tested the following lines in SparkR, which now seem to execute without error:
```
indices <- 1:4
myDf <- data.frame(indices)
myDf$data <- list(rep(0, 20))
mySparkDf <- as.DataFrame(myDf)
collect(mySparkDf)
```
[2018-01-22 SPARK-21727 Test Results.txt](https://github.com/apache/spark/files/1653535/2018-01-22.SPARK-21727.Test.Results.txt)
felixcheung yanboliang sun-rui shivaram
_The contribution is my original work and I license the work to the project under the project’s open source license_
Author: neilalex <neil@neilalex.com>
Closes#20352 from neilalex/neilalex-sparkr-arraytype.
## What changes were proposed in this pull request?
We extract Python UDFs in logical aggregate which depends on aggregate expression or grouping key in ExtractPythonUDFFromAggregate rule. But Python UDFs which don't depend on above expressions should also be extracted to avoid the issue reported in the JIRA.
A small code snippet to reproduce that issue looks like:
```python
import pyspark.sql.functions as f
df = spark.createDataFrame([(1,2), (3,4)])
f_udf = f.udf(lambda: str("const_str"))
df2 = df.distinct().withColumn("a", f_udf())
df2.show()
```
Error exception is raised as:
```
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#50
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)
```
This exception raises because `HashAggregateExec` tries to bind the aliased Python UDF expression (e.g., `pythonUDF0#50 AS a#44`) to grouping key.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#20360 from viirya/SPARK-23177.
## What changes were proposed in this pull request?
Increased timeout from 50 ms to 300 ms (50 ms was really too low).
## How was this patch tested?
Multiple rounds of tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#20371 from tdas/SPARK-23197.
## What changes were proposed in this pull request?
The broadcast hint of the cached plan is lost if we cache the plan. This PR is to correct it.
```Scala
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
broadcast(df2).cache()
df2.collect()
val df3 = df1.join(df2, Seq("key"), "inner")
```
## How was this patch tested?
Added a test.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20368 from gatorsmile/cachedBroadcastHint.
## What changes were proposed in this pull request?
The hint of the plan segment is lost, if the plan segment is replaced by the cached data.
```Scala
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
df2.cache()
val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
```
This PR is to fix it.
## How was this patch tested?
Added a test
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20365 from gatorsmile/fixBroadcastHintloss.
## What changes were proposed in this pull request?
Added documentation for new transformer.
Author: Bago Amirbekian <bago@databricks.com>
Closes#20285 from MrBago/sizeHintDocs.
Because the call to the constructor of HiveClientImpl crosses class loader
boundaries, different versions of the same class (Configuration in this
case) were loaded, and that caused a runtime error when instantiating the
client. By using a safer type in the signature of the constructor, it's
possible to avoid the problem.
I considered removing 'sharesHadoopClasses', but it may still be desired
(even though there are 0 users of it since it was not working). When Spark
starts to support Hadoop 3, it may be necessary to use that option to
load clients for older Hive metastore versions that don't know about
Hadoop 3.
Tested with added unit test.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#20169 from vanzin/SPARK-17088.
## What changes were proposed in this pull request?
We need to override the prettyName for bit_length and octet_length for getting the expected auto-generated alias name.
## How was this patch tested?
The existing tests
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20358 from gatorsmile/test2.3More.
## What changes were proposed in this pull request?
#20002 purposed a way to safe check the default partitioner, however, if `spark.default.parallelism` is set, the defaultParallelism still could be smaller than the proper number of partitions for upstreams RDDs. This PR tries to extend the approach to address the condition when `spark.default.parallelism` is set.
The requirements where the PR helps with are :
- Max partitioner is not eligible since it is atleast an order smaller, and
- User has explicitly set 'spark.default.parallelism', and
- Value of 'spark.default.parallelism' is lower than max partitioner
- Since max partitioner was discarded due to being at least an order smaller, default parallelism is worse - even though user specified.
Under the rest cases, the changes should be no-op.
## How was this patch tested?
Add corresponding test cases in `PairRDDFunctionsSuite` and `PartitioningSuite`.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#20091 from jiangxb1987/partitioner.
## What changes were proposed in this pull request?
Add support for using pandas UDFs with groupby().agg().
This PR introduces a new type of pandas UDF - group aggregate pandas UDF. This type of UDF defines a transformation of multiple pandas Series -> a scalar value. Group aggregate pandas UDFs can be used with groupby().agg(). Note group aggregate pandas UDF doesn't support partial aggregation, i.e., a full shuffle is required.
This PR doesn't support group aggregate pandas UDFs that return ArrayType, StructType or MapType. Support for these types is left for future PR.
## How was this patch tested?
GroupbyAggPandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#19872 from icexelloss/SPARK-22274-groupby-agg.
## What changes were proposed in this pull request?
a new interface which allows data source to report partitioning and avoid shuffle at Spark side.
The design is pretty like the internal distribution/partitioing framework. Spark defines a `Distribution` interfaces and several concrete implementations, and ask the data source to report a `Partitioning`, the `Partitioning` should tell Spark if it can satisfy a `Distribution` or not.
## How was this patch tested?
new test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#20201 from cloud-fan/partition-reporting.
## What changes were proposed in this pull request?
Typo fixes
## How was this patch tested?
Local build / Doc-only changes
Author: Jacek Laskowski <jacek@japila.pl>
Closes#20344 from jaceklaskowski/typo-fixes.
## What changes were proposed in this pull request?
The allJobs and the job pages attempt to use stage attempt and DAG visualization from the store, but for long running jobs they are not guaranteed to be retained, leading to exceptions when these pages are rendered.
To fix it `store.lastStageAttempt(stageId)` and `store.operationGraphForJob(jobId)` are wrapped in `store.asOption` and default values are used if the info is missing.
## How was this patch tested?
Manual testing of the UI, also using the test command reported in SPARK-23121:
./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount ./examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar /spark
Closes#20287
Author: Sandor Murakozi <smurakozi@gmail.com>
Closes#20330 from smurakozi/SPARK-23121.
## What changes were proposed in this pull request?
ClosureCleaner moved from warning to debug
## How was this patch tested?
Existing tests
Author: Rekha Joshi <rekhajoshm@gmail.com>
Author: rjoshi2 <rekhajoshm@gmail.com>
Closes#20337 from rekhajoshm/SPARK-11630-1.
## What changes were proposed in this pull request?
Note that this PR was made based on the top of https://github.com/apache/spark/pull/20151. So, it almost leaves the main codes intact.
This PR proposes to add a script for the preparation of automatic PySpark coverage generation. Now, it's difficult to check the actual coverage in case of PySpark. With this script, it allows to run tests by the way we did via `run-tests` script before. The usage is exactly the same with `run-tests` script as this basically wraps it.
This script and PR alone should also be useful. I was asked about how to run this before, and seems some reviewers (including me) need this. It would be also useful to run it manually.
It usually requires a small diff in normal Python projects but PySpark cases are a bit different because apparently we are unable to track the coverage after it's forked. So, here, I made a custom worker that forces the coverage, based on the top of https://github.com/apache/spark/pull/20151.
I made a simple demo. Please take a look - https://spark-test.github.io/pyspark-coverage-site.
To show up the structure, this PR adds the files as below:
```
python
├── .coveragerc # Runtime configuration when we run the script.
├── run-tests-with-coverage # The script that has coverage support and wraps run-tests script.
└── test_coverage # Directories that have files required when running coverage.
├── conf
│ └── spark-defaults.conf # Having the configuration 'spark.python.daemon.module'.
├── coverage_daemon.py # A daemon having custom fix and wrapping our daemon.py
└── sitecustomize.py # Initiate coverage with COVERAGE_PROCESS_START
```
Note that this PR has a minor nit:
[This scope](04e44b37cc/python/pyspark/daemon.py (L148-L169)) in `daemon.py` is not in the coverage results as basically I am producing the coverage results in `worker.py` separately and then merging it. I believe it's not a big deal.
In a followup, I might have a site that has a single up-to-date PySpark coverage from the master branch as the fallback / default, or have a site that has multiple PySpark coverages and the site link will be left to each pull request.
## How was this patch tested?
Manually tested. Usage is the same with the existing Python test script - `./python/run-tests`. For example,
```
sh run-tests-with-coverage --python-executables=python3 --modules=pyspark-sql
```
Running this will generate HTMLs under `./python/test_coverage/htmlcov`.
Console output example:
```
sh run-tests-with-coverage --python-executables=python3,python --modules=pyspark-core
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['python3', 'python']
Will test the following Python modules: ['pyspark-core']
Starting test(python): pyspark.tests
Starting test(python3): pyspark.tests
...
Tests passed in 231 seconds
Combining collected coverage data under /.../spark/python/test_coverage/coverage_data
Reporting the coverage data at /...spark/python/test_coverage/coverage_data/coverage
Name Stmts Miss Branch BrPart Cover
--------------------------------------------------------------
pyspark/__init__.py 41 0 8 2 96%
...
pyspark/profiler.py 74 11 22 5 83%
pyspark/rdd.py 871 40 303 32 93%
pyspark/rddsampler.py 68 10 32 2 82%
...
--------------------------------------------------------------
TOTAL 8521 3077 2748 191 59%
Generating HTML files for PySpark coverage under /.../spark/python/test_coverage/htmlcov
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20204 from HyukjinKwon/python-coverage.
## What changes were proposed in this pull request?
Several improvements:
* provide a default implementation for the batch get methods
* rename `getChildColumn` to `getChild`, which is more concise
* remove `getStruct(int, int)`, it's only used to simplify the codegen, which is an internal thing, we should not add a public API for this purpose.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#20277 from cloud-fan/column-vector.
## What changes were proposed in this pull request?
Revert the unneeded test case changes we made in SPARK-23000
Also fixes the test suites that do not call `super.afterAll()` in the local `afterAll`. The `afterAll()` of `TestHiveSingleton` actually reset the environments.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20341 from gatorsmile/testRelated.
## What changes were proposed in this pull request?
This PR is to update the docs for UDF registration
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20348 from gatorsmile/testUpdateDoc.
## What changes were proposed in this pull request?
The example jar file is now in ./examples/jars directory of Spark distribution.
Author: Arseniy Tashoyan <tashoyan@users.noreply.github.com>
Closes#20349 from tashoyan/patch-1.
The race in the code is because the handle might update
its state to the wrong state if the connection handling
thread is still processing incoming data; so the handle
needs to wait for the connection to finish up before
checking the final state.
The race in the test is because when waiting for a handle
to reach a final state, the waitFor() method needs to wait
until all handle state is updated (which also includes
waiting for the connection thread above to finish).
Otherwise, waitFor() may return too early, which would cause
a bunch of different races (like the listener not being yet
notified of the state change, or being in the middle of
being notified, or the handle not being properly disposed
and causing postChecks() to assert).
On top of that I found, by code inspection, a couple of
potential races that could make a handle end up in the
wrong state when being killed.
The original version of this fix introduced the flipped
version of the first race described above; the connection
closing might override the handle state before the
handle might have a chance to do cleanup. The fix there
is to only dispose of the handle from the connection
when there is an error, and let the handle dispose
itself in the normal case.
The fix also caused a bug in YarnClusterSuite to be surfaced;
the code was checking for a file in the classpath that was
not expected to be there in client mode. Because of the above
issues, the error was not propagating correctly and the (buggy)
test was incorrectly passing.
Tested by running the existing unit tests a lot (and not
seeing the errors I was seeing before).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#20297 from vanzin/SPARK-23020.
## What changes were proposed in this pull request?
This PR fixes the wrong comment on `org.apache.spark.sql.parquet.row.attributes`
which is useful for UDTs like Vector/Matrix. Please see [SPARK-22320](https://issues.apache.org/jira/browse/SPARK-22320) for the usage.
Originally, [SPARK-19411](bf493686eb (diff-ee26d4c4be21e92e92a02e9f16dbc285L314)) left this behind during removing optional column metadatas. In the same PR, the same comment was removed at line 310-311.
## How was this patch tested?
N/A (This is about comments).
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#20346 from dongjoon-hyun/minor_comment_parquet.