## What changes were proposed in this pull request?
These three condition descriptions should be updated, follow #23228 :
<li>no Ordering is specified,</li>
<li>no Aggregator is specified, and</li>
<li>the number of partitions is less than
<code>spark.shuffle.sort.bypassMergeThreshold</code>.
</li>
1、If the shuffle dependency specifies aggregation, but it only aggregates at the reduce-side, BypassMergeSortShuffle can still be used.
2、If the number of output partitions is spark.shuffle.sort.bypassMergeThreshold(eg.200), we can use BypassMergeSortShuffle.
## How was this patch tested?
N/A
Closes#23281 from lcqzte10192193/wid-lcq-1211.
Authored-by: lichaoqun <li.chaoqun@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
When Kafka delegation token obtained, SCRAM `sasl.mechanism` has to be configured for authentication. This can be configured on the related source/sink which is inconvenient from user perspective. Such granularity is not required and this configuration can be implemented with one central parameter.
In this PR `spark.kafka.sasl.token.mechanism` added to configure this centrally (default: `SCRAM-SHA-512`).
## How was this patch tested?
Existing unit tests + on cluster.
Closes#23274 from gaborgsomogyi/SPARK-26322.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
YARN applicationMaster metrics registration introduced in SPARK-24594 causes further registration of static metrics (Codegenerator and HiveExternalCatalog) and of JVM metrics, which I believe do not belong in this context.
This looks like an unintended side effect of using the start method of [[MetricsSystem]].
A possible solution proposed here, is to introduce startNoRegisterSources to avoid these additional registrations of static sources and of JVM sources in the case of YARN applicationMaster metrics (this could be useful for other metrics that may be added in the future).
## How was this patch tested?
Manually tested on a YARN cluster,
Closes#22279 from LucaCanali/YarnMetricsRemoveExtraSourceRegistration.
Lead-authored-by: Luca Canali <luca.canali@cern.ch>
Co-authored-by: LucaCanali <luca.canali@cern.ch>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Follow up pr for #23207, include following changes:
- Rename `SQLShuffleMetricsReporter` to `SQLShuffleReadMetricsReporter` to make it match with write side naming.
- Display text changes for read side for naming consistent.
- Rename function in `ShuffleWriteProcessor`.
- Delete `private[spark]` in execution package.
## How was this patch tested?
Existing tests.
Closes#23286 from xuanyuanking/SPARK-26193-follow.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This proposes an alternative way to load secret keys into a Spark application that is running on Kubernetes. Instead of automatically generating the secret, the secret key can reside in a file that is shared between both the driver and executor containers.
Unit tests.
Closes#23252 from mccheah/auth-secret-with-file.
Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In `BytesToBytesMap.MapIterator.advanceToNextPage`, We will first lock this `MapIterator` and then `TaskMemoryManager` when going to free a memory page by calling `freePage`. At the same time, it is possibly that another memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it acquires memory and causes spilling on this `MapIterator`.
So it ends with the `MapIterator` object holds lock to the `MapIterator` object and waits for lock on `TaskMemoryManager`, and the other consumer holds lock to `TaskMemoryManager` and waits for lock on the `MapIterator` object.
To avoid deadlock here, this patch proposes to keep reference to the page to free and free it after releasing the lock of `MapIterator`.
## How was this patch tested?
Added test and manually test by running the test 100 times to make sure there is no deadlock.
Closes#23272 from viirya/SPARK-26265.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… incorrect.
## What changes were proposed in this pull request?
In the reported heartbeat information, the unit of the memory data is bytes, which is converted by the formatBytes() function in the utils.js file before being displayed in the interface. The cardinality of the unit conversion in the formatBytes function is 1000, which should be 1024.
Change the cardinality of the unit conversion in the formatBytes function to 1024.
## How was this patch tested?
manual tests
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#22683 from httfighter/SPARK-25696.
Lead-authored-by: 韩田田00222924 <han.tiantian@zte.com.cn>
Co-authored-by: han.tiantian@zte.com.cn <han.tiantian@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This adds the entire memory used by spark’s executor (as measured by procfs) to the executor metrics. The memory usage is collected from the entire process tree under the executor. The metrics are subdivided into memory used by java, by python, and by other processes, to aid users in diagnosing the source of high memory usage.
The additional metrics are sent to the driver in heartbeats, using the mechanism introduced by SPARK-23429. This also slightly extends that approach to allow one ExecutorMetricType to collect multiple metrics.
Added unit tests and also tested on a live cluster.
Closes#22612 from rezasafi/ptreememory2.
Authored-by: Reza Safi <rezasafi@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
`1. The shuffle dependency specifies no aggregation or output ordering.`
If the shuffle dependency specifies aggregation, but it only aggregates at the reduce-side, serialized shuffle can still be used.
`3. The shuffle produces fewer than 16777216 output partitions.`
If the number of output partitions is 16777216 , we can use serialized shuffle.
We can see this mothod: `canUseSerializedShuffle`
## How was this patch tested?
N/A
Closes#23228 from 10110346/SerializedShuffle_doc.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Add MAXIMUM_PAGE_SIZE_BYTES Exception test
## How was this patch tested?
Existing tests
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#23226 from wangjiaochun/BytesToBytesMapSuite.
Authored-by: 10087686 <wang.jiaochun@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
If there are no records in memory, then we don't need to create an empty temp spill file.
## How was this patch tested?
Existing tests
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#23225 from wangjiaochun/ShufflSorter.
Authored-by: 10087686 <wang.jiaochun@zte.com.cn>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Root cause: Prior to Spark2.4, When we enable zst for eventLog compression, for inprogress application, It always throws exception in the Application UI, when we open from the history server. But after 2.4 it will display the UI information based on the completed frames in the zstd compressed eventLog. But doesn't read incomplete frames for inprogress application.
In this PR, we have added 'setContinous(true)' for reading input stream from eventLog, so that it can read from open frames also. (By default 'isContinous=false' for zstd inputStream and when we try to read an open frame, it throws truncated error)
## How was this patch tested?
Test steps:
1) Add the configurations in the spark-defaults.conf
(i) spark.eventLog.compress true
(ii) spark.io.compression.codec zstd
2) Restart history server
3) bin/spark-shell
4) sc.parallelize(1 to 1000, 1000).count
5) Open app UI from the history server UI
**Before fix**
![screenshot from 2018-12-06 00-01-38](https://user-images.githubusercontent.com/23054875/49537340-bfe28b00-f8ee-11e8-9fca-6d42fdc89e1a.png)
**After fix:**
![screenshot from 2018-12-06 00-34-39](https://user-images.githubusercontent.com/23054875/49537353-ca9d2000-f8ee-11e8-803d-645897b9153b.png)
Closes#23241 from shahidki31/zstdEventLog.
Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
1. Implement `SQLShuffleWriteMetricsReporter` on the SQL side as the customized `ShuffleWriteMetricsReporter`.
2. Add shuffle write metrics to `ShuffleExchangeExec`, and use these metrics to create corresponding `SQLShuffleWriteMetricsReporter` in shuffle dependency.
3. Rework on `ShuffleMapTask` to add new class named `ShuffleWriteProcessor` which control shuffle write process, we use sql shuffle write metrics by customizing a ShuffleWriteProcessor on SQL side.
## How was this patch tested?
Add UT in SQLMetricsSuite.
Manually test locally, update screen shot to document attached in JIRA.
Closes#23207 from xuanyuanking/SPARK-26193.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
spark.kafka.sasl.kerberos.service.name is an optional parameter but most of the time value `kafka` has to be set. As I've written in the jira the following reasoning is behind:
* Kafka's configuration guide suggest the same value: https://kafka.apache.org/documentation/#security_sasl_kerberos_brokerconfig
* It would be easier for spark users by providing less configuration
* Other streaming engines are doing the same
In this PR I've changed the parameter from optional to `WithDefault` and set `kafka` as default value.
## How was this patch tested?
Available unit tests + on cluster.
Closes#23254 from gaborgsomogyi/SPARK-26304.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Delete unnecessary If statement, because it Impossible execution when
records less than or equal to zero.it is only execution when records begin zero.
...................
if (inMemSorter == null || inMemSorter.numRecords() <= 0) {
return 0L;
}
....................
if (inMemSorter.numRecords() > 0) {
.....................
}
## How was this patch tested?
Existing tests
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#23247 from wangjiaochun/inMemSorter.
Authored-by: 10087686 <wang.jiaochun@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Adds a new method to SparkAppHandle called getError which returns
the exception (if present) that caused the underlying Spark app to
fail.
New tests added to SparkLauncherSuite for the new method.
Closes#21849Closes#23221 from vanzin/SPARK-24243.
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
`enablePerfMetrics `was originally designed in `BytesToBytesMap `to control `getNumHashCollisions getTimeSpentResizingNs getAverageProbesPerLookup`.
However, as the Spark version gradual progress. this parameter is only used for `getAverageProbesPerLookup ` and always given to true when using `BytesToBytesMap`.
it is also dangerous to determine whether `getAverageProbesPerLookup `opens and throws an `IllegalStateException `exception.
So this pr will be remove `enablePerfMetrics `parameter from `BytesToBytesMap`. thanks.
## How was this patch tested?
the existed test cases.
Closes#23244 from heary-cao/enablePerfMetrics.
Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This change modifies the logic in the SecurityManager to do two
things:
- generate unique app secrets also when k8s is being used
- only store the secret in the user's UGI on YARN
The latter is needed so that k8s won't unnecessarily create
k8s secrets for the UGI credentials when only the auth token
is stored there.
On the k8s side, the secret is propagated to executors using
an environment variable instead. This ensures it works in both
client and cluster mode.
Security doc was updated to mention the feature and clarify that
proper access control in k8s should be enabled for it to be secure.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#23174 from vanzin/SPARK-26194.
## What changes were proposed in this pull request?
We explicitly avoid files with hdfs erasure coding for the streaming WAL
and for event logs, as hdfs EC does not support all relevant apis.
However, the new builder api used has different semantics -- it does not
create parent dirs, and it does not resolve relative paths. This
updates createNonEcFile to have similar semantics to the old api.
## How was this patch tested?
Ran tests with the WAL pointed at a non-existent dir, which failed before this change. Manually tested the new function with a relative path as well.
Unit tests via jenkins.
Closes#23092 from squito/SPARK-26094.
Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In my local setup, I set log4j root category as ERROR (https://stackoverflow.com/questions/27781187/how-to-stop-info-messages-displaying-on-spark-console , first item show up if we google search "set spark log level".) When I run such command
```
spark-submit --class foo bar.jar
```
Nothing shows up, and the script exits.
After quick investigation, I think the log level for ClassNotFoundException/NoClassDefFoundError in SparkSubmit should be ERROR instead of WARN. Since the whole process exit because of the exception/error.
Before https://github.com/apache/spark/pull/20925, the message is not controlled by `log4j.rootCategory`.
## How was this patch tested?
Manual check.
Closes#23189 from gengliangwang/changeLogLevel.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Correct some document description errors.
## How was this patch tested?
N/A
Closes#23162 from 10110346/docerror.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Currently, the common `withTempDir` function is used in Spark SQL test cases. To handle `val dir = Utils. createTempDir()` and `Utils. deleteRecursively (dir)`. Unfortunately, the `withTempDir` function cannot be used in the Spark Core test case. This PR Sharing `withTempDir` function in Spark Sql and SparkCore to clean up SparkCore test cases. thanks.
## How was this patch tested?
N / A
Closes#23151 from heary-cao/withCreateTempDir.
Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Python with rpc and disk encryption enabled along with a python broadcast variable and just read the value back on the driver side the job failed with:
Traceback (most recent call last): File "broadcast.py", line 37, in <module> words_new.value File "/pyspark.zip/pyspark/broadcast.py", line 137, in value File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path File "pyspark.zip/pyspark/broadcast.py", line 128, in load EOFError: Ran out of input
To reproduce use configs: --conf spark.network.crypto.enabled=true --conf spark.io.encryption.enabled=true
Code:
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
words_new.value
print(words_new.value)
## How was this patch tested?
words_new = sc.broadcast([“scala”, “java”, “hadoop”, “spark”, “akka”])
textFile = sc.textFile(“README.md”)
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word + words_new.value[1], 1)).reduceByKey(lambda a, b: a+b)
count = wordCounts.count()
print(count)
words_new.value
print(words_new.value)
Closes#23166 from redsanket/SPARK-26201.
Authored-by: schintap <schintap@oath.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
## What changes were proposed in this pull request?
It adds kafka delegation token support for structured streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing)
What this PR contains:
* Configuration parameters for the feature
* Delegation token fetching from broker
* Usage of token through dynamic JAAS configuration
* Minor refactoring in the existing code
What this PR doesn't contain:
* Documentation changes because design can change
## How was this patch tested?
Existing tests + added small amount of additional unit tests.
Because it's an external service integration mainly tested on cluster.
* 4 node cluster
* Kafka broker version 1.1.0
* Topic with 4 partitions
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-256
An example of obtaining a token:
```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID HMAC OWNER RENEWERS ISSUEDATE EXPIRYDATE MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user [] 2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```
An example token usage:
```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```
Closes#22598 from gaborgsomogyi/SPARK-25501.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In `BlockManager`, `getRemoteValues` gets a `ChunkedByteBuffer` (by calling `getRemoteBytes`) and creates an `InputStream` from it. `getRemoteBytes`, in turn, gets a `ManagedBuffer` and converts it to a `ChunkedByteBuffer`.
Instead, expose a `getRemoteManagedBuffer` method so `getRemoteValues` can just get this `ManagedBuffer` and use its `InputStream`.
When reading a remote cache block from disk, this reduces heap memory usage significantly.
Retain `getRemoteBytes` for other callers.
## How was this patch tested?
Imran Rashid wrote an application (https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala), that among other things, tests reading remote cache blocks. I ran this application, using 2500MB blocks, to test reading a cache block on disk. Without this change, with `--executor-memory 5g`, the test fails with `java.lang.OutOfMemoryError: Java heap space`. With the change, the test passes with `--executor-memory 2g`.
I also ran the unit tests in core. In particular, `DistributedSuite` has a set of tests that exercise the `getRemoteValues` code path. `BlockManagerSuite` has several tests that call `getRemoteBytes`; I left these unchanged, so `getRemoteBytes` still gets exercised.
Closes#23058 from wypoon/SPARK-25905.
Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
This PR changes the broadcast object in TorrentBroadcast from a strong reference to a weak reference. This allows it to be garbage collected even if the Dataset is held in memory. This is ok, because the broadcast object can always be re-read.
## How was this patch tested?
Tested in Spark shell by taking a heap dump, full repro steps listed in https://issues.apache.org/jira/browse/SPARK-25998.
Closes#22995 from bkrieger/bk/torrent-broadcast-weak.
Authored-by: Brandon Krieger <bkrieger@palantir.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
… of hard coded "/" in DependencyUtils
## What changes were proposed in this pull request?
Use Java system property "file.separator" instead of hard coded "/" in DependencyUtils.
## How was this patch tested?
Manual test:
Submit Spark application via REST API that reads data from Elasticsearch using spark-elasticsearch library.
Without fix application fails with error:
18/11/22 10:36:20 ERROR Version: Multiple ES-Hadoop versions detected in the classpath; please use only one
jar:file:/C:/<...>/spark-2.4.0-bin-hadoop2.6/work/driver-20181122103610-0001/myApp-assembly-1.0.jar
jar:file:/C:/<...>/myApp-assembly-1.0.jar
18/11/22 10:36:20 ERROR Main: Application [MyApp] failed:
java.lang.Error: Multiple ES-Hadoop versions detected in the classpath; please use only one
jar:file:/C:/<...>/spark-2.4.0-bin-hadoop2.6/work/driver-20181122103610-0001/myApp-assembly-1.0.jar
jar:file:/C:/<...>/myApp-assembly-1.0.jar
at org.elasticsearch.hadoop.util.Version.<clinit>(Version.java:73)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:214)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:73)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:72)
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:44)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
...
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
With fix application runs successfully.
Closes#23102 from markpavey/JIRA_SPARK-26137_DependencyUtilsFileSeparatorFix.
Authored-by: Mark Pavey <markpavey@exabre.co.uk>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This pull request fixes [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114) issue that occurs when trying to reduce the number of partitions by means of coalesce without shuffling after shuffle-based transformations.
The leak occurs because of not cleaning up `ExternalSorter`'s `readingIterator` field as it's done for its `map` and `buffer` fields.
Additionally there are changes to the `CompletionIterator` to prevent capturing its `sub`-iterator and holding it even after the completion iterator completes. It is necessary because in some cases, e.g. in case of standard scala's `flatMap` iterator (which is used is `CoalescedRDD`'s `compute` method) the next value of the main iterator is assigned to `flatMap`'s `cur` field only after it is available.
For DAGs where ShuffledRDD is a parent of CoalescedRDD it means that the data should be fetched from the map-side of the shuffle, but the process of fetching this data consumes quite a lot of memory in addition to the memory already consumed by the iterator held by `flatMap`'s `cur` field (until it is reassigned).
For the following data
```scala
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._
// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
.flatMap(item => (1 to 100000)
.map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
.saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
```
and the following job
```scala
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._
val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
.map(item => item._1.toString -> item._2.toString)
.repartitionAndSortWithinPartitions(new HashPartitioner(1000))
.coalesce(10,false)
.count
```
... executed like the following
```bash
spark-shell \
--num-executors=5 \
--executor-cores=2 \
--master=yarn \
--deploy-mode=client \
--conf spark.executor.memoryOverhead=512 \
--conf spark.executor.memory=1g \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
```
... executors are always failing with OutOfMemoryErrors.
The main issue is multiple leaks of ExternalSorter references.
For example, in case of 2 tasks per executor it is expected to be 2 simultaneous instances of ExternalSorter per executor but heap dump generated on OutOfMemoryError shows that there are more ones.
![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png)
P.S. This PR does not cover cases with CoGroupedRDDs which use ExternalAppendOnlyMap internally, which itself can lead to OutOfMemoryErrors in many places.
## How was this patch tested?
- Existing unit tests
- New unit tests
- Job executions on the live environment
Here is the screenshot before applying this patch
![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png)
Here is the screenshot after applying this patch
![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png)
And in case of reducing the number of executors even more the job is still stable
![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png)
Closes#23083 from szhem/SPARK-26114-externalsorter-leak.
Authored-by: Sergey Zhemzhitsky <szhemzhitski@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This is the write side counterpart to https://github.com/apache/spark/pull/23105
## How was this patch tested?
No behavior change expected, as it is a straightforward refactoring. Updated all existing test cases.
Closes#23106 from rxin/SPARK-26141.
Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: Reynold Xin <rxin@databricks.com>
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/23105, due to working on two parallel PRs at once, I made the mistake of committing the copy of the PR that used the name ShuffleMetricsReporter for the interface, rather than the appropriate one ShuffleReadMetricsReporter. This patch fixes that.
## How was this patch tested?
This should be fine as long as compilation passes.
Closes#23147 from rxin/ShuffleReadMetricsReporter.
Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Total tasks in the aggregated table and the tasks table are not matching some times in the WEBUI.
We need to force update the executor summary of the particular executorId, when ever last task of that executor has reached. Currently it force update based on last task on the stage end. So, for some particular executorId task might miss at the stage end.
Tests to reproduce:
```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```
Before patch:
![screenshot from 2018-11-15 02-24-05](https://user-images.githubusercontent.com/23054875/48511776-b0d36480-e87d-11e8-89a8-ab97216e2c21.png)
After patch:
![screenshot from 2018-11-15 02-32-38](https://user-images.githubusercontent.com/23054875/48512141-c39a6900-e87e-11e8-8535-903e1d11d13e.png)
Closes#23038 from shahidki31/SPARK-25451.
Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Support column sort, pagination and search for Stage Page using jQuery DataTable and REST API. Before this commit, the Stage page generated a hard-coded HTML table that could not support search. Supporting search and sort (over all applications rather than the 20 entries in the current page) in any case will greatly improve the user experience.
Created the stagespage-template.html for displaying application information in datables. Added REST api endpoint and javascript code to fetch data from the endpoint and display it on the data table.
Because of the above change, certain functionalities in the page had to be modified to support the addition of datatables. For example, the toggle checkbox 'Select All' previously would add the checked fields as columns in the Task table and as rows in the Summary Metrics table, but after the change, only columns are added in the Task Table as it got tricky to add rows dynamically in the datatables.
## How was this patch tested?
I have attached the screenshots of the Stage Page UI before and after the fix.
**Before:**
<img width="1419" alt="30564304-35991e1c-9c8a-11e7-850f-2ac7a347f600" src="https://user-images.githubusercontent.com/22228190/42137915-52054558-7d3a-11e8-8c85-433b2c94161d.png">
<img width="1435" alt="31360592-cbaa2bae-ad14-11e7-941d-95b4c7d14970" src="https://user-images.githubusercontent.com/22228190/42137928-79df500a-7d3a-11e8-9068-5630afe46ff3.png">
**After:**
<img width="1432" alt="31360591-c5650ee4-ad14-11e7-9665-5a08d8f21830" src="https://user-images.githubusercontent.com/22228190/42137936-a3fb9f42-7d3a-11e8-8502-22b3897cbf64.png">
<img width="1388" alt="31360604-d266b6b0-ad14-11e7-94b5-dcc4bb5443f4" src="https://user-images.githubusercontent.com/22228190/42137970-0fabc58c-7d3b-11e8-95ad-383b1bd1f106.png">
Closes#21688 from pgandhi999/SPARK-21809-2.3.
Authored-by: pgandhi <pgandhi@oath.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
## What changes were proposed in this pull request?
The DOI foundation recommends [this new resolver](https://www.doi.org/doi_handbook/3_Resolution.html#3.8). Accordingly, this PR re`sed`s all static DOI links ;-)
## How was this patch tested?
It wasn't, since it seems as safe as a "[typo fix](https://spark.apache.org/contributing.html)".
In case any of the files is included from other projects, and should be updated there, please let me know.
Closes#23129 from katrinleinweber/resolve-DOIs-securely.
Authored-by: Katrin Leinweber <9948149+katrinleinweber@users.noreply.github.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
In the summary section of stage page:
![image](https://user-images.githubusercontent.com/1097932/48935518-ebef2b00-ef42-11e8-8672-eaa4cac92c5e.png)
1. the following metrics names can be revised:
Output => Output Size / Records
Shuffle Read: => Shuffle Read Size / Records
Shuffle Write => Shuffle Write Size / Records
After changes, the names are more clear, and consistent with the other names in the same page.
2. The associated job id URL should not contain the 3 tails spaces. Reduce the number of spaces to one, and exclude the space from link. This is consistent with SQL execution page.
## How was this patch tested?
Manual check:
![image](https://user-images.githubusercontent.com/1097932/48935538-f7425680-ef42-11e8-8b2a-a4f388d3ea52.png)
Closes#23125 from gengliangwang/reviseStagePage.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
`deserialize` for kryo, the type of input parameter is ByteBuffer, if it is not backed by an accessible byte array. it will throw `UnsupportedOperationException`
Exception Info:
```
java.lang.UnsupportedOperationException was thrown.
java.lang.UnsupportedOperationException
at java.nio.ByteBuffer.array(ByteBuffer.java:994)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:362)
```
## How was this patch tested?
Added a unit test
Closes#22779 from 10110346/InputStreamKryo.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This patch defines an internal Spark interface for reporting shuffle metrics and uses that in shuffle reader. Before this patch, shuffle metrics is tied to a specific implementation (using a thread local temporary data structure and accumulators). After this patch, callers that define their own shuffle RDDs can create a custom metrics implementation.
With this patch, we would be able to create a better metrics for the SQL layer, e.g. reporting shuffle metrics in the SQL UI, for each exchange operator.
Note that I'm separating read side and write side implementations, as they are very different, to simplify code review. Write side change is at https://github.com/apache/spark/pull/23106
## How was this patch tested?
No behavior change expected, as it is a straightforward refactoring. Updated all existing test cases.
Closes#23105 from rxin/SPARK-26140.
Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
the pr #20014 which introduced `SparkOutOfMemoryError` to avoid killing the entire executor when an `OutOfMemoryError `is thrown.
so apply for memory using `MemoryConsumer. allocatePage `when catch exception, use `SparkOutOfMemoryError `instead of `OutOfMemoryError`
## How was this patch tested?
N / A
Closes#23084 from heary-cao/SparkOutOfMemoryError.
Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR is a follow-up PR of #21600 to fix the unnecessary UI redirect.
## How was this patch tested?
Local verification
Closes#23116 from jerryshao/SPARK-24553.
Authored-by: jerryshao <jerryshao@tencent.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
In the PR, I propose:
- new SQL config `spark.sql.debug.maxToStringFields` to control maximum number fields up to which `truncatedString` cuts its input sequences.
- Moving `truncatedString` out of `core` to `sql/catalyst` because it is used only in the `sql/catalyst` packages for restricting number of fields converted to strings from `TreeNode` and expressions of`StructType`.
## How was this patch tested?
Added a test to `QueryExecutionSuite` to check that `spark.sql.debug.maxToStringFields` impacts to behavior of `truncatedString`.
Closes#23039 from MaxGekk/truncated-string-catalyst.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
Task summary table displays the summary of the task table in the stage page. However, the 'Duration' metrics of 'task summary' table and 'task table' are not matching. The reason is because, in the 'task summary' we display 'executorRunTime' as the duration, and in the 'task table' the actual duration of the task. Except duration metrics, all other metrics are properly displaying in the task summary.
In Spark2.2, used to show 'executorRunTime' as duration in the 'taskTable'. That is why, in summary metrics also the 'exeuctorRunTime' shows as the duration. So, we need to show 'executorRunTime' as the duration in the tasks table to follow the same behaviour as the previous versions of spark.
## How was this patch tested?
Before patch:
![screenshot from 2018-11-19 04-32-06](https://user-images.githubusercontent.com/23054875/48679263-1e4fff80-ebb4-11e8-9ed5-16d892039e01.png)
After patch:
![screenshot from 2018-11-19 04-37-39](https://user-images.githubusercontent.com/23054875/48679343-e39a9700-ebb4-11e8-8df9-9dc3a28d4bce.png)
Closes#23081 from shahidki31/duratinSummary.
Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Hotfix a change to SparkHadoopUtil that doesn't work in 2.11
## How was this patch tested?
Existing tests.
Closes#23097 from srowen/SPARK-26043.2.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Introducing spark.ui.requestHeaderSize for configuring Jetty's HTTP requestHeaderSize.
This way long authorization field does not lead to HTTP 413.
## How was this patch tested?
Manually with curl (which version must be at least 7.55).
With the original default value (8k limit):
```bash
# Starting history server with default requestHeaderSize
$ ./sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /Users/attilapiros/github/spark/logs/spark-attilapiros-org.apache.spark.deploy.history.HistoryServer-1-apiros-MBP.lan.out
# Creating huge header
$ echo -n "X-Custom-Header: " > cookie
$ printf 'A%.0s' {1..9500} >> cookie
# HTTP GET with huge header fails with 431
$ curl -H cookie http://458apiros-MBP.lan:18080/
<h1>Bad Message 431</h1><pre>reason: Request Header Fields Too Large</pre>
# The log contains the error
$ tail -1 /Users/attilapiros/github/spark/logs/spark-attilapiros-org.apache.spark.deploy.history.HistoryServer-1-apiros-MBP.lan.out
18/11/19 21:24:28 WARN HttpParser: Header is too large 8193>8192
```
After:
```bash
# Creating the history properties file with the increased requestHeaderSize
$ echo spark.ui.requestHeaderSize=10000 > history.properties
# Starting Spark History Server with the settings
$ ./sbin/start-history-server.sh --properties-file history.properties
starting org.apache.spark.deploy.history.HistoryServer, logging to /Users/attilapiros/github/spark/logs/spark-attilapiros-org.apache.spark.deploy.history.HistoryServer-1-apiros-MBP.lan.out
# HTTP GET with huge header gives back HTML5 (I have added here only just a part of the response)
$ curl -H cookie http://458apiros-MBP.lan:18080/
<!DOCTYPE html><html>
<head>...
<link rel="shortcut icon" href="/static/spark-logo-77x50px-hd.png"></link>
<title>History Server</title>
</head>
<body>
...
```
Closes#23090 from attilapiros/JettyHeaderSize.
Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
The build has a lot of deprecation warnings. Some are new in Scala 2.12 and Java 11. We've fixed some, but I wanted to take a pass at fixing lots of easy miscellaneous ones here.
They're too numerous and small to list here; see the pull request. Some highlights:
- `BeanInfo` is deprecated in 2.12, and BeanInfo classes are pretty ancient in Java. Instead, case classes can explicitly declare getters
- Eta expansion of zero-arg methods; foo() becomes () => foo() in many cases
- Floating-point Range is inexact and deprecated, like 0.0 to 100.0 by 1.0
- finalize() is finally deprecated (just needs to be suppressed)
- StageInfo.attempId was deprecated and easiest to remove here
I'm not now going to touch some chunks of deprecation warnings:
- Parquet deprecations
- Hive deprecations (particularly serde2 classes)
- Deprecations in generated code (mostly Thriftserver CLI)
- ProcessingTime deprecations (we may need to revive this class as internal)
- many MLlib deprecations because they concern methods that may be removed anyway
- a few Kinesis deprecations I couldn't figure out
- Mesos get/setRole, which I don't know well
- Kafka/ZK deprecations (e.g. poll())
- Kinesis
- a few other ones that will probably resolve by deleting a deprecated method
## How was this patch tested?
Existing tests, including manual testing with the 2.11 build and Java 11.
Closes#23065 from srowen/SPARK-26090.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Empty chunk in ChunkedByteBuffer will truncate the ChunkedByteBufferInputStream.
The detail reason is described in: https://issues.apache.org/jira/browse/SPARK-26068
## How was this patch tested?
Modified current UT to cover this case.
Closes#23040 from LinhongLiu/fix-empty-chunked-byte-buffer.
Lead-authored-by: Liu,Linhong <liulinhong@baidu.com>
Co-authored-by: Xianjin YE <yexianjin@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>