spark-instrumented-optimizer/core
Minchu Yang e5b01cd823 [SPARK-36705][FOLLOW-UP] Support the case when user's classes need to register for Kryo serialization
### What changes were proposed in this pull request?

- Make the val lazy wherever the result of `isPushBasedShuffleEnabled` is stored in a class instance variable, so that the call happens after the user-defined jars/classes in `spark.kryo.classesToRegister` have been downloaded and are available on the executor side. This is part of the fix for the exception shown below.

- Add a flag `checkSerializer` to `isPushBasedShuffleEnabled` that controls whether to check that the serializer `supportsRelocationOfSerializedObjects`, also as part of the fix for the exception shown below. Specifically, the check is skipped in `registerWithExternalShuffleServer()` in `BlockManager` and in `createLocalDirsForMergedShuffleBlocks()` in `DiskBlockManager.scala`, since the same issue would arise there otherwise.

- Move `instantiateClassFromConf` and `instantiateClass` from `SparkEnv` into `Utils` so that `isPushBasedShuffleEnabled` can use them to instantiate serializer instances.
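
The effect of the `checkSerializer` flag can be sketched as follows. This is a simplified illustration, not the code in `Utils`: the `Serializer` trait, the `PushShuffleSketch` object, and the `pushEnabled` and `newSerializer` parameters are hypothetical stand-ins. The point is only that the serializer is never instantiated when the caller opts out of the check.

```scala
// Hypothetical stand-in for Spark's serializer abstraction (assumption).
trait Serializer {
  def supportsRelocationOfSerializedObjects: Boolean
}

object PushShuffleSketch {
  // Simplified sketch: `pushEnabled` stands in for the configuration checks,
  // and `newSerializer` for instantiating the configured serializer.
  def isPushBasedShuffleEnabled(
      pushEnabled: Boolean,
      newSerializer: () => Serializer,
      checkSerializer: Boolean = true): Boolean = {
    // With checkSerializer = false the factory is never called, so a
    // serializer that cannot be created yet does not cause a failure.
    pushEnabled &&
      (!checkSerializer || newSerializer().supportsRelocationOfSerializedObjects)
  }
}
```

Under these assumptions, a call site that runs before user classes are loadable would pass `checkSerializer = false`, while other call sites keep the default.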

### Why are the changes needed?

When a user registers classes for Kryo serialization via `spark.kryo.classesToRegister`, the exception shown below (or a similar one) is thrown from `isPushBasedShuffleEnabled`.
We reproduced the issue on our internal branch by launching spark-shell as:
```
spark-shell --spark-version 3.1.1 --packages ml.dmlc:xgboost4j_2.12:1.3.1 --conf spark.kryo.classesToRegister=ml.dmlc.xgboost4j.scala.Booster
```

```
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:393)
	at org.apache.spark.executor.YarnCoarseGrainedExecutorBackend$.main(YarnCoarseGrainedExecutorBackend.scala:83)
	at org.apache.spark.executor.YarnCoarseGrainedExecutorBackend.main(YarnCoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Failed to register classes with Kryo
	at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:183)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:230)
	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:171)
	at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:102)
	at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
	at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:109)
	at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:346)
	at org.apache.spark.serializer.KryoSerializerInstance.getAutoReset(KryoSerializer.scala:446)
	at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects$lzycompute(KryoSerializer.scala:253)
	at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects(KryoSerializer.scala:249)
	at org.apache.spark.util.Utils$.isPushBasedShuffleEnabled(Utils.scala:2584)
	at org.apache.spark.MapOutputTrackerWorker.<init>(MapOutputTracker.scala:1109)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:322)
	at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:205)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:442)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
	... 4 more
Caused by: java.lang.ClassNotFoundException: ml.dmlc.xgboost4j.scala.Booster
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:217)
	at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:174)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:173)
	... 24 more
```
Registration of user classes for Kryo serialization happens after the serializer is created in SparkEnv. However, a serializer can also be created in `isPushBasedShuffleEnabled`, which is called in some places before SparkEnv is created. In addition, per JoshRosen's analysis, Kryo instantiation was probably failing because the added packages had not yet been downloaded to the executor (this code runs during executor startup, not task startup). The proposed change fixes this issue.
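
The timing problem, and why a `lazy val` avoids it, can be illustrated with a minimal sketch. Everything here is hypothetical: `LazyInitSketch`, `available`, `registerClass`, `EagerWorker`, and `LazyWorker` are illustrative names, and the mutable set merely stands in for the classes that have been downloaded to the executor so far.

```scala
import scala.collection.mutable

object LazyInitSketch {
  // Stand-in for the executor classpath: classes that are loadable so far.
  val available: mutable.Set[String] = mutable.Set.empty[String]

  // Stand-in for Kryo class registration: fails if the class is not loadable.
  def registerClass(name: String): Boolean = {
    if (!available.contains(name)) throw new ClassNotFoundException(name)
    true
  }

  // Eager: the check runs in the constructor, i.e. during startup.
  class EagerWorker(cls: String) {
    val pushEnabled: Boolean = registerClass(cls)
  }

  // Lazy: the check runs on first access, which can happen after the
  // user's jars have been downloaded.
  class LazyWorker(cls: String) {
    lazy val pushEnabled: Boolean = registerClass(cls)
  }
}
```

In this sketch, constructing an `EagerWorker` before the class is in `available` throws `ClassNotFoundException`, whereas a `LazyWorker` constructs successfully and only evaluates the check when `pushEnabled` is first read.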

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passed existing tests.
Also tested this patch on our internal branch, where a user had reported the issue. The issue no longer reproduces with this patch.

Closes #34158 from rmcyang/SPARK-33781-bugFix.

Lead-authored-by: Minchu Yang <minyang@minyang-mn3.linkedin.biz>
Co-authored-by: Minchu Yang <31781684+rmcyang@users.noreply.github.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-10-05 12:05:43 -05:00
benchmarks Revert "[SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache" 2021-08-22 09:36:15 +09:00
src [SPARK-36705][FOLLOW-UP] Support the case when user's classes need to register for Kryo serialization 2021-10-05 12:05:43 -05:00
pom.xml [SPARK-36712][BUILD] Make scala-parallel-collections in 2.13 POM a direct dependency (not in maven profile) 2021-09-13 11:06:50 -05:00