## What changes were proposed in this pull request?
This PR fixes possible overflows in int addition and multiplication. In particular, the multiplication overflows are detected by [SpotBugs](https://spotbugs.github.io/).
The following assignments may overflow on the right-hand side; as a result, the value assigned may be negative.
```
long = int * int
long = int + int
```
To avoid this problem, this PR casts one of the int operands to long on the right-hand side.
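A minimal sketch of the pattern and the fix (variable names are illustrative, not from the patch):
```scala
val bytesPerBlock: Int = 1 << 20                    // 1 MiB
val numBlocks: Int = 3000
val wrong: Long = bytesPerBlock * numBlocks         // Int * Int overflows first: negative result
val right: Long = bytesPerBlock.toLong * numBlocks  // 3145728000L, as intended
```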
## How was this patch tested?
Existing UTs.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes #21481 from kiszk/SPARK-24452.
`WebUI` defines `addStaticHandler`, but the web UIs don't use it (and instead duplicate its logic). Let's clean that up and remove the duplication.
Local build and waiting for Jenkins
Author: Jacek Laskowski <jacek@japila.pl>
Closes #21510 from jaceklaskowski/SPARK-24490-Use-WebUI.addStaticHandler.
## What changes were proposed in this pull request?
Previously, the scheduler backend was maintaining state in many places, not only for reading state but also writing to it. For example, state had to be managed in both the watch and in the executor allocator runnable. Furthermore, one had to keep track of multiple hash tables.
We can do better here by:
1. Consolidating the places where we manage state. Here, we take inspiration from traditional Kubernetes controllers. These controllers tend to follow a level-triggered mechanism. This means that the controller will continuously monitor the API server via watches and polling, and on periodic passes, the controller will reconcile the current state of the cluster with the desired state. We implement this by introducing the concept of a pod snapshot, which is a given state of the executors in the Kubernetes cluster. We operate periodically on snapshots. To prevent overloading the API server with polling requests to get the state of the cluster (particularly for executor allocation, where we want to check frequently to get executors to launch without unbearably bad latency), we use watches to populate snapshots by applying observed events to a previous snapshot to get a new snapshot. Whenever we do poll the cluster, the polled state replaces any existing snapshot - this ensures eventual consistency and mirroring of the cluster, as is desired in a level-triggered architecture (see the sketch after this list).
2. Storing less specialized in-memory state in general. Previously we were creating hash tables to represent the state of executors. Instead, it's easier to represent state solely by the snapshots.
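A minimal sketch of the level-triggered snapshot idea (types and names are illustrative, not the actual Spark classes):
```scala
case class PodState(execId: Long, phase: String)

case class Snapshot(pods: Map[Long, PodState]) {
  // Watch events are applied incrementally to the previous snapshot...
  def withUpdate(pod: PodState): Snapshot = copy(pods = pods + (pod.execId -> pod))
}

object Snapshot {
  // ...while a full poll of the API server replaces the snapshot wholesale,
  // which is what guarantees eventual consistency with the cluster.
  def fromFullPoll(pods: Seq[PodState]): Snapshot =
    Snapshot(pods.map(p => p.execId -> p).toMap)
}
```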
## How was this patch tested?
Integration tests should verify there are no regressions end to end. Unit tests are to be updated, in particular to cover different orderings of events, including events arriving in unexpected orders.
Author: mcheah <mcheah@palantir.com>
Closes #21366 from mccheah/event-queue-driven-scheduling.
## What changes were proposed in this pull request?
We don't require a specific ordering of the input data, so the sort action is unnecessary and misleading.
## How was this patch tested?
Existing test suite.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes #21536 from jiangxb1987/sorterSuite.
## What changes were proposed in this pull request?
This patch measures and logs the elapsed time of each operation in HDFSBackedStateStoreProvider that communicates with the file system (mostly remote HDFS in production), to help investigate latency issues.
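A minimal sketch of the timing pattern (names are illustrative; the real code would use Spark's Logging trait rather than println):
```scala
def timeTakenMs[T](operation: String)(body: => T): T = {
  val startTime = System.currentTimeMillis()
  val result = body
  println(s"$operation took ${System.currentTimeMillis() - startTime} ms")
  result
}

// Hypothetical usage: timeTakenMs("fetching snapshot file") { readSnapshotFile(version) }
```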
## How was this patch tested?
Manually tested.
Author: Jungtaek Lim <kabhwan@gmail.com>
Closes #21506 from HeartSaVioR/SPARK-24485.
## What changes were proposed in this pull request?
This PR enables using grouped aggregate pandas UDFs as window functions. The semantics are the same as using SQL aggregation functions as window functions.
```
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> from pyspark.sql import Window
>>> df = spark.createDataFrame(
...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
...     ("id", "v"))
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)
... def mean_udf(v):
...     return v.mean()
>>> w = Window.partitionBy('id')
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+
```
The scope of this PR is somewhat limited in terms of:
(1) Only supports unbounded window, which acts essentially as group by.
(2) Only supports aggregation functions, not "transform" like window functions (n -> n mapping)
Both of these are left as future work. In particular, (1) needs careful thought about how to pass rolling-window data to Python efficiently; (2) is a bit easier but requires more changes, so I think it's better to leave it as a separate PR.
## How was this patch tested?
WindowPandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes #21082 from icexelloss/SPARK-22239-window-udf.
## What changes were proposed in this pull request?
Currently, `spark.ui.filters` are not applied to the handlers added after binding the server. This means that every page which is added after starting the UI will not have the filters configured on it. This can allow unauthorized access to the pages.
The PR also applies the filters to the handlers added after the UI starts (see the sketch below).
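A hedged sketch of the idea using the Jetty API directly (Spark's actual helper methods differ; the filter classes come from `spark.ui.filters`):
```scala
import java.util.EnumSet
import javax.servlet.DispatcherType
import org.eclipse.jetty.servlet.{FilterHolder, ServletContextHandler}

// When a handler is attached after the server has bound, configure the same
// filters on it that were applied to the handlers present at startup.
def attachWithFilters(handler: ServletContextHandler, filterClasses: Seq[String]): Unit = {
  filterClasses.foreach { cls =>
    val holder = new FilterHolder()
    holder.setClassName(cls)
    handler.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
  }
}
```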
## How was this patch tested?
Manual tests (without the patch, after starting the thriftserver with `--conf spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter --conf spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"` you can access `http://localhost:4040/sqlserver`; with the patch, the response is 401, as for the other pages).
Author: Marco Gaido <marcogaido91@gmail.com>
Closes #21523 from mgaido91/SPARK-24506.
## What changes were proposed in this pull request?
When a user creates an aggregator object in Scala and passes it to Spark Dataset's agg() method, Spark initializes TypedAggregateExpression with the nodeName field set to aggregator.getClass.getSimpleName. However, getSimpleName is not safe in a Scala environment, depending on how the user creates the aggregator object. For example, if the aggregator class's fully qualified name is "com.my.company.MyUtils$myAgg$2$", getSimpleName throws java.lang.InternalError with "Malformed class name". This has been reported in scalatest (https://github.com/scalatest/scalatest/pull/1044) and discussed in many Scala upstream JIRAs such as SI-8110 and SI-5425.
To fix this issue, we follow the solution in https://github.com/scalatest/scalatest/pull/1044 to add a safer version of getSimpleName as a util method, and TypedAggregateExpression invokes this util method rather than getClass.getSimpleName.
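A hedged sketch of such a fallback (the real util method handles more cases):
```scala
def safeSimpleName(cls: Class[_]): String =
  try {
    cls.getSimpleName
  } catch {
    // Thrown by the JVM for certain Scala-generated class names,
    // e.g. "com.my.company.MyUtils$myAgg$2$".
    case _: InternalError =>
      cls.getName.split('.').last.stripSuffix("$")
  }
```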
## How was this patch tested?
added unit test
Author: Fangshi Li <fli@linkedin.com>
Closes #21276 from fangshil/SPARK-24216.
## What changes were proposed in this pull request?
When HadoopMapRedCommitProtocol is used (e.g., when using saveAsTextFile() or
saveAsHadoopFile() with RDDs), it's not easy to determine which output committer
class was used, so this PR simply logs the class that was used, similarly to what
is done in SQLHadoopMapReduceCommitProtocol.
## How was this patch tested?
Built Spark then manually inspected logging when calling saveAsTextFile():
```scala
scala> sc.setLogLevel("INFO")
scala> sc.textFile("README.md").saveAsTextFile("/tmp/out")
...
18/05/29 10:06:20 INFO HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
```
Author: Jonathan Kelly <jonathak@amazon.com>
Closes #21452 from ejono/master.
## What changes were proposed in this pull request?
Introducing Python bindings for running PySpark on Kubernetes.
- [x] Running PySpark Jobs
- [x] Increased Default Memory Overhead value
- [ ] Dependency Management for virtualenv/conda
## How was this patch tested?
This patch was tested with
- [x] Unit Tests
- [x] Integration tests with [this addition](https://github.com/apache-spark-on-k8s/spark-integration/pull/46)
```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run SparkPi with a test secret mounted into the driver and executor pods
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
Run completed in 4 minutes, 28 seconds.
Total number of tests run: 11
Suites: completed 2, aborted 0
Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
Author: Ilan Filonenko <if56@cornell.edu>
Author: Ilan Filonenko <ifilondz@gmail.com>
Closes #21092 from ifilonenko/master.
Change `runTasks` to `submitTasks` in a comment in TaskSchedulerImpl.scala.
Author: xueyu <xueyu@yidian-inc.com>
Author: Xue Yu <278006819@qq.com>
Closes #21485 from xueyumusic/fixtypo1.
## What changes were proposed in this pull request?
Currently we only clean up the local directories on application removal. However, when executors die and restart repeatedly, many temp files are left untouched in the local directories, which is undesired behavior and could gradually use up disk space.
We can detect executor death in the Worker and clean up the non-shuffle files (files whose names do not end with ".index" or ".data") in the local directories. We should not touch the shuffle files, since they are expected to be used by the external shuffle service.
The scope of this PR is limited to implementing the cleanup logic only on a Standalone cluster; we defer to experts familiar with other cluster managers (YARN/Mesos/K8s) to determine whether it's worth adding similar support there. A sketch of the file filter follows.
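A minimal sketch of the filter only (directory traversal and executor-death detection omitted); shuffle index/data files are kept for the external shuffle service:
```scala
import java.io.File

def isNonShuffleFile(file: File): Boolean =
  !(file.getName.endsWith(".index") || file.getName.endsWith(".data"))
```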
## How was this patch tested?
Add new test suite to cover.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes #21390 from jiangxb1987/cleanupNonshuffleFiles.
This PR adds a debugging log for YARN-specific credential providers, which are loaded via the service loader mechanism.
It took me a while to debug whether one was actually loaded or not; I had to explicitly set the deprecated configuration and check if it was actually being loaded.
The change was manually tested. Logs look like:
```
Using the following builtin delegation token providers: hadoopfs, hive, hbase.
Using the following YARN-specific credential providers: yarn-test.
```
Author: hyukjinkwon <gurwls223@apache.org>
Closes #21466 from HyukjinKwon/minor-log.
## What changes were proposed in this pull request?
This adds a new API `TaskContext.getLocalProperty(key)` to the Python TaskContext. It mirrors the Java TaskContext API of returning a string value if the key exists, or None if the key does not exist.
## How was this patch tested?
New test added.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #21437 from tdas/SPARK-24397.
This change takes into account all non-pending tasks when calculating
the number of tasks to be shown. This also means that when the stage
is pending, the task table (or, in fact, most of the data in the stage
page) will not be rendered.
I also fixed the label when the known number of tasks is larger than
the recorded number of tasks (it was inverted).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #21457 from vanzin/SPARK-24414.
As discussed separately, this avoids the possibility of XSS on certain request param keys.
CC vanzin
Author: Sean Owen <srowen@gmail.com>
Closes #21464 from srowen/XSS2.
## What changes were proposed in this pull request?
Improve the exception messages when retrieving Spark conf values to include the key name when the value is invalid.
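A hedged sketch of the idea (the exact message format is an assumption):
```scala
import org.apache.spark.network.util.JavaUtils

def getTimeAsSeconds(conf: Map[String, String], key: String): Long =
  try {
    JavaUtils.timeStringAsSec(conf(key)) // parse e.g. "30s" into seconds
  } catch {
    case e: NumberFormatException =>
      // Include the offending key so the user knows which setting to fix.
      throw new NumberFormatException(s"${e.getMessage}. Offending key: $key")
  }
```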
## How was this patch tested?
Unit tests for all get* operations in SparkConf that require a specific value format
Author: William Sheu <william.sheu@databricks.com>
Closes #21454 from PenguinToast/SPARK-24337-spark-config-errors.
## What changes were proposed in this pull request?
Specifically, in client mode before context initialization, a .py file passed via --py-files doesn't work when the application itself is a Python file. See below:
```
$ cat /home/spark/tmp.py
def testtest():
    return 1
```
This works:
```
$ cat app.py
import pyspark
pyspark.sql.SparkSession.builder.getOrCreate()
import tmp
print("************************%s" % tmp.testtest())
$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
...
************************1
```
but this doesn't:
```
$ cat app.py
import pyspark
import tmp
pyspark.sql.SparkSession.builder.getOrCreate()
print("************************%s" % tmp.testtest())
$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
Traceback (most recent call last):
File "/home/spark/spark/app.py", line 2, in <module>
import tmp
ImportError: No module named tmp
```
### How did it happen?
In client mode specifically, the paths are added into PythonRunner as-is:
628c7b5179/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (L430)
628c7b5179/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala (L49-L88)
The problem here is that a .py file shouldn't be added as-is, since `PYTHONPATH` expects a directory or an archive like a zip or egg.
### How does this PR fix it?
We shouldn't simply add a file's parent directory, because other files in that parent directory would also end up on the `PYTHONPATH` in client mode before context initialization.
Therefore, we copy .py files into a temp directory and add that directory to `PYTHONPATH`.
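An illustrative sketch of that copying step (helper name and details are assumptions):
```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Plain .py files are copied into one temp directory, and that directory
// (not each file) goes on PYTHONPATH; archives and directories pass through.
def formatPythonPath(pyFiles: Seq[String]): Seq[String] = {
  lazy val tempDir = Files.createTempDirectory("spark-py-files").toFile
  pyFiles.map { p =>
    if (p.endsWith(".py")) {
      val dest = new File(tempDir, new File(p).getName)
      Files.copy(new File(p).toPath, dest.toPath, StandardCopyOption.REPLACE_EXISTING)
      tempDir.getAbsolutePath
    } else {
      p
    }
  }.distinct
}
```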
## How was this patch tested?
Unit tests are added, and this was manually tested with spark-submit in both standalone and yarn client modes.
Author: hyukjinkwon <gurwls223@apache.org>
Closes #21426 from HyukjinKwon/SPARK-24384.
## What changes were proposed in this pull request?
For some Spark applications, even though they're Java programs, they require not only jar dependencies but also Python dependencies. One example is the Livy remote SparkContext application: this application is actually an embedded REPL for Scala/Python/R, and it will load not only jar dependencies but also Python and R deps, so we should specify not only "--jars" but also "--py-files".
Currently, --py-files only works for a PySpark application, so it doesn't work in the above case. Here we propose to remove that restriction.
Also, we verified that "spark.submit.pyFiles" only supports a quite limited scenario (client mode with local deps), so here we also expand the usage of "spark.submit.pyFiles" to be an alternative to --py-files.
## How was this patch tested?
UT added.
Author: jerryshao <sshao@hortonworks.com>
Closes #21420 from jerryshao/SPARK-24377.
## What changes were proposed in this pull request?
SPARK-17874 introduced a new configuration to set the port where SSL services bind. However, we missed updating the scaladoc and the `toString` method. This PR adds the port in the missing places.
## How was this patch tested?
checked the `toString` output in the logs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes #21429 from mgaido91/minor_ssl.
## What changes were proposed in this pull request?
Clean up unused vals in `DAGScheduler.handleTaskCompletion` to reduce the code complexity slightly.
## How was this patch tested?
Existing test cases.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes #21406 from jiangxb1987/handleTaskCompletion.
## What changes were proposed in this pull request?
When an OutOfMemoryError is thrown from BroadcastExchangeExec, scala.concurrent.Future hits a Scala bug (https://github.com/scala/bug/issues/9554) and hangs until the future times out.
We can wrap the OOM inside a SparkException to resolve this issue.
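A sketch of the workaround (the wrapper method is illustrative): rethrowing the OOM wrapped in a SparkException lets the Future complete exceptionally instead of hanging.
```scala
import org.apache.spark.SparkException

def runGuardedAgainstOom[T](body: => T): T =
  try {
    body
  } catch {
    case oe: OutOfMemoryError =>
      // scala.concurrent.Future mishandles fatal errors (scala/bug#9554),
      // so surface the OOM as a non-fatal exception instead.
      throw new SparkException("Not enough memory to build and broadcast the table", oe)
  }
```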
## How was this patch tested?
Manually tested.
Author: jinxing <jinxing6042@126.com>
Closes #21342 from jinxing64/SPARK-24294.
## What changes were proposed in this pull request?
This change makes Spark use the correct environment variable set by Mesos in the container on startup.
Author: Jake Charland <jakec@uber.com>
Closes #18894 from jakecharland/MesosSandbox.
## What changes were proposed in this pull request?
The ultimate goal is for listeners of onTaskEnd to receive metrics when a task is killed intentionally, since that data is currently just thrown away. This is already done for ExceptionFailure, so this just copies the same approach.
## How was this patch tested?
Updated existing tests.
This is a rework of https://github.com/apache/spark/pull/17422, all credits should go to noodle-fb
Author: Xianjin YE <advancedxy@gmail.com>
Author: Charles Lewis <noodle@fb.com>
Closes #21165 from advancedxy/SPARK-20087.
## What changes were proposed in this pull request?
The PR retrieves the proxyBase automatically from the header `X-Forwarded-Context` (if available). This is the header used by Knox to inform the proxied service about the base path.
This provides 0-configuration support for the Knox gateway (instead of having to properly set `spark.ui.proxyBase`), and it allows direct access to the SHS when it is proxied by Knox. Previously, after setting `spark.ui.proxyBase`, direct access to the SHS did not work properly (due to badly generated links). A sketch of the lookup follows.
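A minimal sketch of the lookup order (assumed, not the actual Spark helper):
```scala
import javax.servlet.http.HttpServletRequest

// Prefer an explicitly configured proxy base; otherwise fall back to the
// base path Knox advertises in the X-Forwarded-Context header.
def proxyBase(request: HttpServletRequest): String =
  sys.props.get("spark.ui.proxyBase")
    .orElse(Option(request.getHeader("X-Forwarded-Context")))
    .getOrElse("")
```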
## How was this patch tested?
added UT + manual tests
Author: Marco Gaido <marcogaido91@gmail.com>
Closes #21268 from mgaido91/SPARK-24209.
EventListeners can interrupt the event queue thread. In particular,
when the EventLoggingListener writes to HDFS, HDFS can interrupt the
thread. When there is an interrupt, the queue should be removed and stop
accepting any more events. Before this change, the queue would continue
to take more events (till it was full), and then would not stop when the
application was complete because the PoisonPill couldn't be added.
Added a unit test which failed before this change.
Author: Imran Rashid <irashid@cloudera.com>
Closes #21356 from squito/SPARK-24309.
Re-submit of https://github.com/apache/spark/pull/21299, which broke the build.
A few new commits are added to fix the SQLConf problem in `JsonSchemaInference.infer`, and to prevent us from accessing `SQLConf` in the DAGScheduler event loop thread.
## What changes were proposed in this pull request?
Previously in #20136 we decided to forbid tasks to access `SQLConf`, because it doesn't work and always gives you the default conf value. In #21190 we fixed the check and all the places that violated it.
Currently the pattern of accessing configs at the executor side is: read the configs at the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like
```
val someConf = conf.getXXX
child.execute().mapPartitions {
  if (someConf == ...) ...
  ...
}
```
However, this pattern is hard to apply if the config needs to be propagated via a long call stack. An example is `DataType.sameType`; see how many changes were made in #21190.
When it comes to code generation, it's even worse. I tried it locally and we need to change a ton of files to propagate configs to code generators.
This PR proposes to allow tasks to access `SQLConf`. The idea is that we can save all the SQL configs to job properties when an SQL execution is triggered, and at the executor side we rebuild the `SQLConf` from the job properties. A sketch of the driver-side half follows.
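An illustrative sketch of the driver-side half (the property key prefix is an assumption):
```scala
import org.apache.spark.SparkContext

// When an SQL execution is triggered, snapshot all SQL configs into the
// job's local properties; executors can then rebuild a SQLConf from the
// task's local properties without explicit plumbing through call stacks.
def captureSQLConfs(sc: SparkContext, allConfs: Map[String, String]): Unit = {
  allConfs.foreach { case (k, v) =>
    sc.setLocalProperty("__sql_conf__." + k, v)
  }
}
```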
## How was this patch tested?
a new test suite
Author: Wenchen Fan <wenchen@databricks.com>
Closes #21376 from cloud-fan/config.
## What changes were proposed in this pull request?
- Removes the check for the keytab when we are running in Mesos cluster mode.
- Keeps the check for client mode, since in cluster mode we eventually launch the driver within the cluster in client mode. In the latter case we want the check done when the container starts: the keytab should be checked for existence within the container's local filesystem.
## How was this patch tested?
This was manually tested by running spark-submit in Mesos cluster mode.
Author: Stavros <st.kontopoulos@gmail.com>
Closes #20967 from skonto/fix_mesos_keytab_susbmit.
## What changes were proposed in this pull request?
Previously in #20136 we decided to forbid tasks to access `SQLConf`, because it doesn't work and always gives you the default conf value. In #21190 we fixed the check and all the places that violated it.
Currently the pattern of accessing configs at the executor side is: read the configs at the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like
```
val someConf = conf.getXXX
child.execute().mapPartitions {
  if (someConf == ...) ...
  ...
}
```
However, this pattern is hard to apply if the config needs to be propagated via a long call stack. An example is `DataType.sameType`; see how many changes were made in #21190.
When it comes to code generation, it's even worse. I tried it locally and we need to change a ton of files to propagate configs to code generators.
This PR proposes to allow tasks to access `SQLConf`. The idea is that we can save all the SQL configs to job properties when an SQL execution is triggered, and at the executor side we rebuild the `SQLConf` from the job properties.
## How was this patch tested?
a new test suite
Author: Wenchen Fan <wenchen@databricks.com>
Closes #21299 from cloud-fan/config.
The old code relied on a core configuration and extended its
default value to include the things that needed redacting in the
app's environment. Instead, add a SQL-specific option for which
options to redact, and apply both the core and SQL-specific rules
when redacting the options in the save command.
This is a little sub-optimal since it adds another config, but it
retains the current default behavior.
While there I also fixed a typo and a couple of minor config API
usage issues in the related redaction option that SQL already had.
Tested with existing unit tests, plus checking the env page on
a shell UI.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #21158 from vanzin/SPARK-23850.
## What changes were proposed in this pull request?
In HadoopMapReduceCommitProtocol and FileFormatWriter, there are unnecessary settings in the Hadoop configuration.
Also clean up some code in SQL module.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes #21329 from gengliangwang/codeCleanWrite.
## What changes were proposed in this pull request?
According to the discussion in https://github.com/apache/spark/pull/21175 , this PR proposes 2 improvements:
1. add comments to explain why we call `limit` to write out `ByteBuffer` with slices.
2. remove the `try ... finally`
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes #21327 from cloud-fan/minor.
## What changes were proposed in this pull request?
There's a period of time when an accumulator has been garbage collected but hasn't yet been removed from AccumulatorContext.originals by the ContextCleaner. When an update is received for such an accumulator, it will throw an exception and kill the whole job. This can happen when a stage completes but there are still running tasks from other attempts, speculation, etc. Since AccumulatorContext.get() returns an Option, we can just return None in such a case.
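A sketch of the tolerant lookup (mirroring the shape of AccumulatorContext.get, with the registry passed in for self-containment):
```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

// An accumulator that was GC'd but not yet cleaned up yields None instead
// of an exception that would fail the whole job.
def lookup(originals: ConcurrentHashMap[Long, WeakReference[AccumulatorV2[_, _]]],
    id: Long): Option[AccumulatorV2[_, _]] =
  Option(originals.get(id)).flatMap(ref => Option(ref.get))
```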
## How was this patch tested?
Unit test.
Author: Artem Rudoy <artem.rudoy@gmail.com>
Closes #21114 from artemrd/SPARK-22371.
## What changes were proposed in this pull request?
```
~/spark-2.3.0-bin-hadoop2.7$ bin/spark-sql --num-executors 0 --conf spark.dynamicAllocation.enabled=true
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=1024m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1024m; support was removed in 8.0
Error: Number of executors must be a positive number
Run with --help for usage help or --verbose for debug output
```
Actually, we could previously start up with a minimum executor number of 0 when dynamic allocation is enabled.
## How was this patch tested?
ut added
Author: Kent Yao <yaooqinn@hotmail.com>
Closes #21290 from yaooqinn/SPARK-24241.
## What changes were proposed in this pull request?
When multiple clients attempt to resolve artifacts via the `--packages` parameter, they can run into a race condition when they each attempt to modify the dummy `org.apache.spark-spark-submit-parent-default.xml` file created in the default ivy cache dir.
This PR changes the behavior to encode a UUID in the dummy module descriptor, so each client operates on a different resolution file in the ivy cache dir. In addition, this patch changes when and which resolution files are cleaned, to prevent accumulation of resolution files in the default ivy cache dir.
Since this PR is a successor of #18801, close #18801. Much of the code was ported from #18801. **A lot of effort was put in here; this PR should be credited to Victsm.**
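A one-line sketch of the descriptor naming (the exact id format is an assumption):
```scala
// A per-invocation artifact id keeps concurrent spark-submit clients from
// clobbering each other's dummy descriptor in the shared ivy cache.
val dummyArtifactId = s"spark-submit-parent-${java.util.UUID.randomUUID()}"
```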
## How was this patch tested?
added UT into `SparkSubmitUtilsSuite`
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes #21251 from kiszk/SPARK-10878.
## What changes were proposed in this pull request?
Sometimes "SparkListenerSuite.local metrics" test fails because the average of executorDeserializeTime is too short. As squito suggested to avoid these situations in one of the task a reference introduced to an object implementing a custom Externalizable.readExternal which sleeps 1ms before returning.
## How was this patch tested?
With unit tests (and by checking the effect of this change on the average with a much larger sleep time).
Author: “attilapiros” <piros.attila.zsolt@gmail.com>
Author: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Closes #21280 from attilapiros/SPARK-19181.
## What changes were proposed in this pull request?
This drastically improves performance and keeps Spark applications from failing because they write too much data to the Docker image's own file system. The directories that back emptyDir volumes are generally larger and more performant.
## How was this patch tested?
Has been in use via the prototype version of Kubernetes support, but lost in the transition to here.
Author: mcheah <mcheah@palantir.com>
Closes #21238 from mccheah/mount-local-dirs.
## What changes were proposed in this pull request?
In the method *CoarseGrainedSchedulerBackend.killExecutors()*, `numPendingExecutors` should add
`executorsToKill.size` rather than `knownExecutors.size` if we do not adjust the target number of executors.
## How was this patch tested?
N/A
Author: wuyi <ngone_5451@163.com>
Closes #21209 from Ngone51/SPARK-24141.
It was missing the JAX-RS annotation.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #21245 from vanzin/SPARK-24188.
## What changes were proposed in this pull request?
SPARK-24160 is causing a compilation failure (after SPARK-24143 was merged). This fixes the issue.
## How was this patch tested?
building successfully
Author: Marco Gaido <marcogaido91@gmail.com>
Closes #21256 from mgaido91/SPARK-24160_FOLLOWUP.
## What changes were proposed in this pull request?
This patch modifies `ShuffleBlockFetcherIterator` so that the receipt of zero-size blocks is treated as an error. This is done as a preventative measure to guard against a potential source of data loss bugs.
In the shuffle layer, we guarantee that zero-size blocks will never be requested (a block containing zero records is always 0 bytes in size and is marked as empty such that it will never be legitimately requested by executors). However, the existing code does not fully take advantage of this invariant in the shuffle-read path: it does not explicitly check whether blocks are non-zero-size.
Additionally, our decompression and deserialization streams treat zero-size inputs as empty streams rather than errors (EOF might actually be treated as "end-of-stream" in certain layers (longstanding behavior dating to earliest versions of Spark) and decompressors like Snappy may be tolerant to zero-size inputs).
As a result, if some other bug causes legitimate buffers to be replaced with zero-sized buffers (due to corruption on either the send or receive sides) then this would translate into silent data loss rather than an explicit fail-fast error.
This patch addresses this problem by adding a `buf.size != 0` check. See code comments for pointers to tests which guarantee the invariants relied on here.
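A sketch of the fail-fast check (surrounding context simplified; the real check sits inside `ShuffleBlockFetcherIterator`):
```scala
import java.io.IOException

// A fetched buffer of size 0 can only mean corruption, since empty blocks
// are never requested in the first place.
def checkNonEmpty(blockId: String, address: String, size: Long): Unit = {
  if (size == 0) {
    throw new IOException(
      s"Received a zero-size buffer for block $blockId from $address " +
        "(empty blocks should never be requested)")
  }
}
```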
## How was this patch tested?
Existing tests (which required modifications, since some were creating empty buffers in mocks). I also added a test to make sure we fail on zero-size blocks.
To test that the zero-size blocks are indeed a potential corruption source, I manually ran a workload in `spark-shell` with a modified build which replaces all buffers with zero-size buffers in the receive path.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #21219 from JoshRosen/SPARK-24160.
## What changes were proposed in this pull request?
In the current code (`MapOutputTracker.convertMapStatuses`), map statuses are converted to (blockId, size) pairs for all blocks, no matter whether a block is empty or not, which results in OOM when there are lots of consecutive empty blocks, especially when adaptive execution is enabled.
The (blockId, size) pairs are only used in `ShuffleBlockFetcherIterator` to control shuffle-read, and only non-empty block requests are sent. Can we just filter out the empty blocks in MapOutputTracker.convertMapStatuses and save memory?
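An illustrative sketch of the proposed filtering (types and block-id format simplified):
```scala
// sizes(i) is the size a MapStatus reports for reduce partition i; empty
// blocks are dropped so no (blockId, size) pairs are materialized for them.
def nonEmptyBlocks(mapId: Int, sizes: Array[Long]): Seq[(String, Long)] =
  sizes.zipWithIndex.collect {
    case (size, reduceId) if size > 0 => (s"shuffle_0_${mapId}_$reduceId", size)
  }.toSeq
```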
## How was this patch tested?
not added yet.
Author: jinxing <jinxing6042@126.com>
Closes #21212 from jinxing64/SPARK-24143.
…path and set permissions properly
## What changes were proposed in this pull request?
The Spark history server should create spark.history.store.path and set its permissions properly. Note that Files.createDirectories doesn't do anything if the directories are already created, so this does not stomp on the permissions if the user had manually created the directory before the history server could.
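A hedged sketch of that behavior (the path and permission string are assumptions):
```scala
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermissions

val storePath = Paths.get("/var/lib/spark/history-store") // illustrative path
val alreadyExisted = Files.exists(storePath)
Files.createDirectories(storePath) // no-op if the directory already exists
if (!alreadyExisted) {
  // Only tighten permissions on a directory we created ourselves, leaving a
  // manually pre-created directory (and its permissions) untouched.
  Files.setPosixFilePermissions(storePath, PosixFilePermissions.fromString("rwx------"))
}
```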
## How was this patch tested?
Manually tested in a 100-node cluster. Ensured directories were created with proper permissions. Ensured restart worked, and apps/temp directories worked as apps were read.
Author: Thomas Graves <tgraves@thirteenroutine.corp.gq1.yahoo.com>
Closes #21234 from tgravescs/SPARK-24124.
## What changes were proposed in this pull request?
It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return wrong answer if `AccumulableParam` doesn't define equals/hashCode.
This PR fixes this by using a reference equality check in `LegacyAccumulatorWrapper.isZero`; a sketch follows.
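A sketch of the fix in a simplified shape of the wrapper (names assumed; the real class has more members):
```scala
// isZero compares against the zero value by reference (eq), so user-defined
// equals/hashCode on the accumulated type are never consulted.
class LegacyWrapperSketch[T](initialValue: T, zero: T => T) {
  private var _value: T = initialValue
  private lazy val _zero: T = zero(initialValue)
  def isZero: Boolean = _value.asInstanceOf[AnyRef] eq _zero.asInstanceOf[AnyRef]
  def reset(): Unit = _value = _zero
}
```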
## How was this patch tested?
a new test
Author: Wenchen Fan <wenchen@databricks.com>
Closes #21229 from cloud-fan/accumulator.
A fetch failure can lead to multiple tasksets which are active for a given
stage. While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully. So a task completion needs to update every taskset
so that it knows the partition is completed. That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".
Added a regression test.
Author: Imran Rashid <irashid@cloudera.com>
Closes #21131 from squito/SPARK-23433.
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-24107?jql=text%20~%20%22ChunkedByteBuffer%22
The ChunkedByteBuffer.writeFully method did not reset the limit value. When
a chunk is larger than bufferWriteChunkSize, e.g. 80 * 1024 * 1024 bytes versus
config.BUFFER_WRITE_CHUNK_SIZE (64 * 1024 * 1024), the write loop runs only once and 16 * 1024 * 1024 bytes are lost.
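A sketch of the corrected loop (simplified signature; the real method lives on ChunkedByteBuffer):
```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Bound each write with a temporary limit, then restore the chunk's original
// limit so the remaining bytes of a large chunk are not silently dropped.
def writeFully(channel: WritableByteChannel, chunks: Iterator[ByteBuffer],
    bufferWriteChunkSize: Int): Unit = {
  for (bytes <- chunks) {
    val originalLimit = bytes.limit()
    while (bytes.hasRemaining) {
      val ioSize = math.min(bytes.remaining(), bufferWriteChunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      bytes.limit(originalLimit)
    }
  }
}
```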
Author: WangJinhai02 <jinhai.wang02@ele.me>
Closes #21175 from manbuyun/bugfix-ChunkedByteBuffer.