## What changes were proposed in this pull request?
spark.task.cpus should be less than or equal to spark.executor.cores when static executor allocation is used.
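A minimal sketch of the kind of validation this implies; the placement, defaults, and error message here are assumptions, not the actual patch:
```scala
import org.apache.spark.{SparkConf, SparkException}

// Hypothetical validation sketch: a single task must not need more CPUs than
// one executor provides (defaults and wording are assumptions).
def validateTaskCpus(conf: SparkConf): Unit = {
  val taskCpus = conf.getInt("spark.task.cpus", 1)
  val executorCores = conf.getInt("spark.executor.cores", 1)
  if (executorCores < taskCpus) {
    throw new SparkException(
      s"spark.executor.cores ($executorCores) must be >= spark.task.cpus ($taskCpus)")
  }
}
```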
## How was this patch tested?
manual
Closes#24131 from liutang123/SPARK-27192.
Authored-by: liulijia <liutang123@yeah.net>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
When logConf is set to true, configuration values whose keys contain "password" were printed in clear text in the driver log. This change uses the existing redact method in Utils to redact all passwords, based on the redaction pattern in SparkConf, before printing the conf to the driver log, so that sensitive information such as passwords is not printed in clear text.
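A minimal sketch of the approach, assuming the existing `Utils.redact(conf, kvs)` helper mentioned above; the surrounding method is illustrative, not the actual patch:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Illustrative sketch: redact sensitive values (per spark.redaction.regex)
// before building the string that is logged when spark.logConf is enabled.
def redactedConfString(conf: SparkConf): String = {
  Utils.redact(conf, conf.getAll.toSeq)
    .sorted
    .map { case (k, v) => s"$k=$v" }
    .mkString("\n")
}
```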
## How was this patch tested?
This patch was tested through `SparkConfSuite` and then the entire unit test suite through sbt.
Closes#24196 from ninadingole/SPARK-27244.
Authored-by: Ninad Ingole <robert.wallis@example.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
I was playing with the scheduler and found this weird thing. In `TaskSchedulerImpl` we import `scala.collection.Set` without any reason. This is bad in practice, as it silently changes the actual class when we simply type `Set`, which by default should point to the immutable set.
This change only affects one method: `getExecutorsAliveOnHost`. I checked all the call sites and none of them needs the general `Set` type.
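A small illustration (not Spark code) of why the blanket import is risky:
```scala
import scala.collection.Set  // shadows the default immutable Set from Predef

// With this import, `Set` means the general collection.Set, so a mutable set
// can silently flow through an API that looks immutable at the call site.
def aliveExecutors: Set[String] = scala.collection.mutable.Set("exec-1")
```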
## How was this patch tested?
N/A
Closes#24231 from cloud-fan/minor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Raise the serialized task size at which a warning is generated from 100 KiB to 1000 KiB.
As several people have noted, the original change for this JIRA highlighted that this threshold is low. Test output regularly shows:
```
- sorting on StringType with nullable=false, sortOrder=List('a DESC NULLS LAST)
22:47:53.320 WARN org.apache.spark.scheduler.TaskSetManager: Stage 80 contains a task of very large size (755 KiB). The maximum recommended task size is 100 KiB.
22:47:53.348 WARN org.apache.spark.scheduler.TaskSetManager: Stage 81 contains a task of very large size (755 KiB). The maximum recommended task size is 100 KiB.
22:47:53.417 WARN org.apache.spark.scheduler.TaskSetManager: Stage 83 contains a task of very large size (755 KiB). The maximum recommended task size is 100 KiB.
22:47:53.444 WARN org.apache.spark.scheduler.TaskSetManager: Stage 84 contains a task of very large size (755 KiB). The maximum recommended task size is 100 KiB.
...
- SPARK-20688: correctly check analysis for scalar sub-queries
22:49:10.314 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 150.8 KiB
- SPARK-21835: Join in correlated subquery should be duplicateResolved: case 1
22:49:10.595 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 150.7 KiB
22:49:10.744 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 150.7 KiB
22:49:10.894 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 150.7 KiB
- SPARK-21835: Join in correlated subquery should be duplicateResolved: case 2
- SPARK-21835: Join in correlated subquery should be duplicateResolved: case 3
- SPARK-23316: AnalysisException after max iteration reached for IN query
22:49:11.559 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 154.2 KiB
```
It seems that a larger threshold of about 1MB is more suitable.
## How was this patch tested?
Existing tests.
Closes#24226 from srowen/SPARK-26660.2.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
## What changes were proposed in this pull request?
As per https://docs.oracle.com/javase/8/docs/api/java/lang/ClassLoader.html
> Class loaders that support concurrent loading of classes are known as parallel capable class loaders and are required to register themselves at their class initialization time by invoking the ClassLoader.registerAsParallelCapable method. Note that the ClassLoader class is registered as parallel capable by default. However, its subclasses still need to register themselves if they are parallel capable.
That is, we can get finer-grained class-loading locks by registering classloaders as parallel capable (refer to the deadlock caused by the macro lock: https://issues.apache.org/jira/browse/SPARK-26961).
All of our classloaders are wrappers of URLClassLoader, which is itself parallel capable.
However, this cannot be done from Scala code because of the static registration requirement; see https://github.com/scala/bug/issues/11429
## How was this patch tested?
All existing unit tests must pass.
Closes#24126 from ajithme/driverlock.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Currently, if we want to configure `spark.sql.files.maxPartitionBytes` to 256 megabytes, we must set `spark.sql.files.maxPartitionBytes=268435456`, which is very unfriendly to users.
And if we set it like this: `spark.sql.files.maxPartitionBytes=256M`, we encounter this exception:
```
Exception in thread "main" java.lang.IllegalArgumentException:
spark.sql.files.maxPartitionBytes should be long, but was 256M
at org.apache.spark.internal.config.ConfigHelpers$.toNumber(ConfigBuilder.scala)
```
This PR uses `bytesConf` instead of `longConf` or `intConf` for configurations that represent a number of bytes (a minimal sketch follows the list below).
Changed configurations:
`spark.files.maxPartitionBytes`
`spark.files.openCostInBytes`
`spark.shuffle.sort.initialBufferSize`
`spark.shuffle.spill.initialMemoryThreshold`
`spark.sql.autoBroadcastJoinThreshold`
`spark.sql.files.maxPartitionBytes`
`spark.sql.files.openCostInBytes`
`spark.sql.defaultSizeInBytes`
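For illustration, a sketch of what one such entry looks like with `bytesConf`; the entry name and default below are placeholders, not the actual definitions:
```scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Placeholder entry: bytesConf parses plain byte counts as well as suffixed
// values such as "256m", so users no longer have to write 268435456.
private[spark] val FILES_MAX_PARTITION_BYTES =
  ConfigBuilder("spark.sql.files.maxPartitionBytes")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(128 * 1024 * 1024)
```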
## How was this patch tested?
1. Existing unit tests
2. Manual testing
Closes#24187 from 10110346/bytesConf.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This applies some minor updates/cleaning following up SPARK-26928, notably renaming JVMCPU.scala to JVMCPUSource.scala.
## How was this patch tested?
Manually tested
Closes#24201 from LucaCanali/fixupSPARK-26928.
Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Remove Scala 2.11 support in build files and docs, and in various parts of code that accommodated 2.11. See some targeted comments below.
## How was this patch tested?
Existing tests.
Closes#23098 from srowen/SPARK-26132.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
The hashSeed method allocates 64 bytes even though only 8 are needed. The remaining bytes are always zero (thanks to ByteBuffer's default behavior) and can be excluded from the hash calculation because they don't differentiate inputs.
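A sketch of the fix, assuming the existing MurmurHash3-based hashing: allocate `Long.BYTES` (8) rather than `Long.SIZE` (64) bytes.
```scala
import java.nio.ByteBuffer
import scala.util.hashing.MurmurHash3

// Sketch: hash only the 8 bytes of the seed; allocating java.lang.Long.SIZE
// (64) bytes leaves 56 always-zero bytes in the hashed input.
private def hashSeed(seed: Long): Long = {
  val bytes = ByteBuffer.allocate(java.lang.Long.BYTES).putLong(seed).array()
  val lowBits = MurmurHash3.bytesHash(bytes)
  val highBits = MurmurHash3.bytesHash(bytes, lowBits)
  (highBits.toLong << 32) | (lowBits.toLong & 0xFFFFFFFFL)
}
```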
## How was this patch tested?
By running the existing tests - XORShiftRandomSuite
Closes#20793 from MaxGekk/hash-buff-size.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
For [SPARK-27184](https://issues.apache.org/jira/browse/SPARK-27184)
`org.apache.spark.internal.config` already defines the `JARS` and `FILES` config entries, so we can use them instead of the string literals "spark.jars" and "spark.files".
```scala
private[spark] val JARS = ConfigBuilder("spark.jars")
.stringConf
.toSequence
.createWithDefault(Nil)
```
```scala
private[spark] val FILES = ConfigBuilder("spark.files")
.stringConf
.toSequence
.createWithDefault(Nil)
```
Similarly, `org.apache.spark.internal.config` defines `SUBMIT_PYTHON_FILES` and `SUBMIT_DEPLOY_MODE`, which we can use instead of "spark.submit.pyFiles" and "spark.submit.deployMode".
```scala
private[spark] val SUBMIT_PYTHON_FILES = ConfigBuilder("spark.submit.pyFiles")
.stringConf
.toSequence
.createWithDefault(Nil)
```
```scala
private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode")
.stringConf
.createWithDefault("client")
```
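An illustrative before/after for a hypothetical call site (the call site itself is made up for the example):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.SUBMIT_DEPLOY_MODE

val sparkConf = new SparkConf()

// Before: raw string key, easy to mistype.
val deployMode = sparkConf.get("spark.submit.deployMode", "client")

// After: typed config entry with its default baked in.
val deployModeTyped = sparkConf.get(SUBMIT_DEPLOY_MODE)
```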
Closes#24123 from hehuiyuan/hehuiyuan-patch-6.
Authored-by: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This patch fixes the issue that ClientEndpoint in a standalone cluster doesn't recognize driver options that are passed via SparkConf instead of system properties. When `Client` is executed via the CLI they should be provided as system properties, but with `spark-submit` they can be provided via SparkConf. (SparkSubmit will call `ClientApp.start` with a SparkConf which would contain these options.)
## How was this patch tested?
Manually tested via following steps:
1) setup standalone cluster (launch master and worker via `./sbin/start-all.sh`)
2) submit one of example app with standalone cluster mode
```
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master "spark://localhost:7077" --conf "spark.driver.extraJavaOptions=-Dfoo=BAR" --deploy-mode "cluster" --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10
```
3) check whether `foo=BAR` is provided in system properties in Spark UI
<img width="877" alt="Screen Shot 2019-03-21 at 8 18 04 AM" src="https://user-images.githubusercontent.com/1317309/54728501-97db1700-4bc1-11e9-89da-078445c71e9b.png">
Closes#24163 from HeartSaVioR/SPARK-26606.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
This patch adds optional parameters, `initialize` and `noSparkClassLoader`, to Utils.classForName so that callers currently using Class.forName with the thread context classloader can use it instead. This helps reduce the number of scalastyle:off exceptions for Class.forName.
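A sketch of the resulting signature and behavior; the default values and the classloader selection shown here are assumptions (`getContextOrSparkClassLoader` refers to the existing Utils helper):
```scala
// Sketch: callers that previously used Class.forName with the thread context
// classloader can pass noSparkClassLoader = true instead.
def classForName[C](
    className: String,
    initialize: Boolean = true,
    noSparkClassLoader: Boolean = false): Class[C] = {
  val loader = if (noSparkClassLoader) {
    Thread.currentThread().getContextClassLoader
  } else {
    getContextOrSparkClassLoader
  }
  Class.forName(className, initialize, loader).asInstanceOf[Class[C]]
}
```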
## How was this patch tested?
Existing UTs.
Closes#24148 from HeartSaVioR/MINOR-reduce-scalastyle-off-for-class-forname.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Refactored test code that used the `withLogAppender()` pattern by creating a general helper method in SparkFunSuite.
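A sketch of what such a helper can look like; the signature is an assumption based on the described pattern, not the actual method:
```scala
import org.apache.log4j.{Appender, Logger}

// Sketch: attach an appender for the duration of a block, then detach it,
// so tests can assert on the log output of that block only.
protected def withLogAppender(
    appender: Appender,
    loggerName: Option[String] = None)(f: => Unit): Unit = {
  val logger = loggerName.map(n => Logger.getLogger(n)).getOrElse(Logger.getRootLogger)
  logger.addAppender(appender)
  try f finally logger.removeAppender(appender)
}
```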
## How was this patch tested?
Passed existing tests.
Closes#24172 from maryannxue/log-appender.
Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
[SPARK-26977](https://issues.apache.org/jira/browse/SPARK-26977) introduced a very strange bug: spark-shell is no longer able to load classes that are provided via `--packages`. I don't know the details of why it is broken, but it looks like initializing the `object`'s class brings the weirdness (maybe due to static initialization being done twice?).
This patch removes the logic that logs a warning when the main class is a scala.App, to avoid dealing with such complexity just to leave a warning message.
## How was this patch tested?
Manual test: suppose we run spark-shell with `--packages` option like below:
```
./bin/spark-shell --verbose --master "local[*]" --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
```
Before this patch, importing class in transitive dependency fails:
```
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1553005771597).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.0-SNAPSHOT
/_/
Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.kafka
<console>:23: error: object kafka is not a member of package org.apache
import org.apache.kafka
```
After this patch, importing class in transitive dependency succeeds:
```
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1553004095542).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.0-SNAPSHOT
/_/
Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.kafka
import org.apache.kafka
```
Closes#24147 from HeartSaVioR/SPARK-27205.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
```scala
val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe")
.booleanConf
.createWithDefault(false)
val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool")
.booleanConf
.createWithDefault(true)
```
In the key names above, **kyro should be kryo**.
## How was this patch tested?
no need
Closes#24156 from LantaoJin/SPARK-27215.
Authored-by: Lantao Jin <jinlantao@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
The Environment page in the Spark UI has all configurations sorted by key, but this is not the case in the History Server. To keep the UX the same, we can have it sorted in the History Server too.
## What changes were proposed in this pull request?
When the Environment page is rendered, the properties are now sorted before the page is created.
## How was this patch tested?
Manually tested in UI
Closes#24143 from ajithme/historyenv.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Spark on YARN uses a local DB (https://github.com/apache/spark/pull/7943) to record RegisteredExecutors information, which can be reloaded and used again when the ExternalShuffleService is restarted.
In standalone mode and Spark on Kubernetes, however, the RegisteredExecutors information is not recorded, so it is lost when the ExternalShuffleService is restarted.
This PR records that information in those modes as well.
## How was this patch tested?
new unit tests
Closes#23393 from weixiuli/SPARK-26288.
Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
…when trying to kill executors either due to dynamic allocation or blacklisting
## What changes were proposed in this pull request?
There are two deadlocks as a result of the interplay between three different threads:
**task-result-getter thread**
**spark-dynamic-executor-allocation thread**
**dispatcher-event-loop thread(makeOffers())**
The fix enforces an ordering/synchronization constraint by acquiring the lock on `TaskSchedulerImpl` before acquiring the lock on `CoarseGrainedSchedulerBackend` in `makeOffers()` as well as in the killExecutors() method. This ensures consistent resource ordering between the threads and thus fixes the deadlocks.
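A minimal, self-contained sketch of the lock-ordering idea; the class shapes are simplified stand-ins, not the actual Spark classes:
```scala
// Both paths take the scheduler lock first and the backend lock second, so the
// dispatcher thread and the kill-executors path can never acquire the two
// locks in opposite orders.
class Scheduler

class Backend(scheduler: Scheduler) {
  def makeOffers(): Unit = scheduler.synchronized {
    this.synchronized {
      // build resource offers and launch tasks
    }
  }

  def killExecutors(ids: Seq[String]): Unit = scheduler.synchronized {
    this.synchronized {
      // mark executors pending-to-remove and request the kill
    }
  }
}
```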
## How was this patch tested?
Manual Tests
Closes#24072 from pgandhi999/SPARK-27112-2.
Authored-by: pgandhi <pgandhi@verizonmedia.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
When a result stage has zero tasks, the job-end event is never fired, so the job appears to run forever in the UI. Example: `sc.emptyRDD[Int].countApprox(1000)` never finishes even though it has no tasks to launch.
## How was this patch tested?
Added UT
Closes#24100 from ajithme/emptyRDD.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This time tested against Scala 2.11 as well
Closes#24116 from fitermay/master.
Authored-by: fitermay <fiterman@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
When we run YarnSchedulerBackendSuite, the classpath seems to be built from the classes folder (resource-managers/yarn/target/scala-2.12/classes) instead of the jar (resource-managers/yarn/target/spark-yarn_2.12-3.0.0-SNAPSHOT.jar). `ui.getHandlers` is in spark-core and is loaded from spark-core.jar, which is shaded and hence refers to org.spark_project.jetty.servlet.ServletContextHandler.
org.apache.spark.scheduler.cluster.YarnSchedulerBackend, on the other hand, is not shaded and therefore expects org.eclipse.jetty.servlet.ServletContextHandler.
Refer discussion https://issues.apache.org/jira/browse/SPARK-27122?focusedCommentId=16792318&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16792318
Hence, as a fix, org.apache.spark.ui.WebUI must only return wrapper class instances or references, so that Jetty classes are avoided in getters accessed from outside spark-core.
## How was this patch tested?
Existing UT can pass
Closes#24088 from ajithme/shadebug.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR adds a check that `spark.diskStore.subDirectories` > 0; the value needs to be validated before it can be used.
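A minimal sketch of such a check using `ConfigBuilder.checkValue`; the error message and the default shown are assumptions:
```scala
import org.apache.spark.internal.config.ConfigBuilder

// Sketch: reject non-positive values at configuration time instead of failing
// later when the value is used to lay out local directories.
private[spark] val DISKSTORE_SUB_DIRECTORIES =
  ConfigBuilder("spark.diskStore.subDirectories")
    .intConf
    .checkValue(_ > 0, "The number of subdirectories must be positive.")
    .createWithDefault(64)
```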
## How was this patch tested?
N/A
Closes#24024 from lcqzte10192193/wid-lcq-190308.
Authored-by: lichaoqun <li.chaoqun@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Currently, when streaming dynamic allocation is enabled for streaming applications, the maxNumExecutorFailures in ApplicationMaster is still computed from `spark.dynamicAllocation.maxExecutors`.
It should instead consider `spark.streaming.dynamicAllocation.maxExecutors`.
Related codes:
f87153a3ac/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala (L101)
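A sketch of the intended computation; the "at least 3, else twice the max executors" heuristic mirrors the existing logic but is stated here as an assumption:
```scala
import org.apache.spark.SparkConf

// Sketch: pick the max-executors bound from the streaming dynamic-allocation
// config when that mode is enabled, instead of always using the batch one.
def maxNumExecutorFailures(conf: SparkConf): Int = {
  val effectiveMaxExecutors =
    if (conf.getBoolean("spark.streaming.dynamicAllocation.enabled", false)) {
      conf.getInt("spark.streaming.dynamicAllocation.maxExecutors", Int.MaxValue)
    } else {
      conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
    }
  val doubled =
    if (effectiveMaxExecutors > Int.MaxValue / 2) Int.MaxValue else 2 * effectiveMaxExecutors
  math.max(3, doubled)
}
```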
## How was this patch tested?
NA
Closes#23845 from liupc/Fix-incorrect-maxNumExecutorFailures-for-streaming.
Lead-authored-by: Liupengcheng <liupengcheng@xiaomi.com>
Co-authored-by: liupengcheng <liupengcheng@xiaomi.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
The executor heartbeat interval is configurable via `spark.executor.heartbeatInterval`, but a comment presents the heartbeat interval as a constant `10s`. This PR corrects the description.
## How was this patch tested?
Existing unit tests.
Closes#24101 from SongYadong/heartbeat_interval_comment.
Authored-by: SongYadong <song.yadong1@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
When trying to coalesce a UnionRDD of two large FileScanRDDs (each with a few million partitions) into around 8k partitions, the driver can stall for over an hour. A profiler shows that over 90% of the time is spent in TimSort, which is invoked by `pickBin`. This patch replaces the sorting with a more efficient `min` for the purpose of finding the least-occupied PartitionGroup.
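A sketch of the idea with a simplified PartitionGroup stand-in:
```scala
// Simplified stand-in for CoalescedRDD's PartitionGroup.
final case class PartitionGroup(numPartitions: Int)

// One O(n) pass to find the least-occupied group, instead of sorting all
// groups (O(n log n)) on every pickBin call.
def pickLeastOccupied(groups: Array[PartitionGroup]): PartitionGroup =
  groups.minBy(_.numPartitions)
```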
Closes#23986 from fitermay/SPARK-27070.
Authored-by: fitermay <fiterman@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
There is a race between the org.apache.spark.deploy.DeployMessages.WorkDirCleanup event and org.apache.spark.deploy.worker.Worker#onStop. It is possible that, while the WorkDirCleanup event is being processed, org.apache.spark.deploy.worker.Worker#cleanupThreadExecutor has already been shut down; any submission to the ThreadPoolExecutor after that results in java.util.concurrent.RejectedExecutionException.
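A minimal sketch of the kind of guard this implies; the executor here is a stand-in for the worker's cleanupThreadExecutor, and the actual fix may differ:
```scala
import java.util.concurrent.{Executors, ExecutorService, RejectedExecutionException}

val cleanupThreadExecutor: ExecutorService = Executors.newSingleThreadExecutor()

// Sketch: skip or tolerate submissions that race with shutdown instead of
// letting RejectedExecutionException propagate out of the message handler.
def submitCleanup(task: Runnable): Unit = {
  if (!cleanupThreadExecutor.isShutdown) {
    try cleanupThreadExecutor.submit(task)
    catch { case _: RejectedExecutionException => /* shut down concurrently; ignore */ }
  }
}
```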
## How was this patch tested?
Manually
Closes#24056 from ajithme/workercleanup.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This patch changes the URL scheme from http to https for the Bintray spark-packages repository. We already changed the repository URL scheme in pom.xml but missed the one inside the code.
## How was this patch tested?
Manually ran `--packages` via `./bin/spark-shell --verbose --packages "RedisLabs:spark-redis:0.3.2"`
```
...
Ivy Default Cache set to: /Users/jlim/.ivy2/cache
The jars for the packages stored in: /Users/jlim/.ivy2/jars
:: loading settings :: url = jar:file:/Users/jlim/WorkArea/ScalaProjects/spark/dist/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
RedisLabs#spark-redis added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2fee2e18-7832-4a4d-9e97-7b3d0fef766d;1.0
confs: [default]
found RedisLabs#spark-redis;0.3.2 in spark-packages
found redis.clients#jedis;2.7.2 in central
found org.apache.commons#commons-pool2;2.3 in central
downloading https://dl.bintray.com/spark-packages/maven/RedisLabs/spark-redis/0.3.2/spark-redis-0.3.2.jar ...
[SUCCESSFUL ] RedisLabs#spark-redis;0.3.2!spark-redis.jar (824ms)
downloading https://repo1.maven.org/maven2/redis/clients/jedis/2.7.2/jedis-2.7.2.jar ...
[SUCCESSFUL ] redis.clients#jedis;2.7.2!jedis.jar (576ms)
downloading https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.3/commons-pool2-2.3.jar ...
[SUCCESSFUL ] org.apache.commons#commons-pool2;2.3!commons-pool2.jar (150ms)
:: resolution report :: resolve 4586ms :: artifacts dl 1555ms
:: modules in use:
RedisLabs#spark-redis;0.3.2 from spark-packages in [default]
org.apache.commons#commons-pool2;2.3 from central in [default]
redis.clients#jedis;2.7.2 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 3 | 3 | 0 || 3 | 3 |
---------------------------------------------------------------------
```
Closes#24061 from HeartSaVioR/MINOR-use-https-to-bintray-repository.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
There is a race condition in the `ExecutorAllocationManager`: the `SparkListenerExecutorRemoved` event can be posted before the `SparkListenerTaskStart` event, which leads to an incorrect `executorIds` set. Then, when some executor idles, real executors are removed even though the actual executor count equals `minNumExecutors`, due to the incorrect computation of `newExecutorTotal` (which may be greater than `minNumExecutors`). This can eventually leave zero available executors while a wrong, positive set of executorIds is kept in memory.
What's more, even the `SparkListenerTaskEnd` event cannot release the stale `executorIds`: later idle events for those executors can no longer trigger their removal, because they have already been removed and no longer exist in the `executorDataMap` of `CoarseGrainedSchedulerBackend`, so the `onExecutorRemoved` method will never be called again.
For details see https://issues.apache.org/jira/browse/SPARK-26927
This PR is to fix this problem.
## How was this patch tested?
Existing unit tests and newly added unit tests.
Closes#23842 from liupc/Fix-race-condition-that-casues-dyanmic-allocation-not-working.
Lead-authored-by: Liupengcheng <liupengcheng@xiaomi.com>
Co-authored-by: liupengcheng <liupengcheng@xiaomi.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
SPARK-4105 added corruption detection for shuffle blocks, but it was limited to blocks smaller than maxBytesInFlight/3. This commit builds on that by adding a corruption check for large blocks. Two changes/improvements are made in this commit:
1. Large blocks are checked up to maxBytesInFlight/3 bytes in a similar way to smaller blocks, so if a large block is corrupt at the start, that block will be re-fetched, and if that also fails, a FetchFailureException will be thrown.
2. If large blocks are corrupt beyond maxBytesInFlight/3, then any IOException thrown while reading the stream will be converted to a FetchFailureException. This is slightly more aggressive than originally intended, but since the consumer of the stream may have already read some records and processed them, we can't just re-fetch the block; we need to fail the whole task. Additionally, we also considered adding a new type of TaskEndReason, which would retry the task a couple of times before failing the previous stage, but given the complexity involved in that solution we decided not to proceed in that direction.
Thanks to squito for direction and support.
## How was this patch tested?
Changed the junit test for big blocks to check for corruption.
Closes#23453 from ankuriitg/ankurgupta/SPARK-26089.
Authored-by: ankurgupta <ankur.gupta@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
LEGACY_DRIVER_IDENTIFIER and its references are removed.
The corresponding tests are updated.
## How was this patch tested?
Ran the affected unit test cases.
Closes#24026 from shivusondur/newjira2.
Authored-by: shivusondur <shivusondur@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
The change just uses the variable (`_taskScheduler`) instead of the function (`taskScheduler`) to keep usage uniform across different situations.
## How was this patch tested?
Closes#24048 from hddong/Use-variable-instead-of-function.
Authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR proposes to have one base R runner.
In the high level,
Previously, it had `ArrowRRunner` and it inherited `RRunner`:
```
└── RRunner
└── ArrowRRunner
```
After this PR, now it has a `BaseRRunner`, and `ArrowRRunner` and `RRunner` inherit `BaseRRunner`:
```
└── BaseRRunner
├── ArrowRRunner
└── RRunner
```
This way is consistent with Python's.
In more details, see below:
```scala
abstract class BaseRRunner[IN, OUT] {
def compute: Iterator[OUT] = {
...
newWriterThread(...).start()
...
newReaderIterator(...)
...
}
// Make a thread that writes data from JVM to R process
abstract protected def newWriterThread(..., iter: Iterator[IN], ...): WriterThread
// Make an iterator that reads data from the R process to JVM
abstract protected def newReaderIterator(...): ReaderIterator
abstract class WriterThread(..., iter: Iterator[IN], ...) extends Thread {
override def run(): Unit = {
...
writeIteratorToStream(...)
...
}
// Actually writing logic to the socket stream.
abstract protected def writeIteratorToStream(dataOut: DataOutputStream): Unit
}
abstract class ReaderIterator extends Iterator[OUT] {
override def hasNext(): Boolean = {
...
read(...)
...
}
override def next(): OUT = {
...
hasNext()
...
}
// Actually reading logic from the socket stream.
abstract protected def read(...): OUT
}
}
```
```scala
case [Arrow]RRunner extends BaseRRunner {
override def newWriterThread(...) {
new WriterThread(...) {
override def writeIteratorToStream(...) {
...
}
}
}
override def newReaderIterator(...) {
new ReaderIterator(...) {
override def read(...) {
...
}
}
}
}
```
## How was this patch tested?
Manually tested and existing tests should cover.
Closes#23977 from HyukjinKwon/SPARK-26923.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This code is from the era when Spark used an HTTP server to distribute
dependencies, which is long gone. Nowadays it only causes problems when
someone is using dependencies from an HTTP server with Spark auth on.
Closes#24033 from vanzin/SPARK-27004.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
The Environment tab in the Spark UI does not have the Hadoop Configuration sorted. All other tables on the same page, such as Spark Configurations, System Configuration, etc., are sorted by key by default.
## How was this patch tested?
Manually tested on SparkUI
Closes#24038 from ajithme/sqluisort.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Web UI URLs are pointing to `http://` targets even if SSL is enabled. In this PR I've changed the code to point to `https://` URLs.
## How was this patch tested?
Existing unit tests + manually by starting standalone master/worker/spark-shell. Please see jira.
Closes#23991 from gaborgsomogyi/SPARK-24621.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Currently, R's Scala code happens to refer to Python's Scala code for deduplication, which is a bit odd. For instance, when we face an exception from R, it shows a Python-related code path, which makes it confusing to debug. There should rather be one code base that R's and Python's sides share.
This PR proposes:
1. Make a `SocketAuthServer` and move `PythonServer` so that `PythonRDD` and `RRDD` can share it.
2. Move `readRDDFromFile` and `readRDDFromInputStream` into `JavaRDD`.
3. Reuse `RAuthHelper` and remove `RSocketAuthHelper` in `RRDD`.
4. Rename `getEncryptionEnabled` to `isEncryptionEnabled` while I am here.
So, now, the places below:
- `sql/core/src/main/scala/org/apache/spark/sql/api/r`
- `core/src/main/scala/org/apache/spark/api/r`
- `mllib/src/main/scala/org/apache/spark/ml/r`
no longer refer to Python's Scala code.
## How was this patch tested?
Existing tests should cover this.
Closes#24023 from HyukjinKwon/SPARK-27102.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Spark SQL performs whole-stage code generation to speed up query execution. There are two steps to it:
- Java source code is generated from the physical query plan on the driver. A single version of the source code is generated from a query plan, and sent to all executors.
- It's compiled to bytecode on the driver to catch compilation errors before sending to executors, but currently only the generated source code gets sent to the executors. The bytecode compilation is for fail-fast only.
- Executors receive the generated source code and compile to bytecode, then the query runs like a hand-written Java program.
In this model, there's an implicit assumption about the driver and executors being run on similar platforms. Some code paths accidentally embedded platform-dependent object layout information into the generated code, such as:
```java
Platform.putLong(buffer, /* offset */ 24, /* value */ 1);
```
This code expects a field to be at offset +24 of the `buffer` object, and sets a value to that field.
But whole-stage code generation generally uses platform-dependent information from the driver. If the object layout is significantly different on the driver and executors, the generated code can be reading/writing to wrong offsets on the executors, causing all kinds of data corruption.
One code pattern that leads to such problem is the use of `Platform.XXX` constants in generated code, e.g. `Platform.BYTE_ARRAY_OFFSET`.
Bad:
```scala
val baseOffset = Platform.BYTE_ARRAY_OFFSET
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will embed the value of `Platform.BYTE_ARRAY_OFFSET` on the driver into the generated code.
Good:
```scala
val baseOffset = "Platform.BYTE_ARRAY_OFFSET"
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will generate the offset symbolically -- `Platform.putLong(buffer, Platform.BYTE_ARRAY_OFFSET, value)`, which will be able to pick up the correct value on the executors.
Caveat: these offset constants are declared as runtime-initialized `static final` in Java, so they're not compile-time constants from the Java language's perspective. It does lead to a slightly increased size of the generated code, but this is necessary for correctness.
NOTE: there can be other patterns that generate platform-dependent code on the driver which is invalid on the executors. e.g. if the endianness is different between the driver and the executors, and if some generated code makes strong assumption about endianness, it would also be problematic.
## How was this patch tested?
Added a new test suite `WholeStageCodegenSparkSubmitSuite`. This test suite needs to set the driver's extraJavaOptions to force the driver and executor use different Java object layouts, so it's run as an actual SparkSubmit job.
Authored-by: Kris Mok <kris.mok@databricks.com>
Closes#24031 from gatorsmile/cherrypickSPARK-27097.
Lead-authored-by: Kris Mok <kris.mok@databricks.com>
Co-authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
## What changes were proposed in this pull request?
This is another attempt to fix the more-than-one-active-task-set-managers bug.
https://github.com/apache/spark/pull/17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it (see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM for a stage, which is an error.
This fix has a hole: let's say a stage has 10 partitions and 2 task set managers: TSM1 (zombie) and TSM2 (active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9 and thinks it is still active because it hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause the more-than-one-active-TSM error.
https://github.com/apache/spark/pull/21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed.
However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error.
#22806 and #23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions.
This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.
After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). #22806 and #23871 are its followups to fix the hole.
## How was this patch tested?
existing tests.
Closes#23927 from cloud-fan/scheduler.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
This is an optional solution for #22806 .
#21131 first implemented the idea that a task completed successfully by a zombie TaskSetManager can also mark the partition as completed in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. But that's not always true, as an active TaskSetManager may not have been created yet when a previous task succeeds, and this is the reason why #22806 hit the issue.
This PR extends #21131's behavior by adding `stageIdToFinishedPartitions` to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active TaskSetManager) succeeds. Thus, a later-created active TaskSetManager can also learn about the finished partitions by looking into `stageIdToFinishedPartitions` and won't launch any duplicate tasks.
## How was this patch tested?
Added unit tests.
Closes#23871 from Ngone51/dev-23433-25250.
Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
Spelling mistake: forword -> forward
## How was this patch tested?
This is a private function, there is no place to call this function outside of this file.
Closes#23978 from moqimoqidea/master.
Authored-by: moqimoqidea <39821951+moqimoqidea@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Creating a Netty `EventLoopGroup` creates a new thread pool for handling its events. To stop the pool's threads, the event loop group should be shut down, which is done properly for transport servers and clients by calling, for example, the `shutdownGracefully()` method (for details see the `close()` methods of `TransportClientFactory` and `TransportServer`). However, there is a separate event loop group for shuffle chunk fetch requests, which is in the pipeline for handling fetch requests (shared between the client and server) and owned by the `TransportContext`, and it was never shut down.
## How was this patch tested?
With existing unit tests.
This leak exists in the production system too, but its effect shows up most clearly in the unit tests.
Checking the core unittest logs before the PR:
```
$ grep "LEAK IN SUITE" unit-tests.log | grep -o shuffle-chunk-fetch-handler | wc -l
381
```
And after the PR, without whitelisting in the thread audit and with an extra `await` after `chunkFetchWorkers.shutdownGracefully()`:
```
$ grep "LEAK IN SUITE" unit-tests.log | grep -o shuffle-chunk-fetch-handler | wc -l
0
```
Closes#23930 from attilapiros/SPARK-27021.
Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
This proposes to add instrumentation for the driver's JVM CPU time via the Spark Dropwizard/Codahale metrics system. It follows directly from previous work SPARK-25228 and shares similar motivations: it is intended as an improvement to be used for Spark performance dashboards and monitoring tools/instrumentation.
Implementation details: this PR takes the code introduced in SPARK-25228 and moves it to a new separate Source JVMCPUSource, which is then used to register the jvmCpuTime gauge metric for both executor and driver.
The registration of the jvmCpuTime metric for the driver is conditional: a new configuration parameter `spark.metrics.cpu.time.driver.enabled` (proposed default: false) is introduced for this purpose.
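A minimal sketch of registering such a gauge with the Dropwizard metrics registry; the metric name and wiring here are illustrative:
```scala
import java.lang.management.ManagementFactory
import com.codahale.metrics.{Gauge, MetricRegistry}

// Sketch: expose the JVM's process CPU time (driver or executor) as a gauge.
val metricRegistry = new MetricRegistry

metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
  private val osBean = ManagementFactory.getPlatformMXBean(
    classOf[com.sun.management.OperatingSystemMXBean])
  override def getValue: Long = osBean.getProcessCpuTime
})
```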
## How was this patch tested?
Manually tested, using local mode and using YARN.
Closes#23838 from LucaCanali/addCPUTimeMetricDriver.
Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
After we cache a table, we can see its details in the Storage tab of the Spark UI. If an executor has shut down (graceful shutdown / dynamic executor scenario), the UI still shows the RDD as cached, and clicking its link throws an error. This is because, on the executor-removed event, we fail to adjust the RDD partition details in org.apache.spark.status.AppStatusListener#onExecutorRemoved.
## How was this patch tested?
Have tested this fix in UI manually
Edit: Added UT
Closes#23920 from ajithme/cachestorage.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Fasterxml Jackson versions before 2.9.8 are affected by multiple [CVEs](https://github.com/FasterXML/jackson-databind/issues/2186), so we need to bump the Jackson dependency to 2.9.8.
## How was this patch tested?
Existing tests and offline benchmark.
I have run ```SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.datasources.json.JSONBenchmark"``` to check there is no performance degradation for this upgrade.
Closes#23965 from yanboliang/SPARK-27051.
Authored-by: Yanbo Liang <ybliang8@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Clarify that text DataSource read/write, and RDD methods that read text, always use UTF-8 as they use Hadoop's implementation underneath. I think these are all the places that this needs a mention in the user-facing docs.
## How was this patch tested?
Doc tests.
Closes#23962 from srowen/SPARK-26016.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR makes it obtain the proxy user's delegation tokens; otherwise an `AccessControlException` is thrown ([full log](https://issues.apache.org/jira/browse/SPARK-25689?focusedCommentId=16780609&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16780609)):
```java
org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
...
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:95)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:185)
```
How to reproduce this issue:
```shell
$ ssh user_adm@spark-getaway-host1
$ export HADOOP_PROXY_USER=user_a
$ spark-sql --master yarn
```
## How was this patch tested?
Tested on our production environment.
Closes#23922 from wangyum/SPARK-25689.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Memory usage alone, without GC information, does not help us determine proper memory settings. We also need GC metrics about the frequency of major and minor GC. For example, consider two cases where the configured executor memory is 10 GB and usage in both is near 10 GB: should we increase or decrease the configured memory? These metrics can help: we can increase the configured memory for the first case if it has very frequent major GC, and decrease it for the second if there are only some minor GCs and no major GC.
GC metrics are only useful over the entire lifetime of executors, not for separate stages.
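A minimal sketch of where such metrics can come from in the JVM; classifying collectors into minor vs. major by bean name would be an additional assumption:
```scala
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

// Sketch: cumulative GC counts and times per collector over the executor's lifetime.
val gcBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
val gcCountByCollector = gcBeans.map(b => b.getName -> b.getCollectionCount).toMap
val gcTimeMsByCollector = gcBeans.map(b => b.getName -> b.getCollectionTime).toMap
```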
## How was this patch tested?
Adding UT.
Closes#22874 from LantaoJin/SPARK-25865.
Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>